RabbitMQ发布确认高级
前述
首先发布消息后进行备份在缓存里,如果消息成功发布确认到交换机,则从缓存里删除该消息,如果没有成功发布,则设置一个定时任务,重新从缓存里获取消息发布到交换机,直到成功发布到交换机。
确认机制图例:
实战
- 笔记代码已经上传到 Gitee
一个交换机:confirm.exchange
,一个队列:confirm.queue
,一个消费者:confirm.consumer
其中交换机类型时 direct
,与队列关联的 routingKey
是 confirm-key
代码架构图:
修改yaml配置
server:
port: 8081
spring:
application:
name: rabbitmq-basis
rabbitmq:
host: localhost
port: 5672
virtual-host: /ems
username: guest
password: guest
publisher-confirm-type: correlated
# publisher-confirm-type
# NONE 值是禁用发布确认模式,是默认值
# CORRELATED 值是发布消息成功到交换器后会触发回调方法
# SIMPLE 值经测试有两种效果
# 其一效果和 CORRELATED 值一样会触发回调方法
# 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果
## 根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
添加ConfirmConfig配置
/**
* @version 1.0.0
* @className: ConfirmConfig
* @description: 发布确认兜底方案
* @author: LiJunYi
* @create: 2022/8/4 14:54
*/
public class ConfirmConfig
{
private static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";
private static final String CONFIRM_QUEUE_NAME = "confirm-queue";
private static final String CONFIRM_ROUTING_KEY = "confirm-key";
/**
* 声明交换机
*
* @return {@link DirectExchange}
*/
@Bean("confirmExchange")
public DirectExchange directExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
/**
* 声明队列
*
* @return {@link Queue}
*/
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* 队列确认绑定交换
*
* @param queue 队列
* @param exchange 交换
* @return {@link Binding}
*/
@Bean
public Binding queueConfirmBindingExchange(@Qualifier("confirmQueue")Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
}
}
消息生产者
/**
* @version 1.0.0
* @className: ConfirmController
* @description: 消息发送
* @author: LiJunYi
* @create: 2022/8/4 15:01
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController
{
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendConfirm/{msg}")
public void sendConfirmMessage(@PathVariable("msg")String msg)
{
// 声明回调的形参
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg,correlationData);
log.info("发送信息为:" + msg);
//指定消息 id 为 2
CorrelationData correlationData2 = new CorrelationData("2");
String CONFIRM_ROUTING_KEY = "key2";
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
CONFIRM_ROUTING_KEY,msg+"key2",correlationData2);
log.info("发送信息为:{}",msg+"key2");
}
}
消息消费者
/**
* @version 1.0.0
* @className: ConfirmConsumer
* @description: 消息的消费者,监听 confirm-queue 队列
* @author: LiJunYi
* @create: 2022/8/4 15:01
*/
@Component
@Slf4j
public class ConfirmConsumer
{
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message)
{
log.info("接收到的消息为: " + new String(message.getBody()));
}
}
消息生产者发布消息后的回调接口
只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm
方法
/**
* @version 1.0.0
* @className: MyCallBack
* @description: 消息生产者发布消息后的回调接口
* @author: LiJunYi
* @create: 2022/8/4 15:05
*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback
{
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 该注解会在其他注解执行完毕之后,进行一个属性的注入,必须将该类注入到rabbitTemplate的内部类中,内部类就是这个ConfirmCallback
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机不管是否收到消息的一个回调方法
* 1. 发消息 交换机接收到了 回调
* @param correlationData 包含了消息的ID和其他数据信息 这个需要在发送方创建,否则没有
* @param ack 返回的一个交换机确认状态 true 为确认 false 为未确认
* @param cause 未确认的一个原因,如果ack为true的话,此值为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
{
String id = correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("消息发送成功,id 是{} ",id);
}else{
log.info("消息发送失败,原因 是{} id 为{}",cause,id);
}
}
}
模拟发送消息
浏览器访问接口:http://localhost:8081/confirm/sendConfirm/test20220804,观察控制台情况
可以看到,发送了两条消息,第一条消息的 RoutingKey
为 confirm-key
,第二条消息的 RoutingKey
为 key2
,两条消息都成功被交换机接收,也收到了交换机的确认回调.
但消费者只收到了一条消息,因为第二条消息的 RoutingKey
与队列的 BindingKey
不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。
丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败。
回退消息
介绍
获取回退的消息,首先在配置文件开启该功能,然后需要自定义类实现 RabbitTemplate.ReturnsCallback
接口,并且初始化时,使用该自定义类作为回退消息的处理类,同时开启 Mandatory
,设置为 true
配置类文件开启
# 新版
spring:
rabbitmq:
template:
mandatory: true
# 旧版
spring:
rabbitmq:
mandatory: true
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
实战
修改配置文件
server:
port: 8081
spring:
application:
name: rabbitmq-puback
rabbitmq:
host: localhost
port: 5672
virtual-host: /ems
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
修改回调接口
- 旧版本实现
RabbitTemplate.ReturnCallback
接口 - 最新版实现
RabbitTemplate.ReturnsCallback
接口
/**
* @version 1.0.0
* @className: MyCallBack
* @description: 消息生产者发布消息后的回调接口
* @author: LiJunYi
* @create: 2022/8/4 15:05
*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback
{
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 该注解会在其他注解执行完毕之后,进行一个属性的注入,必须将该类注入到rabbitTemplate的内部类中,内部类就是这个ConfirmCallback
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机不管是否收到消息的一个回调方法
* 1. 发消息 交换机接收到了 回调
* @param correlationData 包含了消息的ID和其他数据信息 这个需要在发送方创建,否则没有
* @param ack 返回的一个交换机确认状态 true 为确认 false 为未确认
* @param cause 未确认的一个原因,如果ack为true的话,此值为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
{
String id = correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("消息发送成功,id 是{} ",id);
}else{
log.info("消息发送失败,原因 是{} id 为{}",cause,id);
}
}
/**
* 可以在消息传递过程中,如果交换机遇到不可路由的情况,会将消息返回给生产者
* returned#message 消息
* returned#replyCode 回复状态码
* returned#replyText 退回原因
* returned#exchange 交换机
* returned#routingKey 路由Key
* @param returned 返回
*/
@Override
public void returnedMessage(ReturnedMessage returned)
{
log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",
new String(returned.getMessage().getBody()),returned.getExchange(),
returned.getReplyText(),returned.getRoutingKey());
}
}
模拟发送消息
浏览器访问接口:http://localhost:8081/confirm/sendConfirm/test20220804,观察控制台情况
备份交换机
介绍
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。
什么是备份交换机呢?
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
实战
需要一个备份交换机 backup.exchange
,类型为 fanout
,该交换机发送消息到队列 backup.queue
和 warning.queue
代码架构图:
修改ConfirmConfig配置
/**
* @version 1.0.0
* @className: ConfirmConfig
* @description: 发布确认兜底方案
* @author: LiJunYi
* @create: 2022/8/4 14:54
*/
@Configuration
public class ConfirmConfig
{
public static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm-queue";
public static final String CONFIRM_ROUTING_KEY = "confirm-key";
/**
* 备份交换机
*/
private static final String BACKUP_EXCHANGE_NAME = "backup-exchange";
/**
* 备份队列名称
*/
public static final String BACKUP_QUEUE_NAME = "backup-queue";
/**
* 警告队列名称
*/
public static final String WARNING_QUEUE_NAME = "warning-queue";
/**
* 声明交换机
*
* @return {@link DirectExchange}
*/
@Bean("confirmExchange")
public DirectExchange directExchange(){
// 确认交换机配置备份交换机 以确保宕机后将消息转发到备份交换机
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
}
/**
* 声明队列
*
* @return {@link Queue}
*/
@Bean("confirmQueue")
public Queue confirmQueue(){
HashMap<String, Object> map = new HashMap<>(8);
return new Queue(CONFIRM_QUEUE_NAME,false,false,false,map);
}
/**
* 队列确认绑定交换
*
* @param queue 队列
* @param exchange 交换
* @return {@link Binding}
*/
@Bean
public Binding queueConfirmBindingExchange(@Qualifier("confirmQueue")Queue queue,
@Qualifier("confirmExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
}
/**
* 备份交换机的创建
*
* @return {@link FanoutExchange}
*/
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
/**
* 备份队列
*
* @return {@link Queue}
*/
@Bean("backupQueue")
public Queue backupQueue(){
HashMap<String, Object> map = new HashMap<>(8);
return new Queue(BACKUP_QUEUE_NAME,false,false,false,map);
}
/**
* 警告队列
*
* @return {@link Queue}
*/
@Bean("warningQueue")
public Queue warningQueue(){
HashMap<String, Object> map = new HashMap<>(8);
return new Queue(WARNING_QUEUE_NAME,false,false,false,map);
}
/**
* 备份队列绑定交换机
*
* @param queue 队列
* @param exchange 交换
* @return {@link Binding}
*/
@Bean
public Binding backupConfirmBindingExchange(@Qualifier("backupQueue")Queue queue,
@Qualifier("backupExchange")FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
/**
* 警告队列绑定交换机
*
* @param queue 队列
* @param exchange 交换
* @return {@link Binding}
*/
@Bean
public Binding warningConfirmBindingExchange(@Qualifier("warningQueue")Queue queue,
@Qualifier("backupExchange")FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
新增报警消费者
/**
* @version 1.0.0
* @className: WarningConsumer
* @description:
* @author: LiJunYi
* @create: 2022/8/4 15:41
*/
@Component
@Slf4j
public class WarningConsumer
{
/**
* 收到警告信息
*
* @param message 消息
*/
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message){
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}",msg);
}
}
如果启动 confirm.exchange
交换机有关错误,那么及就先到控制台删除confirm.exchange
交换机与confirm.queue
队列
模拟发送消息
浏览器访问接口:http://localhost:8081/confirm/sendConfirm/test20220804,观察控制台情况
Mandatory
参数与备份交换机可以一起使用的时候,如果两者同时开启,究竟谁优先级高?
经过上面代码示例结果,答案是备份交换机优先级高。