RabbitMQ的使用

本文系统介绍了 RabbitMQ 的基础用法,并深入解析如何通过 Spring Boot 集成实现消息的发送、接收与监听。内容涵盖交换机、队列、绑定关系及消息确认机制等核心概念,旨在帮助开发者高效构建稳定可靠的消息驱动系统。
single

核心概念介绍

工作原理图: RabbitMQ工作原理图

Virtual Hosts:虚拟机,在 RabbitMQ 中不同虚拟机之间的资源是完全隔离的。不考虑资源分配的情况下,每个虚拟机就可以当成一个独立的 RabbitMQ 服务来使用。平常业务中,我们可以通过设置 Virtual Host 来区分不同的环境

Exchange:交换机,用来辅助发送消息,Exchange 和 Queue 会建立一种绑定关系,Exchange 的作用,就是将发送到 Exchange 的消息转发到绑定的队列上。在具体使用时,通常只有消息生产者需要与 Exchange 打交道。而消费者只要从 Queue 中消费消息就可以了。

Queue:一个典型的 FIFO 的队列数据结构,RabbitMQ 中的消息都是通过 Queue 队列传递的。

Connection:客户端与 RabbitMQ 进行交互,首先就需要建立一个 TPC 连接,这个连接就是 Connection。既然是通道,那就需要尽量注意在停止使用时要关闭释放资源。

Channel:一旦客户端与 RabbitMQ 建立了连接,就会分配一个 AMQP 信道 Channel。每个信道都会被分配一个唯一的 ID。也可以理解为是客户端与 RabbitMQ 实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。 RabbitMQ 为了减少性能开销,也会在一个 Connection 中建立多个 Channel,这样便于客户端进行多线程连接,这些连接会复用同一个 Connection 的 TCP 通道,所以在实际业务中,对于 Connection 和 Channel 的分配也需要根据实际情况进行考量。

RabbitMQ 的队列模型可以参照官网,已经说明的很清晰了:RabbitMQ Tutorials | RabbitMQ

基础编程模型

1. 创建连接,获取 Channel

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(jannaRabbitProperties.getHost());
factory.setPort(jannaRabbitProperties.getPort());
factory.setUsername(jannaRabbitProperties.getUsername());
factory.setPassword(jannaRabbitProperties.getPassword());
factory.setVirtualHost(jannaRabbitProperties.getVhost());

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

通常情况下,我们在一个客户端里都只是创建一个 Channel 就可以了,因为一个 Channel 只要不关闭,是可以一直复用的。但是,如果你想要创建多个 Channel,要注意一下 Channel 冲突的问题。

在创建 channel 时,可以在 createChannel 方法中传入一个分配的 int 参数 channelNumber 。这个 ChannelNumber 就会作为 Channel 的唯一标识。而 RabbitMQ 防止 ChannelNumber 重复的方式是:

  • 如果对应的 Channel 没有创建过,就会创建一个新的 Channel。
  • 但是如果 ChannelNumber 已经创建过一个 Channel 了,这时就会返回一个 null

2. 声明 Exchange-可选

Exchange 在消息收发过程中是一个可选的步骤,如果要使用就需要先进行声明。在声明 Exchange 时需要注意,如果 Broker 上没有对应的 Exchange,那么 RabbitMQ 会自动创建一个新的交换机。但是如果 Broker 上已经有了这个 Exchange,那么你声明时的这些参数需要与 Broker 上的保持一致。如果不一致就会报错。

channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;

3. 声明 Queue

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

与 Exchange 一样,如果你声明的 Queue 在 Broker 上不存在,RabbitMQ 会创建一个新的队列。但是如果 Broker 上已经有了这个队列,那么声明的属性必须和 Broker 上的队列保持一致,否则也会报错。

Durablility 表示是否持久化。Durable 选项表示会将队列的消息写入硬盘,这样服务重启后这些消息就不会丢失。而另外一个选项 Transient 表示不持久化,消息只在内存中流转。这样服务重启后这些消息就会丢失。当然这也意味着消息读写的效率会比较高。

但是 Queue 与 Exchange 不同的是,队列类型并没有在 API 中体现。这是因为不同类型之间的 Queue 差距是很大的,无法用统一的方式来描述不同类型的队列。比如对于 Quorum 和 Stream 类型,根本就没有 Durability 和 AutoDelete 属性,他们的消息默认就是会持久化的。后面的属性参数也会有很大的区别。

唯一有点不同的是队列的 Type 属性。在客户端 API 中,目前并没有一个单独的字段来表示队列的类型。只能通过后面的 arguments 参数来区分不同的队列。

如果要声明一个 Quorum 队列,则只需要在后面的 arguments 中传入一个参数,x-queue-type,参数值设定为 quorum

4. 声明 Exchange 与 Queue 的绑定关系-可选

channel.queueBind(String queue, String exchange, String routingKey) throws IOException;

如果我们声明了 Exchange 和 Queue,那么就还需要声明 Exchange 与 Queue 的绑定关系 Binding。有了这些 Binding,Exchange 才可以知道 Producer 发送过来的消息将要分发到哪些 Queue 上。这些 Binding 涉及到消息的不同分发逻辑,与 Exchange 和 Queue 一样,如果 Broker 上没有建立绑定关系,那么 RabbitMQ 会按照客户端的声明,创建这些绑定关系。但是如果声明的 Binding 存在了,那么就需要与 Broker 上的保持一致。 另外,在声明 Binding 时,还可以传入两个参数, routingKey 和 props,这两个参数都是跟 Exchange 的消息分发逻辑有关。

