RabbitMQ延迟队列

lijunyi2022-08-05中间件RabbitMQ

延迟队列介绍

延迟队列概念:

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列使用场景:

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

TTL的两种设置

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

队列设置 TTL

在创建队列的时候设置队列的 x-message-ttl 属性

Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl",5000);
return QueueBuilder.durable("QA").withArguments(args).build(); // QA 队列的最大存活时间位 5000 毫秒

消息设置 TTL

针对每条消息设置 TTL

rabbitTemplate.converAndSend("X","XC",message,correlationData -> {
    correlationData.getMessageProperties().setExpiration("5000");
});

两者区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,具体看下方案例。

另外还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

延迟队列示例

代码已经上传到 Giteeopen in new window

队列TTL

代码架构图

创建两个队列 QA 和 QB,两个队列的 TTL 分别设置为 10S 和 40S,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

rabbitmq

TtlQueueConfig配置

/**
 * @version 1.0.0
 * @className: TtlQueueConfig
 * @description: TTL队列配置
 * @author: LiJunYi
 * @create: 2022/8/4 12:42
 */
@Configuration
public class TtlQueueConfig
{
    /**普通交换机*/
    private static final String X_EXCHANGE = "X";
    /**死信交换机*/
    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    /**普通队列*/
    private static final String QUEUE_A ="QA";
    private static final String QUEUE_B ="QB";
    /**
     * 死信队列
     */
    private static final String DEAD_LETTER_QUEUE = "QD";


    /**
     * 声明XExchange  别名
     * @return {@link DirectExchange}
     */
    @Bean("xExchange")
    public DirectExchange xDirectExchange(){
        return new DirectExchange(X_EXCHANGE);
    }


    /**
     * 声明XExchange  别名
     *
     * @return {@link DirectExchange}
     */
    @Bean("yExchange")
    public DirectExchange yDirectExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }


    /**
     * 声明queueA 设置设置过期时间 10s
     *
     * @return {@link Queue}
     */
    @Bean("queueA")
    public Queue queueA(){
        HashMap<String, Object> map = new HashMap<>(4);
        // 设置死信队列的交换机
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信队列的routingKey
        map.put("x-dead-letter-routing-key", "YD");
        // 设置过期时间
        map.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }


    /**
     * 声明queueB 设置过期时间40s
     *
     * @return {@link Queue}
     */
    @Bean("queueB")
    public Queue queueB(){
        HashMap<String, Object> map = new HashMap<>(4);
        // 设置死信队列的交换机
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信队列的routingKey
        map.put("x-dead-letter-routing-key", "YD");
        // 设置过期时间
        map.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
    }

    /**声明死信队列*/
    @Bean("queueD")
    public Queue queueD()
    {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明队列 QA 绑定 X 交换机
     *
     * @param xExchange x交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") Exchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA").noargs();
    }

    /**
     * 声明队列 QB 绑定 X 交换机
     *
     * @param queueB    queueb
     * @param xExchange x交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") Exchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB").noargs();
    }

    /**
     * 声明队列 QD 绑定 Y 交换机
     *
     * @param queueD    排队
     * @param yExchange y交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") Exchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD").noargs();
    }
}

消费者

/**
 * @version 1.0.0
 * @className: DeadLetterConsumer
 * @description: 监听延迟队列 QD的消息
 * @see com.example.delayqueue.config.TtlQueueConfig#DEAD_LETTER_QUEUE
 * @author: LiJunYi
 * @create: 2022/8/4 12:58
 */
@Component
@Slf4j
public class DeadLetterConsumer
{
    /**
     * 收到了
     *
     * @param msg     信息
     * @param channel 通道
     * @throws Exception 异常
     */
    @RabbitListener(queues = {"QD"})
    public void receiveD(Message msg, Channel channel) throws Exception{
        String result = new String(msg.getBody());
        log.info("当前时间:{},收到死信队列的消息: {}",new Date(),result);
    }
}

生产者

/**
 * @version 1.0.0
 * @className: MsgController
 * @description:
 * @author: LiJunYi
 * @create: 2022/8/4 12:53
 */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class MsgController
{
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送信息
     *
     * @param msg 信息
     */
    @GetMapping("/sendMsg/{msg}")
    public void sendMsg(@PathVariable("msg")String msg){
        // 后者会给占位符赋值,实现动态传递
        log.info("当前时间:{},发送一条消息给两个TTL队列",new Date(),msg);
        rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列:" + msg);
        rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列:" + msg);
    }
}

然后通过浏览器发送一条请求:http://localhost:8081/ttl/sendMsg/rabbitmq测试

rabbitmq

延迟队列优化

