当前位置:首页 » 《随便一记》 » 正文

RabbitMQ系列【8】消息可靠性之ACK机制

23 人参与  2022年11月15日 13:53  分类 : 《随便一记》  评论

点击全文阅读


有道无术,术尚可求,有术无道,止于术。

文章目录

前言自动确认1. 配置2. 演示 手动确认1. 配置2. 代码3. 测试

前言

在之前分析了对于生产者来说,可以使用消息发布确认及退回机制,保证消息被成功发送到MQ中。

但对于消费者来说,消息传递过来,可能会丢失,也有可能接收到消息,但还未处理完,发生宕机或者异常,导致消息没有被成功消费。

为了保证消息在消费过程中的可靠性,RabbitMQ 引入消息确认机制(ACK(Acknowledge)),消费者在接收到消息并且处理该消息之后,告诉RabbitMQ 它已经处理,RabbitMQ 再讲该消息删除。

消费端收到消息后的确认方式有三种:

自动确认:当消息一旦被消费者接收到,则自动确认收到,并将相应消息从 RabbitMQ的消息缓存中移除

手动确认:将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除。

根据异常情况确认:根据侦听器检测是正常返回、还是抛出异常来确认

自动确认

其中自动确认是指,当消息一旦被消费者接收到,则自动确认收到,并将相应其从 RabbitMQ的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。实际开发时,不推荐使用这种方式。

1. 配置

添加配置,设置 acknowledge-modenone,该配置项共有三种:

none:自动确认manual:手动确认auto:根据异常情况确认
spring:  rabbitmq:    username: guest    password: guest    host: localhost    port: 5672    # 消息监听器配置    listener:      # 消息监听容器类型,默认 simple      type: simple      simple:        # 消息确认模式,none、manual和auto        acknowledge-mode: none        # 应用启动时是否启动容器,默认true        auto-startup: true        # listener最小消费者数        concurrency: 10        # listener最大消费者数        max-concurrency: 100        # 一个消费者最多可处理的nack消息数量        prefetch: 10        # 被拒绝的消息是否重新入队,默认true        default-requeue-rejected: true        # 如果容器声明的队列不可用,是否失败;或如果在运行时删除一个或多个队列,是否停止容器,默认true        missing-queues-fatal: true        # 空闲容器事件应多久发布一次        idle-event-interval: 10        # 重试配置        retry:          # 是否开启消费者重试,默认false          enabled: true          # 第一次和第二次尝试发送消息的时间间隔,默认1000ms          initial-interval: 1000ms          # 最大重试次数,默认3          max-attempts: 3          # 最大重试间隔,默认10000ms          max-interval: 10000ms          # 应用于前一个重试间隔的乘数          multiplier: 1          # 重试是无状态还是有状态,默认true          stateless: true

2. 演示

添加一个消费者,接收到消息后抛出异常,模拟没有正常消费:

@Componentpublic class RabbitConsumer {    @RabbitListener(queues = {"bootQueue"})    public void rabbitListener(Message message) {        System.out.println("收到消息===" + message);        // 发生异常        int i=5/0;    }}

直接发送一条消息:

@SpringBootTestpublic class MqTest {    @Autowired    private RabbitTemplate rabbitTemplate;    @Test    public void testRabbitPub() {        rabbitTemplate.convertAndSend("bootExchange", "boot.key", "HELLO SPRING BOOT");    }}

运行程序,可以看到发生了异常:
在这里插入图片描述
由于开启了重试机制,异常时,会进行重试消费:
在这里插入图片描述
查看控制台,发现消息没有被成功消费,但是 RabbitMQ已经将该消息移除。
在这里插入图片描述

手动确认

手动确认只有当消费正确消费掉之后,再手动告诉RabbitMQ该消息已经被成功接收并消费,这时RabbitMQ才会将消息从队列中删除掉。

1. 配置

设置acknowledge-modemanual

spring:  rabbitmq:    # 省略....    # 消息监听器配置    listener:      # 消息监听容器类型,默认 simple      type: simple      simple:        # 消息确认模式,none、manual和auto,默认auto        acknowledge-mode: manual

2. 代码

如果消息成功处理,需要调用channel.basicAck()方法进行签收:

void basicAck(long deliveryTag, boolean multiple) throws IOException {}

basicAck()方法需要两个参数:

deliveryTag(唯一标识 ID):当一个消费者向RabbitMQ 注册后,会建立起一个 Channel ,向消费者推送消息,这个方法携带了一个deliveryTag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag的范围仅限于当前 Channelmultiple:为了减少网络流量,手动确认可以被批处理,当该参数为true时,则可以一次性确认 deliveryTag小于等于传入值的所有消息

如果消息处理失败,调用channel.basicNack()方法拒绝签收:

 public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {}

basicNack()方法需要三个参数:

deliveryTag:同basicAckmultiple:同basicAckrequeue:重回队列。如果设置为true,则消息重新回到queue,服务端会重新发送该消息给消费端

消费者代码如下:

@Componentpublic class RabbitConsumer {    @RabbitListener(queues = {"bootQueue"})    public void receiveMessage(Message message, Channel channel) throws IOException {        // 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel        long deliveryTag = message.getMessageProperties().getDeliveryTag();        try {            System.out.println("收到消息===" + new String(message.getBody()));            System.out.println("处理业务逻辑");            // 发生异常            // int i = 5 / 0;            // 处理完成后,确认            channel.basicAck(deliveryTag, true);        } catch (Exception e) {            // 发生异常,拒绝签收            e.printStackTrace();            channel.basicNack(deliveryTag, true, true);        }    }}

3. 测试

没有异常时,消息被成功消费:
在这里插入图片描述
打开异常代码注释,运行程序,此时控制台显示有一个消息未被确认状态:
在这里插入图片描述
并且程序一直在死循环接收=》拒绝签收=》返回队列=》接收=》
在这里插入图片描述
死循环问题是因为,在basicNack方法中我们设置了重回队列,这样会有问题,一般需要设置为不重回到队列:

channel.basicNack(deliveryTag, true, false);

点击全文阅读


本文链接:http://m.zhangshiyu.com/post/48522.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1