www.2527.com_澳门新葡8455手机版_新京葡娱乐场网址_
做最好的网站

代码实例,RabbitMQ基础入门

2019-05-25 10:27 来源:未知

在事先的一篇博客RabbitMQ入门:认知并安装RabbitMQ(以Windows系统为例)中,大家设置了RabbitMQ并且对其也部分初叶的认知,明天就来写个入门小例子来深化概念精通并问询代码怎么落实。

 

RabbitMQ是什么?

rabbitMQ是1款基于AMQP协议的音信中间件,它亦可在使用之间提供可信的音信传输,即选择和发送新闻。

本篇博客围绕上边多少个方面实行:

RabbitMQ是四个新闻中间件,在部分须求异步管理、发布/订阅等现象的时候,使用RabbitMQ可以成功大家的须要。 下边是自身在就学java语言实现RabbitMQ(自RabbitMQ官网的Tutorials)的1部分记下。

RabbitMQ专业原理

图片 1

其一类别架构图版权属于sunjun041640。
下边来介绍RabbitMQ里的片段宗旨概念,首要如下:
RabbitMQ Server:提供消息一条从Producer到Consumer的处理。
Exchange:壹边从发表者方接收消息,壹边把新闻推送到行列。
producer只好将音讯发送给exchange。而exchange担负将音讯发送到queues。Procuder Publish的Message进入了exchange,exchange会依据routingKey管理接收到的信息,判定音信是应当推送到钦点的行列照旧是两个连串,可能是直接忽略音讯。那么些规则是经过沟通机类型(exchange type)来定义的重中之重的type有direct,topic,headers,fanout。具体针对不同的情景使用区别的type。
queue也是通过那几个routing keys来做的绑定。调换机将会对绑定键(binding key)和路由键(routing key)进行精确相称,从而明确新闻该分发到哪个队列。
Queue:新闻队列。接收来自exchange的音信,然后再由consumer抽取。exchange和queue能够一对一,也得以一对多,它们的涉及通过routingKey来绑定。
Producer:Client A & B,生产者,音信的来源,新闻必须发送给exchange。而不是平素给queue
Consumer:Client 一,二,3顾客,直接从queue中拿走消息实行消费,而不是从exchange中取得音讯举办费用。
再有局部在上海体育地方中并未有显示,但在应用程序中会用到的定义。
Connection: 就是3个TCP的连接。Producer和Consumer都以由此TCP连接到RabbitMQ Server的。今后大家得以看出,程序的早先处便是建设构造那一个TCP连接。
Channels: **虚拟连接。它创造在上述的TCP连接中。数据流动都是在Channel中开始展览的。相当于说,一般情形是先后初叶创设TCP连接,第二步就是创设那一个Channel。
Bindings:绑定(binding)是指沟通机(exchange)和队列(queue)举办关联。可以省略明了为:那一个行列(queue)对这一个沟通机(exchange)的新闻感兴趣。

  1. 代码前的抵触热身
  2. 代码实例:Hello RabbitMQ
  3. 运转代码并调节和测试难题

先是有八个称呼掌握一下(以下图片来源rabbitMQ官方网址)

Hello,World

上边将从最轻便易行的采纳起来轻易询问一下RabbitMQ的施用。
生产者

import com.rabbitmq.client.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //声明一个TCP连接
        Connection connection = factory.newConnection();
        //声明一个channel
        Channel channel = connection.createChannel();
        BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        while(flag){
            System.out.print("往交换机中推送消息:");
            String message = input.readLine();
            if (message == "quit"){
                flag = false;
                continue;
            }
            //往exchange中推送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '"   message   "'");
        }
        channel.close();
        connection.close();
    }

}

消费者

import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '"   message   "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

第二列举一下多少个基本点注明的参数

  1. 队列注明queueDeclare
queueDeclare(String queue, boolean durable, boolean exclusive, boolean
 autoDelete, Map<String, Object> arguments)
  1. 劳动者发送新闻basicPublish
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  1. 买主消费basicConsume
    消费者到行列中得到消息进行消费。autoAck=true表示顾客接受那么些新闻就能够向队列发送确认ack,表示本人收到了,队列接到报告后,就能把那条音讯从队列中除去,callback代表顾客收到音讯后,对音讯进行的拍卖。
basicConsume(String queue, boolean autoAck, Consumer callback)

