Redis发布订阅模式
Redis发布订阅概述
Redis
发布订阅 (publish/subscribe
) 是一种消息通信模式:发送者pub
发送消息,订阅者 (sub
接收消息。
PUBLISH
命令向通道发送信息,此客户端称为publisher发布者
;SUBSCRIBE
向命令通道订阅信息,此客户端称为subscriber订阅者
;Redis
中 发布订阅模块的名字叫着PubSub
,也就是PublisherSubscriber
;- 一个发布者向一个通道发送消息,订阅者可以向多个通道订阅消息;当发布者向通道发布消息后,如果有订阅者订阅该通道,订阅者就会收到消息。
命令 | 描述 |
---|---|
Redis Unsubscribe 命令 | 指退订给定的频道。 |
Redis Subscribe 命令 | 订阅给定的一个或多个频道的信息。 |
Redis Pubsub 命令 | 查看订阅与发布系统状态。 |
Redis Punsubscribe 命令 | 退订所有给定模式的频道。 |
Redis Publish 命令 | 将信息发送到指定的频道。 |
Redis Psubscribe 命令 | 订阅一个或多个符合给定模式的频道。 |
命令演示
订阅端
127.0.0.1:6379> SUBSCRIBE channel-1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel-1"
3) (integer) 1
1) "message" # 消息
2) "channel-1" # 频道
3) "hello,redis" # 消息具体内容
发送端
127.0.0.1:6379> PUBLISH channel-1 "hello,redis"
(integer) 1
127.0.0.1:6379>
Redis发布订阅模式与消息中间件的对比
redis
- 轻量级,低延迟,高并发,低可靠性;
rabbitmq
- 重量级,高可靠,异步,不保证实时;
SpringBoot整合Redis实现发布订阅模式
订阅者接受消息的接口
/**
* description: 订阅者接受消息的通用接口
**/
@Component
public interface RedisMsg {
/**
* Redis订阅者接受消息的接口
*
* @param message 订阅的消息
*/
void receiveMessage(String message);
}
实际订阅者接口实现
public class RedisChannelSub implements RedisMsg {
@Override
public void receiveMessage(String message) {
//注意通道调用的方法名要和 RedisPubSubConfig 的listenerAdapter的 MessageListenerAdapter 参数2相同
System.out.println("这是RedisChannelSub" + "-----" + message);
}
}
public class RedisPmpSub implements RedisMsg {
/**
* 接收消息的方法
*
* @param message 订阅消息
*/
@Override
public void receiveMessage(String message) {
//注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
System.out.println("这是RedisPmpSub---" + message);
}
}
RedisPubSubConfig配置
/**
* description: Redis 发布订阅的配置
**/
@Configuration
public class RedisPubSubConfig {
/**
* Redis消息监听器容器
*
* @param connectionFactory /
* @return /
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫pmp和channel 的通道,多通道
container.addMessageListener(listenerAdapter(new RedisPmpSub()), new PatternTopic("pmp"));
container.addMessageListener(listenerAdapter(new RedisChannelSub()), new PatternTopic("channel"));
//这个container 可以添加多个 messageListener
return container;
}
/**
* 配置消息接收处理类
*
* @param redisMsg 自定义消息接收类
* @return Redis的监听适配器
*/
@Bean
@Scope("prototype")
MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
//也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
//注意2个通道调用的方法都要为receiveMessage
return new MessageListenerAdapter(redisMsg, "receiveMessage");
}
}
定义发布者
@EnableScheduling
@Component
public class TestScheduleRedisPublishController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 向redis消息队列index通道发布消息
*/
@Scheduled(fixedRate = 2000)
public void sendMessage() {
stringRedisTemplate.convertAndSend("pmp", String.valueOf(Math.random()));
stringRedisTemplate.convertAndSend("channel", String.valueOf(Math.random()));
}
}