博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ入门高级特性
阅读量:3928 次
发布时间:2019-05-23

本文共 31956 字,大约阅读时间需要 106 分钟。

    上篇文章:

一、消息如何保障100%的投递成功?

1、什么是生产端的可靠性投递?

  保障消息的成功发出

  保障MQ节点的成功接收

  发送端收到MQ节点(Broker)确认应答

  完善的消息进行补偿机制(如网络问题没有返回确认应答)

2、可靠性投递的解决方案

方案一:消息入库,对消息状态进行打标(变更消息状态)。     

1.生产者将业务数据和消息入库,并设置消息状态为0,即初始待投递(可能涉及多个数据库,业务库,消息库等)

2.生产者将消息发送至MQ节点(Broker)

3.Broker向生产者发送确认

4.生产者收到Broker确认后修改消息状态为1,即消息投递成功

5.系统定时任务扫描未投递成功的消息(消息状态为0)

6.生产者将未投递的消息重发给Broker,并记录消息重发次数

7.当重发次数大于3(阈值自定义)时,此时修改消息状态为2,即消息投递失败。对于投递失败的消息启动补偿机制或者人工去处理失败消息。

存在的问题:在高并发场景下,每次要对业务数据和消息数据入库,数据库会遇到瓶颈,所以会采用方案二。

方案二:消息的延迟投递,做二次确认,回调检查。

在高并发的场景下,少做一次数据库持久操作,提高系统处理能力,故将业务和消息的持久化拆开。所以在高并发的场景下,消息就不要入库了,延迟投递,可以不保证首次100%的成功,但是一定要保证性能。

    