在上面示例中,我们是在TtlQueueConfig中配置的过期时间,但是实际上过期时间是多变的,如果是多个的话,按照我们上面的配置,那得要配置多个,显然这是不可理的。

我们新增了一个队列 QC,该队列不设置 TTL 时间,根据前端的请求确定 TTL 时间,绑定关系如下:

rabbitmq

修改TtlQueueConfig配置

/**
 * @version 1.0.0
 * @className: TtlQueueConfig
 * @description: TTL队列配置
 * @author: LiJunYi
 * @create: 2022/8/4 12:42
 */
@Configuration
public class TtlQueueConfig
{
    /**普通交换机*/
    private static final String X_EXCHANGE = "X";
    /**死信交换机*/
    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    /**普通队列*/
    private static final String QUEUE_A ="QA";
    private static final String QUEUE_B ="QB";
    /**
     * 死信队列
     */
    private static final String DEAD_LETTER_QUEUE = "QD";

    /**
     * 新增一个普通队列,该队列的TTL通过前端控制
     */
    private static final String QUEUE_C ="QC";

    /**
     * 声明XExchange  别名
     * @return {@link DirectExchange}
     */
    @Bean("xExchange")
    public DirectExchange xDirectExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 声明XExchange  别名
     *
     * @return {@link DirectExchange}
     */
    @Bean("yExchange")
    public DirectExchange yDirectExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明queueA 设置设置过期时间 10s
     *
     * @return {@link Queue}
     */
    @Bean("queueA")
    public Queue queueA(){
        HashMap<String, Object> map = new HashMap<>(4);
        // 设置死信队列的交换机
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信队列的routingKey
        map.put("x-dead-letter-routing-key", "YD");
        // 设置过期时间
        map.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }

    /**
     * 声明queueB 设置过期时间40s
     *
     * @return {@link Queue}
     */
    @Bean("queueB")
    public Queue queueB(){
        HashMap<String, Object> map = new HashMap<>(4);
        // 设置死信队列的交换机
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信队列的routingKey
        map.put("x-dead-letter-routing-key", "YD");
        // 设置过期时间
        map.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
    }

    /**
     * 声明queueC:用来存放发送者自定义的延迟队列 因此取消定义队列过期时间
     *
     * @return {@link Queue}
     */
    @Bean("queueC")
    public Queue queueC(){
        HashMap<String, Object> map = new HashMap<>(4);
        // 设置死信队列的交换机
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 设置死信队列的routingKey
        map.put("x-dead-letter-routing-key", "YD");

        return QueueBuilder.durable(QUEUE_C).withArguments(map).build();
    }

    /**声明死信队列*/
    @Bean("queueD")
    public Queue queueD()
    {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明队列 QA 绑定 X 交换机
     *
     * @param xExchange x交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") Exchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA").noargs();
    }

    /**
     * 声明队列 QB 绑定 X 交换机
     *
     * @param queueB    queueb
     * @param xExchange x交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") Exchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB").noargs();
    }

    /**
     * 声明队列 QC 绑定 X 交换机
     *
     * @param queueC    queuec
     * @param xExchange x交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueCBindingY(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") Exchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC").noargs();
    }

    /**
     * 声明队列 QD 绑定 Y 交换机
     *
     * @param queueD    排队
     * @param yExchange y交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") Exchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD").noargs();
    }
}

生产者修改

/**
 * @version 1.0.0
 * @className: MsgController
 * @description:
 * @author: LiJunYi
 * @create: 2022/8/4 12:53
 */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class MsgController
{
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送信息
     *
     * @param msg 信息
     */
    @GetMapping("/sendMsg/{msg}")
    public void sendMsg(@PathVariable("msg")String msg){
        // 后者会给占位符赋值,实现动态传递
        log.info("当前时间:{},发送一条消息给两个TTL队列",new Date(),msg);
        rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列:" + msg);
        rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列:" + msg);
    }

    /**
     * 开始发定义有过期时间的消息
     *
     * @param msg     信息
     * @param ttlTime ttl时间
     */
    @GetMapping("/sendMsg/{msg}/{ttlTime}")
    public void sendExpireMsg(@PathVariable("msg")String msg,@PathVariable("ttlTime")String ttlTime){
        log.info("当前时间:{},发送一条时长是{}ms的TTL消息给QC队列,内容是{}",new Date(),ttlTime,msg);
        rabbitTemplate.convertAndSend("X","XC", "ttl消息为"+ ttlTime +"的时间,内容为:" + msg,message ->{
            // 生产者设置过期时间
            message.getMessageProperties().setExpiration(ttlTime);
            return message;
        });
    }
}

同样的我们发送一条请求:http://localhost:8081/ttl/sendMsg/rabbitmq测试/2000,观察控制台输出