5. 发送消息

channel.basicPublish(String exchange, String routingKey, BasicProperties props, message.getBytes("UTF-8"));

props 参数,可以传入一些消息相关的属性,我们可以在管理控制台上查看明确的说明。

6. Consumer 消费消息

定义消费者,消费消息进行处理,并向 RabbitMQ 进行消息确认。确认了之后就表明这个消息已经消费完 了,否则 RabbitMQ 还会继续发起重试。

被动消费模式

Consumer 等待 rabbitMQ 服务器将 message 推送过来再消费。一般是启一个一直挂起的线程来等待。

channel.basicConsume(String queue, boolean autoAck, Consumer callback);

主动消费模式

Comsumer 主动到 RabbitMQ 服务器上去拉取消息进行消费。

GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);

autoAcktrue 则表示消息被 Consumer 消费成功后,后续就无法再消费了。而如果 autoAck 设置为 false ,就需要在处理过程中手动去调用 channel 的 basicAck 方法进行应答。如果不应答的话,这个消息同样会继续被 Consumer 重复处理。所以这里要注意,如果消费者一直不对消息进行应答,那么消息就会不断的发起重试,这就会不断的消耗系统资源,最终造成服务宕机。

如果 autoAck 设置成了 true,那么在回调函数中就不能再手动进行 ack。重复的 ack 会造成 Consumer 无法正常消费更多的消息。

Spring 集成

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

基础配置

ConnectionFactory

ConnectionFactory 是 Spring 和 RabbitMQ 集成中用于创建和配置连接到 RabbitMQ 代理(broker)的连接工厂:

@Bean(name = "fifoRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory fifoRabbitListenerContainerFactory(
    ConnectionFactory connectionFactory, RetryOperationsInterceptor retryOperationsInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(retryOperationsInterceptor);
    factory.setPrefetchCount(1);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

失败重试规则

@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateless()
        .maxAttempts(3)                             // 最大重试次数
        .backOffOptions(1000 * 60, 2.0, 10000 * 60) // 初始间隔、乘数和最大间隔
        // 达到失败上限,转死信队列
        .recoverer(new RejectAndDontRequeueRecoverer())
        .build();
}

SimpleRabbitListenerContainerFactory

是 Spring AMQP 框架中用于创建和配置 RabbitMQ 消息监听器容器的工厂类。它简化了 RabbitListener 监听器容器的配置和管理,特别是在使用注解驱动的消息监听时。

@Bean(name = "fifoRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory fifoRabbitListenerContainerFactory(
    ConnectionFactory connectionFactory, RetryOperationsInterceptor retryOperationsInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(retryOperationsInterceptor);
    factory.setPrefetchCount(1);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
    ConnectionFactory connectionFactory, RetryOperationsInterceptor retryOperationsInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(retryOperationsInterceptor);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

这里的 fifoRabbitListenerContainerFactory 是用来给那些需要严格控制 FIFO 消息的消费者使用的。

创建交换机、队列、绑定关系

RabbitAdmin 是 Spring AMQP 提供的一个用于管理 RabbitMQ 基础设施组件的工具类。它可以用于声明(创建)、绑定和删除队列、交换机和绑定关系。RabbitAdmin 自动执行这些操作,因此简化了在应用启动时的基础设施配置工作。

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}

创建交换机

public DirectExchange normalExchange() {
    DirectExchange normalExchange = new DirectExchange("normal_direct_exchange", true, false);
    this.rabbitAdmin.declareExchange(normalExchange);
    return normalExchange;
}

创建队列

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 43200000L);
Queue build = QueueBuilder.durable(queueName)
                  .deadLetterExchange("dead_letter_exchange")
                  .deadLetterRoutingKey(queueName)
                  .withArguments(args)
                  .build();
this.rabbitAdmin.declareQueue(build)

绑定关系

Binding bind = BindingBuilder.bind(queue).to(dlExchange).with(routingKey);
this.rabbitAdmin.declareBinding(bind);

发送和消费

RabbitTemplate

Spring AMQP 框架中的一个核心类,用于与 RabbitMQ 交互。它提供了一种简化的方式来发送和接收消息。

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}
public void send(String queueName, String msg) {
    String msgId = UUID.randomUUID().toString();
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(msgId);
    // 消息延时5000毫秒
    messageProperties.setHeader("delay", "10000");
    // 创建Message。
    Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
    // 当不指定 exchange 时,RabbitMQ 使用默认交换机(direct exchange,名称为空字符串)并根据 routingKey 将消息投递到相应的队列。
    rabbitTemplate.send(queueName, message);
}

消费者

@RabbitListener(queues = "vitah_queue", concurrency = "1", containerFactory = "fifoRabbitListenerContainerFactory")
public void receiveFromQueue(Message rabbitMsg, Channel channel) throws IOException {
    System.out.println(new String(rabbitMsg.getBody()) + "-" + Thread.currentThread().getId());
    channel.basicAck(rabbitMsg.getMessageProperties().getDeliveryTag(), false);
}