RabbitMQ的死信队列和延迟队列

死信队列(DLX)和延迟队列是构建健壮消息系统的关键机制。本文深入讲解了 RabbitMQ 中死信队列的触发场景(如消息过期、拒收、队列满等),以及如何利用插件或 TTL+死信机制实现延迟队列。通过配置示例帮助你掌握在生产环境中正确使用这两类队列,以实现消息重试、延迟处理、异常监控等常见需求。
single

死信队列

在 RabbitMQ 中,消息会变成死信有以下几种情况:

  1. 消息被拒绝(rejected),并且 requeue 参数设置为 false
  2. 消息过期(TTL,Time-To-Live)。
  3. 队列达到最大长度,导致新的消息无法被放入队列。

死信队列也是一个普通的队列,同样可以在队列上声明消费者,继续对消息进行消费处理。

关键参数

对于死信队列,在RabbitMQ中主要涉及到几个参数:

  • x-dead-letter-exchange:指定当消息变成死信时应发送到的交换机。这是最重要的参数,用于定义死信交换机。
  • x-dead-letter-routing-key:指定当消息变成死信时应使用的路由键。这个参数是可选的,如果不设置,死信消息会使用其原始的路由键。死信在转移到死信队列的过程中,是没有经过消息发送者确认的,所以并不能保证消息的安全性
  • x-message-ttl:消息的生存时间(TTL,Time-To-Live)。设置消息在队列中的存活时间,超过这个时间未被消费的消息将变成死信。消息在队列中保存时间超过这个TTL,即会被认为死亡。死亡的消息会被丢入死信队列,如果没有配置死信队列的话,RabbitMQ会保证死了的消息不会再次被投递,并且在未来版本中,会主动删除掉这些死掉的消息。
  • x-max-length:队列的最大长度。设置队列中可以容纳的最大消息数量,超出这个数量的消息将变成死信。
  • x-max-length-bytes :队列的最大长度(以字节为单位)。设置队列中所有消息的总大小,超出这个大小的消息将变成死信。

如何确定一个消息是不是死信消息

消息被作为死信转移到死信队列后,会在 Header 当中增加一些消息。在官网的详细介绍中,可以看到很多内容,比如时间、原因(rejected,expired,maxlen)、队列等。然后 Header 中还会加上第一次成为死信的三个属性,并且这三个属性在以后的传递过程中都不会更改。

  • x-first-death-reason
  • x-first-death-queue
  • x-first-death-exchange

延迟队列

其实从前面的配置过程能够看到,所谓死信交换机或者死信队列,不过是在交换机或者队列之间建立一种死信对应关系,而死信队列可以像正常队列一样被消费。他与普通队列一样具有FIFO的特性。对死信队列的消费逻辑通常是对这些失效消息进行一些业务上的补偿。 RabbitMQ中,是不存在延迟队列的功能的,而通常如果要用到延迟队列,就会采用 TTL+死信队列的方式来处理。

RabbitMQ 提供了一个 rabbitmq_delayed_message_exchange 插件,可以实现延迟队列的功能,但是并没有集成到官方的发布包当中,需要单独去下载。

阿里云版本的 RabbitMQ 则进行了优化,可以很方便的发送延时消息,在构建消息的时候设置 delay 参数即可:

MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// 阿里云RabbitMQ可以直接设置延迟时间
if (null != delaySecond) {
    messageProperties.setHeader("delay", 1 * 1000);
}

return new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);

懒队列

RabbitMQ从3.6.0版本开始,就引入了懒队列(Lazy Queue)的概念。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。懒队列的设计目标是为了支持非常长的队列(数百万级别)。队列可能会因为一些原因变得非常长-也就是数据堆积。

懒队列的特点

  1. 消息存储在磁盘上:懒队列会将消息直接存储到磁盘上,只有在需要传递给消费者时才会加载到内存中。
  2. 减少内存使用:由于消息大部分时间存储在磁盘上,懒队列大幅减少了内存使用,这在消息量非常大时尤为重要。
  3. 适用于高积压场景:特别适合处理高消息积压(backlog)场景,例如批处理任务、日志收集等。

可以解决如下问题:

  • 消费者服务宕机了
  • 有一个突然的消息高峰,生产者生产消息超过消费者
  • 消费者消费太慢了

在代码中可以通过 x-queue-mode 参数指定是懒队列:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

要注意的是,当一个队列被声明为懒队列,那即使队列被设定为不持久化,消息依然会写入到硬盘中。如 果是在集群模式中使用,这会给集群资源带来很大的负担。

最后一句话总结:懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。但是这 是以大量消耗集群的网络及磁盘IO为代价的