rabbitmq

Rabbitmq插件实现延迟队列

  • 是为了实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡

安装延时队列插件

  • 点击进入GitHub下载open in new window,放置到 RabbitMQ 的插件目录。

  • linux下,将插件放置到RabbitMQ的插件目录后,执行命令使其生效:

# 安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启服务
systemctl restart rabbitmq-server
  • windows下

1、把下载下来的文件拷贝到RabbitMQ安装目录下的 plugins 目录。

2、进入RabbitMQ安装目录下的 sbin目录,在cmd窗口下执行如下命令使插件生效 如果后面发现在未失效请重启服务再查看

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 打开rabbitmq控制台,点击Exchanges,如果Add a new exchange功能里的Type下拉框里出现x-delayed-message类型,则说明安装成功,可以发布延时消息了。
rabbitmq

基于插件实现延迟队列

新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

rabbitmq

新增DelayedQueueConfig配置

/**
 * @version 1.0.0
 * @className: DelayedQueueConfig
 * @description: 基于延迟插件实现的延迟队列
 * @author: LiJunYi
 * @create: 2022/8/4 13:30
 */
@Configuration
public class DelayedQueueConfig
{

    /**
     * 交换机
     */
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    /**
     * 延迟队列名称
     */
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    /**
     * 延迟队列路由key
     */
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * 声明队列
     *
     * @return {@link Queue}
     */
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }


    /**
     * 声明交换机 基于插件的
     *
     * @return {@link CustomExchange}
     */
    @Bean
    public CustomExchange delayedExchange(){
        HashMap<String, Object> map = new HashMap<>(2);
        // 固定参数
        map.put("x-delayed-type", "direct");
        /**
         * 1、交换机名称
         * 2、交换机类型
         * 3、是否持久化
         * 4、是否自动删除
         * 5、参数
         * */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, map);
    }


    /**
     * 绑定交换机和队列
     *
     * @param queue    队列
     * @param exchange 交换
     * @return {@link Binding}
     */
    @Bean
    public Binding queueBindingExchange(@Qualifier("delayedQueue") Queue queue,
                                        @Qualifier("delayedExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

消费者设置

/**
 * @version 1.0.0
 * @className: DelayedConsumer
 * @description: 延迟队列消费者
 * @author: LiJunYi
 * @create: 2022/8/4 13:34
 */
@Component
@Slf4j
public class DelayedConsumer
{
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间为{},收到延迟消息为{}",new Date(), msg);
    }
}

生产者设置

/**
 * @version 1.0.0
 * @className: MsgController
 * @description:
 * @author: LiJunYi
 * @create: 2022/8/4 12:53
 */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class MsgController
{
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送信息
     *
     * @param msg 信息
     */
    @GetMapping("/sendMsg/{msg}")
    public void sendMsg(@PathVariable("msg")String msg){
        // 后者会给占位符赋值,实现动态传递
        log.info("当前时间:{},发送一条消息给两个TTL队列",new Date(),msg);
        rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列:" + msg);
        rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列:" + msg);
    }

    /**
     * 开始发定义有过期时间的消息
     *
     * @param msg     信息
     * @param ttlTime ttl时间
     */
    @GetMapping("/sendMsg/{msg}/{ttlTime}")
    public void sendExpireMsg(@PathVariable("msg")String msg,@PathVariable("ttlTime")String ttlTime){
        log.info("当前时间:{},发送一条时长是{}ms的TTL消息给QC队列,内容是{}",new Date(),ttlTime,msg);
        rabbitTemplate.convertAndSend("X","XC", "ttl消息为"+ ttlTime +"的时间,内容为:" + msg,message ->{
            // 生产者设置过期时间
            message.getMessageProperties().setExpiration(ttlTime);
            return message;
        });
    }


    /**
     * 基于插件的延时队列
     *
     * @param msg         信息
     * @param delayedTime 延迟时间
     */
    @GetMapping("/sendDelayedMsg/{msg}/{delayedTime}")
    public void sendDelayedMsg(@PathVariable("msg")String msg,@PathVariable("delayedTime")Integer delayedTime){
        log.info("当前时间:{},发送一条时长是{}ms的延时消息给QC队列,内容是{}",new Date(),delayedTime,msg);
        rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",
                "延时消息的时间是"+ delayedTime +",内容为:" + msg, message ->{
                    // 生产者设置延时时间,和上面的有区别
                    message.getMessageProperties().setDelay(delayedTime);
                    return message;
                });
    }
}

访问:http://localhost:8081/ttl/sendDelayedMsg/teset-cha-jiain-yanchi/2000 查看控制台情况

rabbitmq

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

Last Updated 2024/5/24 16:21:58