上述两段代码,有弹指间几点要验证。
壹、从Send.java 代码中得以见到并不曾注解架构图中的exchange,而是方法basicPublish中采纳了空字符串 (""),那代码应用了暗中同意/佚名交流机。
二.默许交流机有一个不相同日常的地点,那就是生产者和消费者会公用2个类别。 在Send.java的basicPublish方法中,routingKey参数的地方大家给了二个队列名,暗中认可交换机会把推给她的信息一向再推到相应名称的连串中。在顾客中,注解了队列后,也无需和沟通机进行绑定。消费者会一直到对应队列获取音讯。所以大家的统一准备看起来如下所示(注意只是看起来):

图片 2

运作生产者和消费者,运维结果如下:

图片 3

图片 4

Now, Let's begin !

  • producer是用户使用担任发送音讯

轮询分发机制

在地点的运转结果中,唯有三个买主在开销队列中的信息,假诺有三个买主同一时候从那些队列中获得新闻举办拍卖,那么结果会是怎么样呢?
与此同时运营三个顾客,运行结果如下:
生产者

图片 5

第一个买主

图片 6

第2个顾客

图片 7

从运维结果中能够看看,RabbitMQ 会顺序循环的分发每一种Message。当每一个收到ack后,会将该Message删除,然后将下一个Message分发到下2个Consumer。这种分发格局叫做round-robin。

1、代码前的理论热身

图片 8

音信确认

此处就会有七个标题,在上述消费者代码中,我们设置autoAck=true,即消费者收到那条音讯就通报队列将其除去。然后才会去管理这条新闻。
若果consumer管理音讯的时候猛然挂了,音讯还一直不被成功,那么那条音信就能够熄灭。为了演示消息丢失,将消费者代码改成如下

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by sunyan on 17/8/1.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    Thread.sleep(10000);
                    System.out.println(" [x] Received '"   message   "'");
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

在音信输出前,sleep伍秒,上边,运行五个顾客,并在出殡和埋葬第叁条音讯后,将第壹个买主关闭,

图片 9

图片 10

图片 11

从运营结果能够观望,一共丢失了两条新闻。为了不让新闻丢失,RabbitMQ
提供了音讯确认机制,consumer在收取到,试行完消息后会发送一个ack给RabbitMQ告诉它能够从queue中移除消息了。如若没接过ack。Rabbitmq会重新发送此条音讯,假若有其余的consumer在线,将会收到并费用那条音讯。新闻确认机制是暗许张开的。假诺想关闭只需求按如下设置 即可。

channel.basicConsume(QUEUE_NAME, false, consumer);

图片 12

图片 13

图片 14

从上海体育场地能够看看,第二个买主退出后,原本轮询分发给她的数据又给了第3个买主,并从未消息丢失。
假使管理完结后忘记发送ack,那么固然已经被拍卖的新闻,RabbitMQ不会将它删除,且在消费者退出后,将音讯持续发送给其余顾客举行双重管理。
把上段代码中的相应内容注释掉。

channel.basicAck(envelope.getDeliveryTag(), false);

图片 15

图片 16

图片 17

从下边3张图能够看出,已经管理过的新闻h一,h二又被拍卖了三次。忘记basicAck是3个科学普及的谬误,但结局是生死攸关的。因为RabbitMQ不能删除任何音信,将会花费越来越多的内部存储器。

笔者们来看张图:

  • queue是储存音讯的缓冲(buffer)

正义派遣

看看这里,你或许早已以为轮询分发有点难题了,要是二个顾客管理2个主题素材要十s,另二个甩卖只需弹指间,那么2个主顾将不唯有艰苦,另1个买主差不离不会做任何工作。但RabbitMQ不清楚哪些,还大概会平均分配音信。那是因为当新闻进入队列时,RabbitMQ只会分派新闻。它不看消费者的未确认新闻的数目。它只是不足为训地向第n个顾客发送每一个第n个消息。大家得以采纳basicQos方法与 prefetchCount = 1设置。那告诉RabbitMQ在顾客再次回到贰个拍卖消息的ack以前不要再给他发送新新闻。相反,它将发送到下贰个还不忙的消费者。
二个顾客设置

 Thread.sleep(10000);

另一个消费者不设置,让其立刻管理音信。运营结果如下

图片 18

图片 19

图片 20

从图中得以看来,原本应该有Recv管理的信息“hello三”,由Recv1管理了。

图片 21

图片 22

消息长久化