0.先将业务数据入库(一定等到业务数据入库之后再发送消息,Upstream serivce上游服务

1.生产者第一次向MQ节点(Broker)发送消息

2.生产者第二次向MQ节点(Broker)发送check延迟消息,一般按自己业务设为2min-5min

3.Consumer消费者从MessageQueue获取消息

4.Consumer成功消费消息后,会Broker发送确认消息(设其队列名为ConsumerQueueConfirm)

5.Callback Service服务监听ConsumerQueueConfirm,并将成功消费的消息入库MSG DB。

6.同时Callback Service服务监听checkdetailQueue(处理第2步发送check延迟消息),并去MSG DB查询该消息是否被成功消费。如果查询不到check message,则Callback Service服务向Upstream Service服务发送RPC请求,让其重发消息,设置重发次数,达到重发次数后,设置其为消费失败

7.人工处理因网络闪断或者业务问题产生的未成功消费消息,使系统消息投递几乎达到100%

 

二、如何保证消息的幂等性

首先,无论是RabbitMQ、RocketMQ还是kafka,都有可能出现消息的重复发送,这个是MQ无法保障的。

幂等性:就是相同条件下对一个业务的操作,不管操作多少次,结果都是一样。

消息的幂等性:就是即使我们收到多条一样的消息,永远也不会重复消费,即消息只会被消费一次。

例如:向支付宝发起支付请求,无论是由于网络问题无法收到请求结果而重新发起请求,或是前端的操作抖动而造成重复提交情况。支付宝只能扣一次钱。

1、RabbitMQ可能导致出现非幂等性的情况

1.可靠性消息投递机制:consumer回复confirm出现网络闪断,producer没有收到ack,定时任务轮询可能就会重新发送消息,这样consumer就会收到两条消息

2.MQ Broker与消费端传输消息的过程出现网络抖动

3.消费端故障或异常

2、解决方案

方案一:令牌机制,即唯一ID + 指纹码

原理就是利用数据库主键去重,业务完成后插入主键标识,使用ID进行分库分表算法路由,从单库的幂等性到多库的幂等性

1.这里唯一ID一般就是业务表的主键,比如商品ID

2.指纹码:每次操作都要生成指纹码,可以用时间戳+业务编号+...组成,目的是保证每次操作都是正常的

   

整体流程:

每次操作都生产一个唯一标记(统一ID生成服务),通过客户端传给服务器端,服务器端通过这个标记去查询数据库里面是否有该唯一标记,如果有就是重复消费了。

1.需要一个统一ID生成服务,为了保证可靠性,上游服务也要有个本地ID生成服务,然后发送消息给Broker

2.需要ID规则路由组件去监听消息,先入库,如果入库成功,证明没有重复,然后发给下游,如果发现库里面有了这条消息,就不发给下游

好处:整体实现相对简单

坏处:高并发下有数据库写入的性能瓶颈。

优化:跟进id进行分库分表进行算法的路由分压分流。

方案二:利用Redis的原子性去实现。

利用redis的原子操作,做个操作完成的标记,Redis的实现性能比较好,而且Redis一般使用集群,不用担心某台机器挂掉了,影响服务。但是也存在一些的问题:

我们是否需要进行数据落库,如果落库的话,怎么保证缓存和storage的一致性、事务,关键解决的问题是数据库和Redis操作如何做到原子性?如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?

 

三、confirm确认消息

1、理解Confirm消息确认机制

  消息的确认:是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。

  生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

      

2、如何实现Confirm确认消息?

在生产端

    第一步:在channel上开启确认模式:channel.confirmSelect()

    第二步:在channel上添加确认监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或者记录日志等后续处理。

3、代码实现

1.生产端的代码

import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;public class Producer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setHost("localhost");        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4.指定消息的投递模式:确认模式        channel.confirmSelect();        //5.发布消息        String exchangeName = "test_confirm_exchange";        String routingKey = "test.confirm";        String msg = "hello rabbitmq consumer, test_direct-message: confirm";        for (int i = 0; i < 2; i++) {            /**             参数:             exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机             routingKey -路由键             mandatory -如果要设置“强制性”标志,则为true             props -消息的其他属性-路由标头等             body -消息正`文             */            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());        }        //6.添加确认监听,来监听消息中间件的确认消息。        channel.addConfirmListener(new ConfirmListener() {            //当消息中间件接收到消息后,要执行的函数            // deliveryTag表示是Broker给每条消息指定的唯一ID(从1开始)            // multiple表示是否接收所有的应答消息,比如multiple=true时,发送100条消息成功过后,我们并不会收到100次handleAck方法调用。            @Override            public void handleAck(long deliveryTag, boolean multiple) throws IOException {                System.out.println("----ack----");                System.out.println("deliveryTag:"+ deliveryTag +" multiple:" + multiple);            }            //消息中间件出现异常(比如队列满了)后,要执行的函数            @Override            public void handleNack(long deliveryTag, boolean multiple) throws IOException {                System.out.println("-----no ack----");                System.out.println("deliveryTag:"+ deliveryTag +" multiple:" + multiple);            }        });        //7.释放资源        channel.close();        connection.close();    }}

2.消费端的代码

import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_confirm_queue";        String exchangeName="test_confirm_exchange";        String exchangeType="direct";        String routingKey="test.confirm";        /**         参数:         queue -队列名称         durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)         exclusive -如果我们声明一个排他队列,则为true(仅限此连接)         autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)         arguments -队列的其他属性(构造参数)         */        channel.queueDeclare(queueName, true, false, false, null);        /**         参数:         exchange -交易所名称         type -交易所类型         durable -如果我们声明持久交换,则为true(该交换将在服务器重启后保留下来)         autoDelete -如果服务器在不再使用交换机时应删除该交换机,则为true         arguments -用于交换的其他属性(构造参数)         */        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);        /**         参数:         queue -队列名称         exchange -交易所名称         routingKey -用于绑定的路由键         */        channel.queueBind(queueName, exchangeName, routingKey);        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理        /**         参数:         queue -队列名称         autoAck-如果服务器应考虑一旦传递已确认的消息,则为true;如果服务器应该期望显式确认,则为false         callback -消费者对象的接口         */        boolean autoAck = true;        channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {            /**             参数:             consumerTag-与消费者相关联的消费者标签             envelope -消息的打包数据             properties -消息的内容头数据             body -消息正文(客户端特定的不透明字节数组)             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,                                       byte[] body) throws IOException {                System.out.println("------------consumer message-----------");                System.out.println("sonsumerTag:" + consumerTag);                System.out.println("envelope:" + envelope);                System.out.println("properties:" + properties);                System.out.println("msg:" + new String(body));            }        });    }}

    

四、Return消息机制

1、理解Return消息机制

Return Listener用于处理一些不可路由的消息。

1.我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送到某一个队列中,然后我们的消费者监听队列,进行消息处理操作。

2.但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达的消息,就要使用return listener。

在发送消息的basicPublish方法中有一个关键的配置项:

Mandatory:如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,

          如果为false,那么broker端自动删除该消息。(默认false)

2、代码实现

在生产端,在channel上添加监听:addReturnListener,并指定mandatory为true。

1.生产端的代码

import com.rabbitmq.client.*;import java.io.IOException;public class Producer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setHost("localhost");        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4.添加return监听        channel.addReturnListener(new ReturnListener() {            /**             * relaycode:表示中间件响应给浏览器的状态码             * relayText:表示状态码对应的文本。             * exchange:表示消息发布时对应的交换机名             * routingKey:表示的是路由键             * basicProperties:表示消息的属性,             * bytes:个表示消息的内容             */            @Override            public void handleReturn(int relaycode, String relayText, String exchange, String routingKey,                                     AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {                System.out.println("----------------handle return ----------------");                System.out.println("状态码:"+relaycode);                System.out.println("信息:"+relayText);                System.out.println("交换机:"+exchange);                System.out.println("路由键:"+routingKey);                System.out.println("属性:"+basicProperties);                System.out.println("消息内容:"+new String(bytes));            }        });        //5.发布消息        String exchangeName = "test_return_exchange2";        String routingKey = "test.return.#";        String routingKeyError="test.returnerror.#";        String msg = "hello rabbitmq consumer, test_return-message: return";        for (int i = 0; i < 2; i++) {            /**             参数:             exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机             routingKey -路由键             mandatory -如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,                  如果为false,那么broker端自动删除该消息。(默认false)             props -消息的其他属性-路由标头等             body -消息正`文             */            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());            channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());        }        //6.释放资源//        channel.close();//        connection.close();    }}

2.消费端的代码

import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_return_queue2";        String exchangeName="test_return_exchange2";        String exchangeType="topic";        String routingKey="test.return.#";        channel.queueDeclare(queueName, true, false, false, null);        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);        channel.queueBind(queueName, exchangeName, routingKey);        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,                                       byte[] body) throws IOException {                System.out.println("------------consumer message-----------");                System.out.println("sonsumerTag:" + consumerTag);                System.out.println("envelope:" + envelope);                System.out.println("properties:" + properties);                System.out.println("msg:" + new String(body));            }        });    }}

    

五、RabbitMQ消费端自定义监听器

      我们使用自定义的Consumer更加的方便,解耦性更强,在实际工作中也最常用。 实现也计较简单,新建一个类继承DefaultConsumer,并重写其中一个handleDelivery方法即可。

1、代码实现,将四的例子中消费端使用自定义监听器使用,生产端不改变。

1.消费端自定义监听器,继承DefaultConsumer

import com.rabbitmq.client.*;import java.io.IOException;/** *  自定义消费端自定义监听器 */public class MyConsumer extends DefaultConsumer {    private Channel channel;    public MyConsumer(Channel channel) {        super(channel);        this.channel = channel;    }    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {        System.out.println("------------consumer message-----------");        System.out.println("sonsumerTag:" + consumerTag);        System.out.println("envelope:" + envelope);        System.out.println("properties:" + properties);        System.out.println("msg:" + new String(body));    }}

2.消费端的代码

import com.rabbitmq.client.*;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_return_queue2";        String exchangeName="test_return_exchange2";        String exchangeType="topic";        String routingKey="test.return.#";        channel.queueDeclare(queueName, true, false, false, null);        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);        channel.queueBind(queueName, exchangeName, routingKey);        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理        channel.basicConsume(queueName, true, new MyConsumer(channel));    }}

六、消费端限流

1、什么要对消费端限流

如果是高并发的场景下,RabbitMQ服务器上收到成千上万条消息,那么当打开消费者客户端时,会出现:这些巨量的消息必定会瞬时全部推送过来,但是我们单个客户端无法同时处理这么多数据,导致消费端消费不过来甚至挂掉都有可能。

当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

注意:在非自动确认的模式下,可以采用限流模式。RabbitMQ 提供了一种 qos(服务质量保证)功能机制来控制一次消费消息数量,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

2、理解限流的 API方法

    限流设置 - basicQos(prefetchSize,prefetchCount,global)

prefetchSize:0,单条消息大小限制,0代表不限制消息大小

prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。

global:true/false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当设置为 false 时生效,设置为 true 时没有了限流功能,因为 channel 级别尚未实现。

注意:

1.不能设置自动签收功能(autoAck = false)

2.如果消息没被手动确认,就不会再给消费端发送消息,目的就是给消费端减压

 

3、代码实现消费端限流

生产端代码改变不多,主要操作集中在消费端。

1)设置具体的限流大小以及数量:channel.basicQos(0, 4, false);

2)关闭自动 ack:将 autoAck 设置为 false。channel.basicConsume(queueName, false, consumer);

