引言
RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker
中了呢?Broker 又怎么知道消息到底有没有被消费者消费?
如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。
01 RabbitMQ的消息确认流程
从图中可以看出:
消息确认机制分为生产者确认和消费者确认
- ConfirmCallback 生产者
- ReturnCallback 生产者
- ACK 消费者
02 生产者确认
- 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
- 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回
03 消费者确认
- 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送
04 代码实现
4.1 生产者确认
重点在于生产者重写下面两个方法
- rabbitMQTemplate.setConfirmCallback
- rabbitMQTemplate.setReturnCallback
1.开启生产者消息确认
代码语言:txt复制spring:代码语言:txt复制 rabbitmq:代码语言:txt复制 host: localhost代码语言:txt复制 port: 5672代码语言:txt复制 virtual-host: /代码语言:txt复制 username: root代码语言:txt复制 password: root代码语言:txt复制 # 开启两个模式的生产者消息确认代码语言:txt复制 publisher-confirm-type: simple代码语言:txt复制 publisher-returns: true2.声明交换机、队列,绑定交换机和队列
代码语言:txt复制@Configuration代码语言:txt复制public class RabbitMQConfig {代码语言:txt复制 private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";代码语言:txt复制 private static final String SB_TOPIC_QUEUE="sb_topic_queue1";代码语言:txt复制 // 注入交换机 topic类型代码语言:txt复制 @Bean("topicExchange")代码语言:txt复制 public Exchange topicExchange(){代码语言:txt复制 return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)代码语言:txt复制 .autoDelete().build();代码语言:txt复制 }代码语言:txt复制 // 声明队列代码语言:txt复制 @Bean代码语言:txt复制 public Queue queue1(){代码语言:txt复制 return QueueBuilder.durable(SB_TOPIC_QUEUE).build();代码语言:txt复制 }代码语言:txt复制 // 绑定队列和交换机代码语言:txt复制 @Bean代码语言:txt复制 public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){代码语言:txt复制 return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();代码语言:txt复制 }3.创建消费者
代码语言:txt复制@Component代码语言:txt复制@RabbitListener(queues = "sb_topic_queue1")代码语言:txt复制public class Consumer {代码语言:txt复制 @RabbitHandler代码语言:txt复制 public void testPublishConfirm(String msg) {代码语言:txt复制 System.out.println("收到的信息:" msg);代码语言:txt复制 }代码语言:txt复制}4.创建生产者
创建生产者发送消息到消息队列,模拟两种异常情况
代码语言:txt复制@SpringBootTest代码语言:txt复制class RabiitmqSpringbootApplicationTests {代码语言:txt复制 @Autowired代码语言:txt复制 RabbitTemplate template;代码语言:txt复制 @Test代码语言:txt复制 void testConfirmTrue() {代码语言:txt复制 // 设置confirm回调函数代码语言:txt复制 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {代码语言:txt复制 @Override代码语言:txt复制 public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {代码语言:txt复制 if (b) System.out.println("消息发送成功");代码语言:txt复制 else System.out.println("消息发送失败");代码语言:txt复制 }代码语言:txt复制 });代码语言:txt复制 // 模拟生产者发送信息--正常情况代码语言:txt复制 template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****");代码语言:txt复制 }代码语言:txt复制 @Test代码语言:txt复制 void testConfirmFalse() {代码语言:txt复制 // 设置confirm回调函数代码语言:txt复制 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {代码语言:txt复制 @Override代码语言:txt复制 public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {代码语言:txt复制 if (b) System.out.println("消息发送成功");代码语言:txt复制 else System.out.println("消息发送失败");代码语言:txt复制 }代码语言:txt复制 });代码语言:txt复制 // 模拟生产者发送信息代码语言:txt复制 // 不存在的交换机--异常情况代码语言:txt复制 template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****");代码语言:txt复制 }代码语言:txt复制 @Test代码语言:txt复制 void testReturnFalse() {代码语言:txt复制 // 设置return回调函数代码语言:txt复制 template.setReturnCallback(new RabbitTemplate.ReturnCallback() {代码语言:txt复制 @Override代码语言:txt复制 public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {代码语言:txt复制 System.out.println(message.toString());代码语言:txt复制 System.out.println(s "*********");代码语言:txt复制 }代码语言:txt复制 });代码语言:txt复制 template.setMandatory(true);代码语言:txt复制 // 模拟生产者发送信息代码语言:txt复制 // 正确的交换机 错误的routekey -- 异常情况代码语言:txt复制 template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");代码语言:txt复制 }4.2 消费者确认
重点在于消费者的下面两个方法
- channel.basicAck 消费者签收
- channel.basicNAck 消费者拒绝签收
1.开启消费者确认模式
代码语言:txt复制spring:代码语言:txt复制 rabbitmq:代码语言:txt复制 host: localhost代码语言:txt复制 port: 5672代码语言:txt复制 virtual-host: /代码语言:txt复制 username: root代码语言:txt复制 password: root代码语言:txt复制# 设置消费端手动签收代码语言:txt复制 listener:代码语言:txt复制 direct:代码语言:txt复制 acknowledge-mode: manual代码语言:txt复制 simple:代码语言:txt复制 acknowledge-mode: manual2.创建消费者
代码语言:txt复制/**代码语言:txt复制 * 注入消费者--手动签到
*/
@Component
@RabbitListener(queues = "sb_topic_queue1")
public class Consumer2 {代码语言:txt复制 @RabbitHandler代码语言:txt复制 public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {代码语言:txt复制 // 消费端设置手动签收代码代码语言:txt复制 try {代码语言:txt复制 System.out.println(msg);代码语言:txt复制 // 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息代码语言:txt复制 // 是哟了那个channel的方法代码语言:txt复制 // 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收代码语言:txt复制 // int i=2/0; 模拟异常代码语言:txt复制 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);代码语言:txt复制 System.out.println("消费者签收了该信息,服务器你可以删了");代码语言:txt复制 }catch (Exception e){代码语言:txt复制 // 异常拒绝签收,让mq重发此信息代码语言:txt复制 System.out.println("该信息丢了,给我重发");代码语言:txt复制 channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);代码语言:txt复制 // 该信息丢了,但是不需要你重发代码语言:txt复制 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);代码语言:txt复制 }代码语言:txt复制 }代码语言:txt复制}3.创建生产者
代码语言:txt复制@SpringBootTest代码语言:txt复制class RabiitmqSpringbootApplicationTests {代码语言:txt复制 @Autowired代码语言:txt复制 RabbitTemplate template;代码语言:txt复制 @Test代码语言:txt复制 void testConsumerAck() {代码语言:txt复制 template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");代码语言:txt复制 }代码语言:txt复制}