音信确认中讲了哪些保险固然消费者去世,音信也不会丢掉。但是只要RabbitMQ服务器甘休,音讯依旧会丢掉。
当RabbitMQ退出或崩溃时,它会遗忘队列和音讯,供给两件事来确认保证消息不会丢掉:大家需求将队列和音信标志为确实。
先是,大家要求确定保障RabbitMQ不会错过大家的序列。为了那样做,大家供给将其声称为悠久的:

channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

就算这么些命令本身是毋庸置疑的,不过在我们当下的装置中是不行的。那是因为我们曾经定义了一个不耐用的名称叫hello的行列。RabbitMQ不允许你再一次定义具备分化参数的依存队列,并会向尝试实行此操作的别的程序再次回到错误。不过有二个飞快的消除办法

  • 用分歧的名目评释一(Wissu)个队列。

    channel.queueDeclare("hello_world",durable,false,false,null);

消息漫长化
亟需经过将MessageProperties(完结BasicProperties)设置为值PEQashqaiSISTENT_TEXT_PLAIN来标识大家的新闻。
计算一下,为了多少不丢掉,大家采纳了:

  1. 在数据管理终结后发送ack,这样RabbitMQ Server会感觉Message Deliver 成功。
  2. 持久化queue,能够免御RabbitMQ Server 重启大概crash引起的数码丢失。
  3. 悠久化Message,理由同上。

Publisher(生产者)生成音信,然后publish(发表)音信到exchange(路由器,也可能有材质翻译成沟通机),然后依据路由规则将音讯传递到Queue(队列),最后交由Consumer(消费者)进行消费管理。

  • consumer是用户选用担当接收消息

总结

在本篇中,首要介绍了须臾间暗许沟通机,在暗中认可调换机下,一条新闻只可以分发给三个顾客。那要是有多数两样的主顾都对这条音信的话,暗中认可交流机就不大概落到实处了?那么具体该怎么落到实处,就要RabbitMQ--交换机中实行求证。

那边的生产者和顾客都是我们的行使,因而咱们的代码中要贯彻那多少个部分。

图片 23

中等的节点正是RabbitMQ 提供的从头到尾的经过,须要再生产者和顾客里面调用其接口来定义和选拔这么些节点。

上边是自个儿使用rabbitMQ原生的jar包做的测试方法

 

maven pom.xml 加入

2、代码实例:Hello RabbitMQ

<dependency>

  1. 首先来兑现生产者,这里笔者并未有用Publisher做类名,而是用的Provider,未有特意的计划,就是在起名字的时候十分大心写成了那般,无需在意那些细节,O(∩_∩)O。

    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Provider {
    
        //定义队列名
        static String QUEUE_NAME = "helloRabbit";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                //1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                //2.为通道声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                //3.发布消息
                String msg = " hello rabbitmq, welcome to sam's blog.";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("provider send a msg: "   msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                //4.关闭连接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    }
    

    在第3步中,channel.queueDeclare 用来创设队列,有5个参数:String queue, 队列名; boolean durable, 该队列是不是供给持久化; boolean exclusive,该队列是不是为该通道独占的(其余通道是不是可以费用该队列); boolean autoDelete,该队列不再选取的时候,是或不是让RabbitMQ服务器自动删除掉; Map<String, Object> arguments 别的参数。第1步中,channel.basicPublish 公布音讯(用在劳动者),有四个参数:String exchange, 路由器(有的资料翻译成沟通机)的名字,就要音讯发到哪个路由器; String routingKey, 路由键,即公布新闻时,该音信的路由键是什么样; BasicProperties props, 钦定音信的为主性子; byte[] body 音信体,也正是消息的内容,是字节数组。 大概您会纳闷,为啥一贯不exchange呢?因为只要申明了队列,能够不表明路由器。

  2. 接着来贯彻消费者,消费者完毕和生产者进度大约,可是在此处并从未关闭连接和通道,是因为要消费者直接等待随时大概发来的音讯。代码如下:

    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class HelloConsumer {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明队列
                channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
                System.out.println(" **** keep alive ,waiting for messages, and then deal them");
                // 3.通过回调生成消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        //获取消息内容然后处理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** HelloConsumer"   " get message :["   msg  "]");
                    }
                };
    
                //4.消费消息
                channel.basicConsume(Provider.QUEUE_NAME, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    在第五步中,channel.basicConsume 用来接受音信,用在消费者,有3个参数:String queue, 队列名字,即要从哪些队列中接受消息; boolean autoAck, 是还是不是自动确认,暗中同意true; Consumer callback 消费者,即哪个人收到音信。

    <groupId>com.rabbitmq</groupId>

 

    <artifactId>amqp-client</artifactId>

3、运转代码并调整难题

    <version>3.5.6</version>

代码写好了,接下去进行测试,

</dependency>

  1. 先来试行下Provider.java,开掘报错了:

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142)
        at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:952)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
        at com.sam.hello_rabbitmq.Provider.main(Provider.java:36)
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
        ... 3 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:509)
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
        at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:345)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:286)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:600)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:527)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at com.sam.hello_rabbitmq.Provider.main(Provider.java:60)
    
    关键堆栈信息是:inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true',说是helloRabbit这个队列durable(是否需要持久化)
    参数已经设定成了true 但是代码中指定的是false,冲突了,纳尼?访问RabbitMQ管理页面:http://localhost:15672/#/queues 发现已经存在一个队列helloRabbit,
    

    图片 24

    点helloRabbit的链接,开采队列的durable属性确实是true。哦,原来自家在此之前在做别的练习的时候,创制过1个叫这么些名字的行列了,而且属性值刚好为true.

    图片 25

    那么接下去删掉这些既存的行列

    图片 26

    再去推行Provider.java,后台打印了剧情,并且队列中有了一条ready的音讯。图片 27图片 28

    主题材料化解!

  2. 举行HelloConsumer.java,预想的结果是在开发银行后,调控台直接打字与印刷出log并且RabbitMQ管理页面未有ready的音讯:图片 29

    图片 30

    结果符合预期。