3)在 handleDelivery 消费方法中手工ACK - basicAck()。void basicAck(Integer deliveryTag,boolean multiple)

手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于demo我是一次处理两条消息,所以设置为true。

1.生产端的代码

import com.rabbitmq.client.*;import java.io.IOException;public class Producer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setHost("localhost");        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4.发布消息        String exchangeName = "test_qos_exchange";        String routingKey = "test.qos.#";        String msg = "hello rabbitmq consumer, test_qos-message: qos";        for (int i = 0; i < 5; i++) {            /**             参数:             exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机             routingKey -路由键             mandatory -如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,                  如果为false,那么broker端自动删除该消息。(默认false)             props -消息的其他属性-路由标头等             body -消息正文             */            channel.basicPublish(exchangeName, routingKey, false, null, (msg + i).getBytes());        }        //5.释放资源        channel.close();        connection.close();    }}

2.消费端的代码

import com.rabbitmq.client.*;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_qos_queue";        String exchangeName = "test_qos_exchange";        String exchangeType = "topic";        String routingKey = "test.qos.#";        channel.queueDeclare(queueName, true, false, false, null);        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);        channel.queueBind(queueName, exchangeName, routingKey);        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理        // 设置限流,并autoAck设置为 false        channel.basicQos(0, 2, false);        channel.basicConsume(queueName, false, new MyConsumer(channel));    }}

