核心概念介绍
工作原理图:
Virtual Hosts:虚拟机,在 RabbitMQ 中不同虚拟机之间的资源是完全隔离的。不考虑资源分配的情况下,每个虚拟机就可以当成一个独立的 RabbitMQ 服务来使用。平常业务中,我们可以通过设置 Virtual Host
来区分不同的环境
Exchange:交换机,用来辅助发送消息,Exchange 和 Queue 会建立一种绑定关系,Exchange 的作用,就是将发送到 Exchange 的消息转发到绑定的队列上。在具体使用时,通常只有消息生产者需要与 Exchange 打交道。而消费者只要从 Queue 中消费消息就可以了。
Queue:一个典型的 FIFO 的队列数据结构,RabbitMQ 中的消息都是通过 Queue 队列传递的。
Connection:客户端与 RabbitMQ 进行交互,首先就需要建立一个 TPC 连接,这个连接就是 Connection。既然是通道,那就需要尽量注意在停止使用时要关闭释放资源。
Channel:一旦客户端与 RabbitMQ 建立了连接,就会分配一个 AMQP 信道 Channel。每个信道都会被分配一个唯一的 ID。也可以理解为是客户端与 RabbitMQ 实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。 RabbitMQ 为了减少性能开销,也会在一个 Connection 中建立多个 Channel,这样便于客户端进行多线程连接,这些连接会复用同一个 Connection 的 TCP 通道,所以在实际业务中,对于 Connection 和 Channel 的分配也需要根据实际情况进行考量。
RabbitMQ 的队列模型可以参照官网,已经说明的很清晰了:RabbitMQ Tutorials | RabbitMQ
基础编程模型
1. 创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(jannaRabbitProperties.getHost());
factory.setPort(jannaRabbitProperties.getPort());
factory.setUsername(jannaRabbitProperties.getUsername());
factory.setPassword(jannaRabbitProperties.getPassword());
factory.setVirtualHost(jannaRabbitProperties.getVhost());
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
通常情况下,我们在一个客户端里都只是创建一个 Channel 就可以了,因为一个 Channel 只要不关闭,是可以一直复用的。但是,如果你想要创建多个 Channel,要注意一下 Channel 冲突的问题。
在创建 channel 时,可以在 createChannel 方法中传入一个分配的 int 参数 channelNumber
。这个 ChannelNumber 就会作为 Channel 的唯一标识。而 RabbitMQ 防止 ChannelNumber 重复的方式是:
- 如果对应的 Channel 没有创建过,就会创建一个新的 Channel。
- 但是如果
ChannelNumber
已经创建过一个 Channel 了,这时就会返回一个null
。
2. 声明 Exchange-可选
Exchange 在消息收发过程中是一个可选的步骤,如果要使用就需要先进行声明。在声明 Exchange 时需要注意,如果 Broker 上没有对应的 Exchange,那么 RabbitMQ 会自动创建一个新的交换机。但是如果 Broker 上已经有了这个 Exchange,那么你声明时的这些参数需要与 Broker 上的保持一致。如果不一致就会报错。
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;
3. 声明 Queue
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
与 Exchange 一样,如果你声明的 Queue 在 Broker 上不存在,RabbitMQ 会创建一个新的队列。但是如果 Broker 上已经有了这个队列,那么声明的属性必须和 Broker 上的队列保持一致,否则也会报错。
Durablility 表示是否持久化。Durable 选项表示会将队列的消息写入硬盘,这样服务重启后这些消息就不会丢失。而另外一个选项 Transient 表示不持久化,消息只在内存中流转。这样服务重启后这些消息就会丢失。当然这也意味着消息读写的效率会比较高。
但是 Queue 与 Exchange 不同的是,队列类型并没有在 API 中体现。这是因为不同类型之间的 Queue 差距是很大的,无法用统一的方式来描述不同类型的队列。比如对于 Quorum 和 Stream 类型,根本就没有 Durability 和 AutoDelete 属性,他们的消息默认就是会持久化的。后面的属性参数也会有很大的区别。
唯一有点不同的是队列的 Type 属性。在客户端 API 中,目前并没有一个单独的字段来表示队列的类型。只能通过后面的 arguments 参数来区分不同的队列。
如果要声明一个 Quorum 队列,则只需要在后面的 arguments 中传入一个参数,x-queue-type,参数值设定为 quorum。
4. 声明 Exchange 与 Queue 的绑定关系-可选
channel.queueBind(String queue, String exchange, String routingKey) throws IOException;
如果我们声明了 Exchange 和 Queue,那么就还需要声明 Exchange 与 Queue 的绑定关系 Binding。有了这些 Binding,Exchange 才可以知道 Producer 发送过来的消息将要分发到哪些 Queue 上。这些 Binding 涉及到消息的不同分发逻辑,与 Exchange 和 Queue 一样,如果 Broker 上没有建立绑定关系,那么 RabbitMQ 会按照客户端的声明,创建这些绑定关系。但是如果声明的 Binding 存在了,那么就需要与 Broker 上的保持一致。 另外,在声明 Binding 时,还可以传入两个参数, routingKey 和 props,这两个参数都是跟 Exchange 的消息分发逻辑有关。
5. 发送消息
channel.basicPublish(String exchange, String routingKey, BasicProperties props, message.getBytes("UTF-8"));
props 参数,可以传入一些消息相关的属性,我们可以在管理控制台上查看明确的说明。
6. Consumer 消费消息
定义消费者,消费消息进行处理,并向 RabbitMQ 进行消息确认。确认了之后就表明这个消息已经消费完 了,否则 RabbitMQ 还会继续发起重试。
被动消费模式
Consumer 等待 rabbitMQ 服务器将 message 推送过来再消费。一般是启一个一直挂起的线程来等待。
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
主动消费模式
Comsumer 主动到 RabbitMQ 服务器上去拉取消息进行消费。
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);
autoAck
为 true
则表示消息被 Consumer 消费成功后,后续就无法再消费了。而如果 autoAck
设置为 false
,就需要在处理过程中手动去调用 channel 的 basicAck 方法进行应答。如果不应答的话,这个消息同样会继续被 Consumer 重复处理。所以这里要注意,如果消费者一直不对消息进行应答,那么消息就会不断的发起重试,这就会不断的消耗系统资源,最终造成服务宕机。
如果 autoAck 设置成了 true,那么在回调函数中就不能再手动进行 ack。重复的 ack 会造成 Consumer 无法正常消费更多的消息。
Spring 集成
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
基础配置
ConnectionFactory
ConnectionFactory
是 Spring 和 RabbitMQ 集成中用于创建和配置连接到 RabbitMQ 代理(broker)的连接工厂:
@Bean(name = "fifoRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory fifoRabbitListenerContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(retryOperationsInterceptor);
factory.setPrefetchCount(1);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
失败重试规则
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3) // 最大重试次数
.backOffOptions(1000 * 60, 2.0, 10000 * 60) // 初始间隔、乘数和最大间隔
// 达到失败上限,转死信队列
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
}
SimpleRabbitListenerContainerFactory
是 Spring AMQP 框架中用于创建和配置 RabbitMQ 消息监听器容器的工厂类。它简化了 RabbitListener
监听器容器的配置和管理,特别是在使用注解驱动的消息监听时。
@Bean(name = "fifoRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory fifoRabbitListenerContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(retryOperationsInterceptor);
factory.setPrefetchCount(1);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(retryOperationsInterceptor);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
这里的 fifoRabbitListenerContainerFactory 是用来给那些需要严格控制 FIFO 消息的消费者使用的。
创建交换机、队列、绑定关系
RabbitAdmin
是 Spring AMQP 提供的一个用于管理 RabbitMQ 基础设施组件的工具类。它可以用于声明(创建)、绑定和删除队列、交换机和绑定关系。RabbitAdmin
自动执行这些操作,因此简化了在应用启动时的基础设施配置工作。
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
创建交换机
public DirectExchange normalExchange() {
DirectExchange normalExchange = new DirectExchange("normal_direct_exchange", true, false);
this.rabbitAdmin.declareExchange(normalExchange);
return normalExchange;
}
创建队列
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 43200000L);
Queue build = QueueBuilder.durable(queueName)
.deadLetterExchange("dead_letter_exchange")
.deadLetterRoutingKey(queueName)
.withArguments(args)
.build();
this.rabbitAdmin.declareQueue(build)
绑定关系
Binding bind = BindingBuilder.bind(queue).to(dlExchange).with(routingKey);
this.rabbitAdmin.declareBinding(bind);
发送和消费
RabbitTemplate
Spring AMQP 框架中的一个核心类,用于与 RabbitMQ 交互。它提供了一种简化的方式来发送和接收消息。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
public void send(String queueName, String msg) {
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// 消息延时5000毫秒
messageProperties.setHeader("delay", "10000");
// 创建Message。
Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
// 当不指定 exchange 时,RabbitMQ 使用默认交换机(direct exchange,名称为空字符串)并根据 routingKey 将消息投递到相应的队列。
rabbitTemplate.send(queueName, message);
}
消费者
@RabbitListener(queues = "vitah_queue", concurrency = "1", containerFactory = "fifoRabbitListenerContainerFactory")
public void receiveFromQueue(Message rabbitMsg, Channel channel) throws IOException {
System.out.println(new String(rabbitMsg.getBody()) + "-" + Thread.currentThread().getId());
channel.basicAck(rabbitMsg.getMessageProperties().getDeliveryTag(), false);
}