主意达成暗意图

到此,全体干活左右逢源停止。

图片 31

 

发送音讯方法(Send.java)

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6      
 7     private static final String QUEUE_NAME = "hello";
 8  
 9     public static void main(String[] args) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("192.168.1.7");
12         factory.setPort(5672);
13         factory.setUsername("admin");
14         factory.setPassword("admin");
15         Connection connection = factory.newConnection();
16         Channel channel = connection.createChannel();
17  
18         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
19         String message = "Hello World!";
20         // "" 表示默认exchange
21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
22         System.out.println(" [x] Sent '"   message   "'");
23  
24         channel.close();
25         connection.close();
26     }
27  
28 }

10~16行 是获取rabbitmq.client.Channel, rabbitMQ的API操作基本都以经过channel来形成的。

18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),这里channel评释了3个名字叫“hello”的queue,证明queue的操作是幂等的,也正是说唯有不设有同样名称的queue的情景下才会创建二个新的queue。

21行 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()),chaneel在那些queue里公布了音信(字节数组)。

24~二5行 则是链接的闭馆,注意关闭顺序就好了。

收受新闻方法 (Recv.java)

 1 import com.rabbitmq.client.AMQP;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 import com.rabbitmq.client.Consumer;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 
 9 import java.io.IOException;
10 
11 public class Recv {
12 
13   private final static String QUEUE_NAME = "hello";
14 
15   public static void main(String[] argv) throws Exception {
16     ConnectionFactory factory = new ConnectionFactory();
17     factory.setHost("192.168.1.7");
18     factory.setPort(5672);
19     factory.setUsername("admin");
20     factory.setPassword("admin");
21     Connection connection = factory.newConnection();
22     Channel channel = connection.createChannel();
23 
24     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25     System.out.println(" [*] Waiting for messages. To exit press CTRL C");
26 
27     Consumer consumer = new DefaultConsumer(channel) {
28       @Override
29       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
30           throws IOException {
31         String message = new String(body, "UTF-8");
32         System.out.println(" [x] Received '"   message   "'");
33       }
34     };
35     channel.basicConsume(QUEUE_NAME, true, consumer);
36   }
37 }

16~2二行 和Send类中一样,也是获得同三个rabbitMQ服务的channel,那也是能经受到信息的根底。

二四行 同样注明了多个和Send类中宣布的queue一样的queue。

27~35行 DefaultConsumer类完成了Consumer接口,由于推送新闻是异步的,由此在这里提供了一个callback来缓冲接受到的消息。

先运维Recv 然后再运营Send,就能够见到音讯被接受输出到调整台了,假设多运营多少个Recv,会发觉消息被各样顾客按梯次分别消费了,

那也正是rabbitMQ暗中认可使用Round-robin dispatching(轮询分发机制)。

 

Work queues

地方轻易的贯彻了rabbitMQ音讯的发送和承受,可是无论是Send类中的queueDeclare 、basicPublish方法还会有Recv类中的basicConsume方法都有为数十分多的参数,

