Redis发布订阅模式

lijunyi2022-08-12Redis缓存框架

Redis发布订阅概述

Redis发布订阅 (publish/subscribe) 是一种消息通信模式:发送者pub发送消息,订阅者 (sub接收消息。

  • PUBLISH 命令向通道发送信息,此客户端称为 publisher发布者
  • SUBSCRIBE 向命令通道订阅信息,此客户端称为 subscriber订阅者
  • Redis 中 发布订阅模块的名字叫着 PubSub,也就是 PublisherSubscriber
  • 一个发布者向一个通道发送消息,订阅者可以向多个通道订阅消息;当发布者向通道发布消息后,如果有订阅者订阅该通道,订阅者就会收到消息。
命令描述
Redis Unsubscribe 命令open in new window指退订给定的频道。
Redis Subscribe 命令open in new window订阅给定的一个或多个频道的信息。
Redis Pubsub 命令open in new window查看订阅与发布系统状态。
Redis Punsubscribe 命令open in new window退订所有给定模式的频道。
Redis Publish 命令open in new window将信息发送到指定的频道。
Redis Psubscribe 命令open in new window订阅一个或多个符合给定模式的频道。

命令演示

订阅端

127.0.0.1:6379> SUBSCRIBE channel-1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel-1"
3) (integer) 1
1) "message"         # 消息
2) "channel-1"       # 频道
3) "hello,redis"     # 消息具体内容

发送端

127.0.0.1:6379> PUBLISH channel-1 "hello,redis"
(integer) 1
127.0.0.1:6379> 

Redis发布订阅模式与消息中间件的对比

redis

  • 轻量级,低延迟,高并发,低可靠性;

rabbitmq

  • 重量级,高可靠,异步,不保证实时;

SpringBoot整合Redis实现发布订阅模式

订阅者接受消息的接口

/**
 * description: 订阅者接受消息的通用接口
 **/
@Component
public interface RedisMsg {

    /**
     * Redis订阅者接受消息的接口
     *
     * @param message 订阅的消息
     */
    void receiveMessage(String message);
}

实际订阅者接口实现

public class RedisChannelSub implements RedisMsg {

    @Override
    public void receiveMessage(String message) {
        //注意通道调用的方法名要和 RedisPubSubConfig 的listenerAdapter的 MessageListenerAdapter 参数2相同
        System.out.println("这是RedisChannelSub" + "-----" + message);
    }
}

public class RedisPmpSub implements RedisMsg {

    /**
     * 接收消息的方法
     *
     * @param message 订阅消息
     */
    @Override
    public void receiveMessage(String message) {
        //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
        System.out.println("这是RedisPmpSub---" + message);
    }
}

RedisPubSubConfig配置

/**
 * description: Redis 发布订阅的配置
 **/
@Configuration
public class RedisPubSubConfig {

    /**
     * Redis消息监听器容器
     *
     * @param connectionFactory /
     * @return /
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个叫pmp和channel 的通道,多通道
        container.addMessageListener(listenerAdapter(new RedisPmpSub()), new PatternTopic("pmp"));
        container.addMessageListener(listenerAdapter(new RedisChannelSub()), new PatternTopic("channel"));
        //这个container 可以添加多个 messageListener
        return container;
    }

    /**
     * 配置消息接收处理类
     *
     * @param redisMsg 自定义消息接收类
     * @return Redis的监听适配器
     */
    @Bean
    @Scope("prototype")
    MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        //注意2个通道调用的方法都要为receiveMessage
        return new MessageListenerAdapter(redisMsg, "receiveMessage");
    }
}

定义发布者

@EnableScheduling
@Component
public class TestScheduleRedisPublishController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 向redis消息队列index通道发布消息
     */
    @Scheduled(fixedRate = 2000)
    public void sendMessage() {
        stringRedisTemplate.convertAndSend("pmp", String.valueOf(Math.random()));
        stringRedisTemplate.convertAndSend("channel", String.valueOf(Math.random()));
    }
}
Last Updated 6/14/2024, 3:05:31 AM