3.消费端自定义监听器,继承DefaultConsumer

import com.rabbitmq.client.*;import java.io.IOException;/** * 自定义消费端自定义监听器 */public class MyConsumer extends DefaultConsumer {    private Channel channel;    public MyConsumer(Channel channel) {        super(channel);        this.channel = channel;    }    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {        System.out.println("------------consumer message-----------");        System.out.println("sonsumerTag:" + consumerTag);        System.out.println("envelope:" + envelope);        System.out.println("properties:" + properties);        System.out.println("msg:" + new String(body));        try {            Thread.sleep(3000);        } catch (InterruptedException e) {            e.printStackTrace();        }        //手动签收消息        channel.basicAck(envelope.getDeliveryTag(), true);    }}

    

从图中发现 Unacked值一直都是 2 ,每过 3 秒 消费一条消息即 Ready 和 Total 都减少 2,而 Unacked的值在这里代表消费者正在处理的消息,通过demo发现了消费者一次性最多处理 2 条消息,达到了消费者限流的预期功能。

 

七、消费端手工ACK与NACK和重回队列

1、消费端的手工ACK与NACK

当我们设置 autoACK=true 时,即在自动ACK的情况下,只要Broker发消息给消费者了,这个消息就会从消息队列中移除,不会管消费端有没有异常。

当我们设置 autoACK=false 时,就可以使用手工ACK方式了,那么其实手工方式包括了手工ACK与NACK。

  • 如果我们手工 ACK 时,会发送给Broker一个应答,代表消息成功处理了,Broker就可以回送响应给生产端了。
  • 如果我们手工 NACK 时,则表示消息处理失败了,可以重回队列也可以丢弃。具体做什么操作可以通过异常类型进行判断。若重回队列,Broker端就会将没有成功处理的消息重新发送。

在消费端需要限流时,我们需要让消费端进行手工的ACK。在消费端消费消息出现异常时,我们要通知Broker消息有问题,这时我们可以手动NACK。

1.使用方式

消费端进行消费的时候,如果由于业务异常我们可以手工 NACK 并进行日志的记录,然后进行补偿!

方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)

如果由于服务器宕机等严重问题,那我们就需要手工进行 ACK 保障消费端消费成功!

方法:void basicAck(long deliveryTag, boolean multiple)

2、消费端的重回队列

消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!

重回队列,会把消费失败的消息重新添加到队列的尾端,供消费者继续消费。

一般实在实际应用中不会设置重回队列这个属性(关闭重回队列-设置为false),我们都是自己去做补偿或者投递到延迟队列里的,然后指定时间去处理即可。

3、代码实现

1.生产端的代码

import com.rabbitmq.client.*;import java.io.IOException;public class Producer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setHost("localhost");        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4.发布消息        String exchangeName = "test_ack_exchange";        String routingKey = "test.ack.#";        String msg = "hello rabbitmq consumer, test_ack-message: !";        for (int i = 0; i < 3; i++) {            channel.basicPublish(exchangeName, routingKey, false, null, (msg + i).getBytes());        }        //5.释放资源        channel.close();        connection.close();    }}

2.消费端的代码

import com.rabbitmq.client.*;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_ack_queue";        String exchangeName = "test_ack_exchange";        String exchangeType = "topic";        String routingKey = "test.ack.#";        channel.queueDeclare(queueName, true, false, false, null);        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);        channel.queueBind(queueName, exchangeName, routingKey);        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理        // 手工签收 必须要关闭 autoAck = false        channel.basicConsume(queueName, false, new MyConsumer(channel));    }}

3.自定义监听器,继承DefaultConsumer

import com.rabbitmq.client.*;import java.io.IOException;/** * 自定义消费端自定义监听器 */public class MyConsumer extends DefaultConsumer {    private Channel channel;    public MyConsumer(Channel channel) {        super(channel);        this.channel = channel;    }    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {        System.out.println("------------consumer message-----------");        System.out.println("sonsumerTag:" + consumerTag);        System.out.println("envelope:" + envelope);        System.out.println("properties:" + properties);        System.out.println("msg:" + new String(body));        try {            Thread.sleep(3000);        } catch (InterruptedException e) {            e.printStackTrace();        }        String msg = new String(body);        if (msg.indexOf("1") >= 0) {            //手动拒签消息NACK,第三个requeue参数表示是否对拒签的消息重回队列//            channel.basicNack(envelope.getDeliveryTag(), false, false);            channel.basicNack(envelope.getDeliveryTag(), false, true); // 重回队列        } else {            //手动签收消息ACK            channel.basicAck(envelope.getDeliveryTag(), false);        }    }}

    

消费端打印说明,消息1由于我们调用了NACK,并且设置了重回队列,所以会导致该条消息一直重复发送,消费端就会一直循环消费。

八、TTL队列

TTL是time to live的缩写,也就是生存时间。

RabbitMQ支持消息过期时间,在消息发送时可以进行指定。

RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息就会自动的清除。

RabbitMQ可以对消息和队列设置TTL. 有两种方法可以设置。也可以在RabbitMQ控制台设置(不推荐)

1、通过队列属性设置TTL

在队列属性中设置消息的TTL,那么队列中所有消息都有相同的过期时间,可以通过声明队列时设置 x-message-ttl参数。

2、对消息本身进行单独设置TTL

对每条消息本身单独设置TTL,每条消息TTL可以不同,一旦消息过期就会从队列中清除。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。

3、代码实现

这里两种方法同时设置TTL,看效果

1.生产端的代码

import com.rabbitmq.client.*;import java.io.IOException;public class Producer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setHost("localhost");        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //5.发布消息        String exchangeName = "test_ttl_exchange";        String routingKey = "test.ttl.#";        String msg = "hello rabbitmq consumer, test_topic-message: ttl";        for (int i = 0; i < 2; i++) {            /**             参数:             exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机             routingKey -路由键             mandatory -如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,                  如果为false,那么broker端自动删除该消息。(默认false)             props -消息的其他属性-路由标头等             body -消息正`文             */            // 对消息本身进行单独设置TTL, 我这里两条消息TTL一样            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()                    .deliveryMode(2)                    .expiration("10000") // 设置TTL 10秒                    .build();            channel.basicPublish(exchangeName, routingKey, false, properties, msg.getBytes());        }        //7.释放资源        channel.close();        connection.close();    }}

2.消费端的代码

import com.rabbitmq.client.*;import java.io.IOException;import java.util.HashMap;import java.util.Map;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_ttl_queue";        String exchangeName = "test_ttl_exchange";        String exchangeType = "topic";        String routingKey = "test.ttl.#";        // 通过队列属性设置TTL, 15秒        Map
arguments = new HashMap<>(); arguments.put("vhost", "/"); arguments.put("username","guest"); arguments.put("password", "guest"); arguments.put("x-message-ttl", 15000); /** 参数: queue -队列名称 durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来) exclusive -如果我们声明一个排他队列,则为true(仅限此连接) autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除) arguments -队列的其他属性(构造参数) */ channel.queueDeclare(queueName, true, false, false, arguments); channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5.不消费消息,让其超时 }}

    

两种方法同时使用,去最小TTL为10秒后超时。启动之后,监控RabbitMQ控制台,发现10秒后设置了消息超时时间的消息超时就被清除。

九、死信队列

DLX(Dead-Letter-Exchange),利用DLX,当消息在一个队列中变成死信之后,它能够被重新publish到另一个exchange,这个exchange就是DLX。

DLX也是一个正常的exchange,和一般的 exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的 exchange上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理。 

1、消息变成死信有以下几个情况:

  消息被拒绝(basic.reject/basic.nack)

  消息TTL过期

  队列达到最大的长度

2、死信队列的设置:

首先要设置死信队列的 exchange 和 queue,然后进行绑定:

Exchange:dlx.exchangeQueue:dlx.queueRoutingKey:#

然后进行正常声明交换机、队列、绑定,只不过需要在队列加上一个参数即可:

argument.put("x-dead-letter- exchange", "dlx.exchange");

这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。

3、代码实现

1.生产端的代码

import com.rabbitmq.client.*;public class Producer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setHost("localhost");        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //5.发布消息        String exchangeName = "test_ttl_exchange";        String routingKey = "test.ttl.#";        String msg = "hello rabbitmq consumer, test_topic-message: ttl";        for (int i = 0; i < 5; i++) {            channel.basicPublish(exchangeName, routingKey, false, null, (msg + i).getBytes());        }        //7.释放资源        channel.close();        connection.close();    }}

2.消费端的代码

import com.rabbitmq.client.*;import java.io.IOException;import java.util.HashMap;import java.util.Map;public class Consumer {    public static void main(String[] args) throws Exception {        //1.创建工厂        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.setVirtualHost("/");        connectionFactory.setPort(5672);        connectionFactory.setHost("127.0.0.1");        //默认情况下为“ guest” /“ guest”,仅限本地主机连接        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        //2.通过工厂创建connection        Connection connection = connectionFactory.newConnection();        //3.创建channel对象        Channel channel = connection.createChannel();        //4.声明死信队列        channel.queueDeclare("dlx.queue", true, false, false, null);        channel.exchangeDeclare("dlx.exchange", "topic", true);        channel.queueBind("dlx.queue", "dlx.exchange", "#");        //5. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定        String queueName = "test_ttl_queue";        String exchangeName = "test_ttl_exchange";        String exchangeType = "topic";        String routingKey = "test.ttl.#";        // 通过队列属性设置TTL, 15秒        Map
arguments = new HashMap<>(); arguments.put("vhost", "/"); arguments.put("username", "guest"); arguments.put("password", "guest"); arguments.put("x-message-ttl", 15000); arguments.put("x-dead-letter-exchange", "dlx.exchange"); channel.queueDeclare(queueName, true, false, false, arguments); channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5.通过channel把消费者和消息队列进行关联,获取消息进行处理 // 这里我们手工签收 必须要关闭 autoAck = false channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("------------consumer message-----------"); System.out.println("sonsumerTag:" + consumerTag); System.out.println("envelope:" + envelope); System.out.println("properties:" + properties); System.out.println("msg:" + new String(body)); String msg = new String(body); if (msg.indexOf("1") >= 0) { //手动拒签消息NACK,开启重回队列 channel.basicNack(envelope.getDeliveryTag(), false, false);// channel.basicReject(envelope.getDeliveryTag(), true); } else { //手动签收消息ACK channel.basicAck(envelope.getDeliveryTag(), false); } } }); }}

     

删除TTL创建的 exchange和 queue,然后运行 。上图说明,我这里模拟消息被拒绝(basic.reject/basic.nack)并且requeue=false或者true时分别运行了一下,共四次,四条消息都可以进去死信队列。

 

这里对RabbitMQ高级特性,做一整理,有些图来自网络,重点在知识点的掌握上。

参考文章:

 

—— Stay Hungry. Stay Foolish. 求知若饥,虚心若愚。

转载地址:http://qgdgn.baihongyu.com/

你可能感兴趣的文章
spark && Tachyon
查看>>
计算机科学不等于数学
查看>>
文件系统与NoSQL分布式存储技术对比
查看>>
rootkit技术
查看>>
调试寄存器(debug registers, DRx)理论及实践
查看>>
Linux下逻辑地址-线性地址-物理地址图解
查看>>
vim安装SrcExpl 插件,实现自动显示跳转函数及变量定义功能
查看>>
linux 版本中 i386/i686/x86-64/pcc 等... 的区别
查看>>
机器学习 | 台大林轩田机器学习基石课程笔记11 --- Linear Models for Classification
查看>>
机器学习 | 台大林轩田机器学习基石课程笔记12 --- Nonlinear Transformation
查看>>
线性代数 | (2) 矩阵Part Two
查看>>
机器学习 | 台大林轩田机器学习基石课程笔记13 --- Hazard of Overfitting
查看>>
机器学习 | 台大林轩田机器学习基石课程笔记14 --- Regularization
查看>>
机器学习 | 台大林轩田机器学习基石课程笔记15 --- Validation
查看>>
机器学习 | 台大林轩田机器学习基石课程笔记16 --- Three Learning Principles
查看>>
机器学习 | 台大林轩田机器学习技法课程笔记1 --- Linear Support Vector Machine
查看>>
机器学习 | 台大林轩田机器学习技法课程笔记2 --- Dual Support Vector Machine
查看>>
线性代数 | (3) 行列式
查看>>
学术英语 | (1) wordList1
查看>>
机器学习 | 台大林轩田机器学习技法课程笔记3 --- Kernel Support Vector Machine
查看>>