上边大家深入分析一下多少个关键的参数。

(①)Message acknowledgment 音信答复

上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),

在Channel接口中定义为 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

这几个autoAck大家脚下促成为true,表示服务器会自行确认ack,一旦RabbitMQ将1个新闻传递到consumer,它立时会被标识为除去状态。

诸如此类一旦consumer在符合规律施行职分进度中,一旦consumer服务挂了,那么大家就恒久的失去了那几个consumer正在管理的保有音信。

为了防止万一这种场合,rabbitMQ协理Message acknowledgment,当信息被1个consumer接受并拍卖完毕后,consumer发送给rabbitMQ二个回执,然后rabbitMQ才会去除那几个新闻。

当二个消息挂了,rabbitMQ会给别的可用的consumer继续发送上个consumer因为挂了而未有管理成功的新闻。

之所以咱们能够设置autoAck=false,来展现的让服务端做音讯成功推行的承认。

(二)Message durability 新闻悠久化

Message acknowledgment 确认保证了consumer挂了的状态下,新闻还足以被其它consumer接受管理,可是借使rabbitMQ挂了吗?

在表明队列的艺术中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

durable=true 意味着该队列就要服务注重启后继续存在。Send和Recv三个类中宣称队列的主意都要安装durable=true。

现行反革命,大家要求将消息标记为持久性——通过将MessageProperties(它达成BasicProperties)设置为PE昂科拉SISTENT_TEXT_PLAIN

(三)Fair dispatch 公平分发

rabbitMQ私下认可是轮询分发,那样对多个consumer来讲,也许就能冒出负载不均衡的题目,无论是职分自己难易度,依旧consumer管理技巧的不一致,都以导致这种主题素材。

为了处理这种状态大家得以接纳basicQos格局来设置prefetchCount = 1。 那告诉rabbitMQ一次只给consumer一条音讯,换句话来讲,正是停止consumer发回ack,然后再向这几个consumer发送下一条音信。

int prefetchCount = ``1``;

channel.basicQos(prefetchCount);

图片 32

多亏因为Fair dispatch是基于ack的,全体它最棒和Message acknowledgment同不常间接选举用,不然在autoAck=true的情况下,单独设置Fair dispatch并不曾意义。

上面是自个儿测试以上二种情景的测试代码,能够间接选用。

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.7");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;    //消息持久化
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        // 多个消息使用空格分隔
        Scanner sc = new Scanner(System.in);
        String[] splits = sc.nextLine().split(" ");
        for (int i = 0; i < splits.length; i  ) {
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes());
                System.out.println(" [x] Sent '"   splits[i]   "'");
        }

        channel.close();
        connection.close();
    }
}

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

  private final static String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL C");

    // basicQos方法来设置prefetchCount = 1。 这告诉RabbitMQy一次只给worker一条消息,换句话来说,就是直到worker发回ack,然后再向这个worker发送下一条消息。
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '"   message   "'");
        try {
          doWork(message);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    // 当consumer确认收到某个消息,并且已经处理完成,RabbitMQ可以删除它时,consumer会向RabbitMQ发送一个ack(nowledgement)。
    boolean autoAck = true;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

    protected static void doWork(String message) throws InterruptedException {
        for (char ch: message.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

 

发布/订阅(Publish/Subscribe)

一个完好的rabbitMQ音讯模型是会有Exchange的。

rabbitMQ的信息模型的宗旨情想是producer永世不会平素发送任何音信到queue中,实际上,在大多景况下producer根本不晓得一条音信是还是不是被发送到了哪些queue中。

在rabbitMQ中,producer仅仅将新闻发送到八个exchange中。要掌握exchange也特别轻松,它一面担负接收producer发送的讯息, 另三头将音讯推送到queue中。

exchange必须精通的精通在接收新闻之后该怎样开始展览下一步的管理,例如是还是不是合宜将那条音信发送到某些queue中? 仍旧应当发送到七个queue中?如故应当一贯废弃那条新闻等。

exchange模型如下:

图片 33

exchange类型也是有有个别种:directtopicheaders以及fanout。

Fanout exchange

上边大家来创设贰个fanout花色的exchange,以文害辞,fanout会向装有的queue广播全部接收的音讯。

 1 import java.io.IOException;
 2 import java.util.Scanner;
 3 import java.util.concurrent.TimeoutException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 
 9 import rabbitMQ.RabbitMQTestUtil;
10 
11 public class EmitLog {
12      
13     private static final String EXCHANGE_NAME = "logs";
14  
15     public static void main(String[] argv) throws IOException, TimeoutException {
16  
17         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
18         Connection connection = factory.newConnection();
19         Channel channel = connection.createChannel();
20         
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22  
23         // 多个消息使用空格分隔
24         Scanner sc = new Scanner(System.in);
25         String[] splits = sc.nextLine().split(" ");
26         for (int i = 0; i < splits.length; i  ) {
27                  channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes());
28              System.out.println(" [x] Sent '"   splits[i]   "'");
29         }
30  
31         channel.close();
32         connection.close();
33     }
34 }

 

 1 import java.io.IOException;
 2 
 3 import com.rabbitmq.client.AMQP;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 import com.rabbitmq.client.Consumer;
 8 import com.rabbitmq.client.DefaultConsumer;
 9 import com.rabbitmq.client.Envelope;
10 
11 import rabbitMQ.RabbitMQTestUtil;
12 
13 public class ReceiveLogs {
14 
15     private static final String EXCHANGE_NAME = "logs";
16      
17       public static void main(String[] argv) throws Exception {
18         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
19           Connection connection = factory.newConnection();
20           Channel channel = connection.createChannel();
21      
22         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
23         String queueName = channel.queueDeclare().getQueue();
24         channel.queueBind(queueName, EXCHANGE_NAME, "");
25      
26         Consumer consumer = new DefaultConsumer(channel) {
27           @Override
28           public void handleDelivery(String consumerTag, Envelope envelope,
29                                      AMQP.BasicProperties properties, byte[] body) throws IOException {
30             String message = new String(body, "UTF-8");
31             System.out.println(" [x] Received '"   message   "'");
32           }
33         };
34         channel.basicConsume(queueName, true, consumer);
35       }
36 }

Direct exchange

在fanout的exchange类型中,音信的揭发已经队列的绑定方法中,routingKey参数都以暗中认可空值,因为fanout类型会一贯忽略那几个值,

可是在其余exchange类型中它抱有很重大的含义,``

图片 34      

rabbitMQ援救上述三种绑定,新闻在发表的时候,会钦赐三个routing key,而图一中exchange会把routing key为orange发送的消息将会被路由到queue Q1中,使用routing key为black或者green的将会被路由到Q2中。

图片 35

将五个queue使用同1的binding key进行绑定也是卓有成效的。能够在X和Q第11中学间增加八个routing key black。 它会向全部相称的queue进行播放,使用routing key为black发送的音信将会同不经常间被Q1Q2接收。

 上面是自身测试debug和error三种routing key公布音信并收受管理音讯的代码:

图片 36图片 37

import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import rabbitMQ.RabbitMQTestUtil;

public class EmitLog {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException, TimeoutException {

            ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
            Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 多个消息使用空格分隔
        Scanner sc = new Scanner(System.in);
        String[] splits = sc.nextLine().split(" ");
        for (int i = 0; i < splits.length; i  ) {
                 channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes());
             System.out.println(" [x] Sent '"   splits[i]   "'");
        }

        channel.close();
        connection.close();
    }
}

View Code

图片 38图片 39

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsDebug {

    private static final String EXCHANGE_NAME = "direct_logs";

      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");

        System.out.println(" [*] Waiting for messages. To exit press CTRL C");

        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '"   message   "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}

View Code

图片 40图片 41

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsError {

    private static final String EXCHANGE_NAME = "direct_logs";

      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] Waiting for messages. To exit press CTRL C");

        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '"   message   "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}

View Code

发送输入:

图片 42

debug接受:

图片 43

error接受:

图片 44

 

Topic exchange

发送到topic exchange中的新闻不能够有三个任性的routing_key——它必须是1个使用点分隔的单词列表。单词能够是轻便的。一些实用的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。

routing key的长短限制为253个字节数。

binding key也非得是平等的款式。topic exchange背后的逻辑类似于direct——一条选择一定的routing key发送的消息将会被传送至具备应用与该routing key同样的binding key举行绑定的队列中。 但是,对binding key来讲有二种独特的图景:

  1. *(star)能够代替任意三个单词
  2. #(hash)能够取代0个或多少个单词

图片 45

和Direct exchange大致,代码就不copy了,有意思味的直白看看教程

 

TAG标签:
版权声明:本文由澳门新葡8455手机版发布于计算机编程,转载请注明出处:代码实例,RabbitMQ基础入门