RabbitMQ - 从下载安装到集群部署
因为最近 ChatGPT 热度飙升,而且我在实际尝试之后,觉得它对于学习而言,是非常有利的,本文有不少知识点处自 ChatGPT 的回答。
概览
消息队列
ChatGPT:关于消息队列的理解!
消息队列所解决的并不只是存放消息这一目的,更主要是用于解决通信问题。通常,消息队列会应用于以下几种场景:
-
异步任务处理:将需要花费较长时间的任务放入消息队列中,让消费者异步地处理它们,避免阻塞生产者线程。
-
解耦系统组件:通过消息队列将不同的系统组件解耦,使得系统的扩展更为容易,并且可以提高系统的可靠性和稳定性。
-
流量削峰:当系统面临突发大量请求时,可以使用消息队列来平滑处理流量,避免系统瘫痪或者出现过载。
-
日志处理:将日志信息写入消息队列中,由消费者异步地将其写入存储系统中,从而减轻生产者的负担。
-
应用解耦:不同的应用之间可以通过消息队列进行通信,从而实现应用解耦和松耦合。
AMQP
AMQP 是 Advanced Message Queuing Protocol 的缩写,是一种用于消息传递的开放式标准协议。它为面向消息的中间件设计,并且允许应用程序之间在可靠、安全和高效的方式下进行异步通信,并支持多种消息传递模型,如点对点、发布/订阅、请求/响应等。RabbitMQ 就是 AMQP 的一种实现方案。
AMQP 可以在不同的平台上实现,包括 Java、C++、Python 等,因此具有很好的跨平台性。由于其稳定性和安全性,AMQP 被广泛应用于企业级应用程序中,如金融交易处理、电信网络管理、医疗保健等。
安装
在 Windows 环境下安装
在 Windows 环境下安装 RabbitMQ,可以按照以下步骤进行操作:
-
下载 RabbitMQ 安装包:从官网下载适用于 Windows 的二进制安装包。
-
安装 Erlang 运行时环境:由于 RabbitMQ 是使用 Erlang 编写的,因此需要先安装 Erlang 运行时环境。从 Erlang 官网下载适用于 Windows 的二进制安装包并安装,注意配置 Erlang 环境变量。
-
安装 RabbitMQ:运行 RabbitMQ 的安装程序,并按照提示完成安装过程。在安装过程中,可以选择自定义安装路径和开启管理插件。
-
启动 RabbitMQ 服务:安装完成后,打开命令提示符或 PowerShell 窗口,切换到 RabbitMQ 安装目录的 sbin 子目录,运行
rabbitmq-server.bat
文件即可启动 RabbitMQ 服务。 -
访问 RabbitMQ 管理界面:在浏览器中输入 http://localhost:15672/,使用默认的用户名(guest)和密码(guest)登录 RabbitMQ 管理界面。
注意:在安装和配置过程中,需要确保防火墙不会阻止 RabbitMQ 的网络流量。
在 Linux 环境下安装
如果你手上没有 Linux 环境,可以安装 VMware Workstation 进行学习。
-
安装 Erlang 环境。
yum -y update # 安装前先检查更新软件包 yum -y install epel-release # Erlang在默认的YUM存储库中不可用,因此需要安装EPEL存储库 yum -y install erlang socat erl -version # 安装完成后通过查看版本号进行验证
-
安装 RabbitMQ。
yum -y install rabbitmq-server
-
RabbitMQ 启动操作。
service rabbitmq-server start # 启动 service rabbitmq-server status # 查看状态 service rabbitmq-server stop # 停止 service rabbitmq-server restart # 重启 rabbitmq-plugins enable rabbitmq_management # 启用插件
-
控制台访问。
登录成功后,可以看到如下界面:
注:RabbitMQ 默认账户:guest(guest)。外部访问时,需要配置防火墙策略,如果是学习使用,可以考虑关闭防火墙。
使用 Docker 安装
如您对 Docker 相关知识仍有所疑惑,可以前往阅读 Docker 相关教程:Docker 详细教程
略
RabbitMQ 架构
RabbitMQ 是一个基于 AMQP 协议的开源消息中间件,用于在分布式系统之间传递和处理数据。它主要由如下组件构成:
-
Broker:消息代理,负责接收、存储、转发消息。RabbitMQ 中的 Broker 主要由 Exchange 和 Queue 两部分组成。
-
Exchange:消息交换机,负责接收消息并将消息路由到一个或多个 Queue 中。Exchange 有四种类型:Direct、Topic、Fanout 和 Headers。
-
Queue:消息队列,存储消息的容器。生产者将消息发送到 Exchange,Exchange 将消息路由到一个或多个 Queue 中,消费者从 Queue 中获取消息。
-
Connection:连接,客户端与 Broker 之间的连接。
-
Channel:通道,建立在 Connection 之上的虚拟连接,用于发送和接收消息。Channel 是线程安全的,一个 Connection 可以拥有多个 Channel。
-
Producer:生产者,向 Exchange 发送消息的客户端。
-
Consumer:消费者,从 Queue 获取消息并处理的客户端。
-
Binding:绑定,将 Exchange 和 Queue 绑定在一起,定义 Exchange 如何将消息路由到 Queue。
-
Virtual Host:虚拟主机,RabbitMQ 允许创建多个虚拟主机,每个虚拟主机相当于一个独立的 RabbitMQ 服务器,拥有自己的 Exchange、Queue、Binding 等组件。虚拟主机之间相互隔离,不会互相干扰。
RabbitMQ 一个简单的架构如下图所示:
虚拟主机
虚拟主机 vhost 本质上就是一个迷你版的消息队列服务器,有自己的队列、交换器和绑定,最重要的是它有自己的权限机制。vhost 提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。vhost 必须在连接时指定,RabbitMQ 包含一个缺省的虚拟主机:/
,通过缺省用户和口令 guest 进行访问。
虚拟主机保证了用户可以在多个不同的应用中使用 RabbitMQ。
在 RabbitMQ 中创建用户,必须要被指派给至少一个 vhost,并且只能访问被指派内的队列、交换器和绑定。vhost 必须通过 RabbitMQ 的管理控制工具创建。
交换器
RabbitMQ 支持多种交换器类型:
-
Direct
direct 为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列。
使用代码:
channel.basicPublish("", queueName, null, message)
推送direct交换器消息到对于的队列,空字符为默认的direct交换器,用队列名称当做路由键。channel.basicPublish("", queueName, null, message);
-
Fanout (发布/订阅模式)
fanout 有别于 direct 交换器,fanout 是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
和 direct 交换器不同,我们在发送消息的时候新增
channel.exchangeDeclare(exchangeName, "fanout")
,这行代码声明 fanout 交换器。channel.exchangeDeclare(exchangeName, "fanout");
-
Topic (匹配订阅模式)
topic 交换器运行和 fanout 类似,但是可以更灵活的匹配自己想要订阅的信息,它使用路由键进行消息(规则)匹配。
通过使用
*
和#
这两种符号,使来自不同源头的消息到达同一个队列,符号.
则将路由键分为了几个标识符,符号*
匹配1个,符号#
匹配一个或多个。channel.queueBind(queueName, exchangeName, "#.error");
-
headers
几乎和 direct 一样,不实用,可以忽略。
路由键
队列通过路由键(routing key,某种确定的规则)绑定到交换器,生产者将消息发布到交换器,交换器根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收。
消息的确认
消费者收到的每一条消息都必须进行确认(自动确认和自行确认)。
消费者在声明队列时,可以指定 autoAck 参数,当 autoAck=false
时,RabbitMQ 会等待消费者显式发回 ack 信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,在 RabbitMQ 队列中消息被消费后会立即删除它。
采用消息确认机制后,只要令 autoAck=false
,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直持有消息直到消费者显式调用 basicAck 为止。
当 autoAck=false
时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者 ack 信号的消息。如果服务器端一直没有收到消费者的 ack 信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未 ack 的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
RabbitMQ 的队列模式
简单队列模式
简单队列模式的基本原理如下:
一个简单的使用示例如下:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
# 1.获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection(); # 2.获取连接对象
Channel channel = connection.createChannel()) { # 3.创建连接通道
channel.queueDeclare(QUEUE_NAME, false, false, false, null); # 4.声明队列
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); # 5.发送消息
System.out.println(" [x] Sent '" + message + "'");
}
}
}
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1); // 设置每个消费者最多处理一条消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// Consumer consumer = new DefaultConsumer(channel) {
// @Override
// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// String message = new String(body, "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
// }
// };
// channel.basicConsume(QUEUE_NAME, true, consumer);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
System.out.println(" [x] Received '" + new String(delivery.getBody(), "UTF-8") + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, customerTag -> {}); # 其中true表示自动签收
}
}
注:
-
生产者或消费者在声明队列时,会检查是否存在,存在则直接使用,不存在则新建。
-
简单队列模式使用默认的交换机,因此在执行发布时,交换机参数指定为空字符串。
-
消费者需要对消息进行签收,只有被签收的消息才被认为消费成功。
-
其实本质来说,简单队列模式就是缺省了一部分信息,使用默认配置。
工作队列模式
对于简单队列模式,当多个消费者消费同一个队列时,RabbitMQ 会开启公平调度机制(轮流消费),而不会关注消费者的消费能力,因此整个队列的消费性能将会取决于消费能力最低的消费者,从而导致性能的浪费。
以下是同一个队列的两个消费者:
Consumer consumer1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer1);
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Thread.sleep(500);
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer2);
由于两个消费者均采用自动签收模式,队列则会公平地向消费者推送消息,性能较差的消费者处理能力较低,则会在短时间内引起消息堆积,最终影响到整个生产消费系统的性能。
从这个示例可以看出,简单消费模式的弊端的本质原因在于公平消费机制与消费者能力不均衡之间的矛盾。因此,对于不需要保证消费者之间绝对公平的消费场景,则可以采用手动签收机制,根据消费者实际的消费能力情况,动态地进行消息签收,这种手动签收消息的模式则被称为工作队列模式。
工作队列模式,也称为任务队列,当多个消费者从同一个队列中获取消息并处理,消费者之间会根据能力竞争处理消息,适合一些较为耗时的消费场景。
工作队列模式通过手动签收实现:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
Thread.sleep(1000); // 模拟耗时任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息已经被消费
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer); // 手动确认消息
发布订阅模式
在前简单队列模式和工作队列模式中,对于生产者发布的同一条消息,仅支持单一消费者消费,无法支持一条消息同时被多个消费者消费。要实现这种功能,就需要使用发布订阅模式。
而要使用发布订阅模式,就需要借助交换机(Exchange)来进行实现,一个简单的发布订阅模式示例如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); # 声明交换机类型:fanout
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("Sent message: " + message);
}
}
}
}
import com.rabbitmq.client.*;
public class Consumer {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, ""); # 将队列绑定到交换机
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body, "UTF-8"));
}
};
channel.basicConsume(queueName, true, consumer);
}
}
路由模式
在发布订阅模式中,生产者将消息发布到交换机,绑定该交换机的所有队列都将获取到一份消息,并将获取到的消息推送给各自的消费者。这种路由模式可以使生产者的同一条消息被所有订阅的消费者所消费,但对于交换机内的同一条消息,它将无差别地发送到所有与之绑定的队列中。
一个简单的路由模式示例如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "info";
for (int i = 0; i < 10; i++) {
String message = "Message " + i + " with routing key " + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("Sent message: " + message);
}
}
}
}
但通过指定交换机类型为direct
,我们却可以通过一种路由策略(routingKey),设定允许接受交换机消息的队列,这种模式被称为路由模式。
import com.rabbitmq.client.*;
public class Consumer {
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
String routingKey = "info"; # routingKey与生产者一致时能够监听到消息
// String routingKey = "error"; # routingKey与生产者不一致时则监听不到消息
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
主题模式
对于路由模式,队列与其所绑定的交换机之间,必须保证路由键(routingKey)完全一致,才能够接收到路由器发出的消息。在此基础上,主题模式(Topic 模式)则扩宽了路由键的匹配范围,路由匹配更加灵活。
主题模式通过使用*
、#
和.
这三个符号,扩宽了路由键的匹配范围。
-
.
:用于分割路由键字符串。 -
*
:在同一级内进行通配,不能跨越.
进行匹配。 -
#
:匹进行通配,可以跨越.
进行匹配。
与路由模式十分相似,主题模式在使用时需要声明交换机类型为 Topic:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
消费者在绑定路由键时,则根据主题模式的特殊规则进行灵活指定即可:
channel.queueBind(queueName, EXCHANGE_NAME, "info.*.name.#");
SpringBoot 整合
-
添加依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置连接信息。
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.simple.acknowledge-mode=manual # 手动ack,默认为自动
-
创建配置类。
/** * 对于不同的队列模式,注入Bean的内容及关系绑定的参数略有不同 */ @Configuration public class RabbitConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_exchange"); // return new TopicExchange("topic_exchange"); } @Bean public Queue queue() { return new AnonymousQueue(); } @Bean public Binding binding(FanoutExchange fanoutExchange, Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); // return BindingBuilder.bind(queue).to(topicExchange).with("info.*"); } }
-
创建消息队列。
@Component public class Receiver { @RabbitListener(queues = "#{queue.name}") // @RabbitHandler public void process(Message message, Channel channel) throws IOException { System.out.println("Received message: " + new String(message.getBody())); // 手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
-
发送消息到队列。
@Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { // 对于不同的队列模式,发送方法的参数也有所不同 rabbitTemplate.convertAndSend("fanout_exchange", message); // rabbitTemplate.convertAndSend("topic_exchange", "info.detail", message); }
高级特性
如何保证 RabbitMQ 消息的可靠性,主要从哪几个方面考虑?要保证 RabbitMQ 消息的可靠性,需要从以下几个方面考虑:
-
消息确认机制:RabbitMQ 提供了消息确认机制,可以确保消息被正确地发送到了队列中。消息确认机制包括生产者确认和消费者确认两种方式,可以根据业务需求选择使用。
-
消息持久化:将消息标记为持久化,可以确保消息在 RabbitMQ 服务器宕机或重启后不会丢失。可以通过将消息的 delivery mode 设置为 2 来实现消息持久化。
-
集群模式:在 RabbitMQ 集群模式下,可以将消息复制到多个节点,确保消息在节点宕机时不会丢失。可以通过配置 RabbitMQ 集群来实现。
-
限流机制:在消息生产者发送消息的时候,可以设置每秒发送的消息数量,防止消息发送过快导致 RabbitMQ 服务器负载过高,从而影响消息可靠性。
-
监控机制:通过监控 RabbitMQ 服务器的运行状态,可以及时发现问题并进行处理,确保消息的可靠性。可以使用 RabbitMQ 自带的管理界面或者第三方监控工具来实现监控。
消息确认机制
RabbitMQ 提供了一种 confirm 机制,用于保证交换机中消息的可靠性。
-
消息确认机制在 Spring 中的应用。
public static void main(String[] argv) throws Exception { # 1.获取连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/test"); factory.setUsername("guest"); factory.setPassword("guest"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.confirmSelect(); // 开启confirm机制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 消息被Broker签收时回调此方法 } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 消息被Broker拒收时回调此方法(可以尝试重试,或进行容错处理) } }); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } }
-
消息确认机制在 SpringBoot 中的应用。
添加配置信息:
spring.rabbitmq.publisher-confirm-type=correlated
publisher-confirm-type 取值范围如下:
-
simple:简单的执行 ack 判断,在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判断下一步的逻辑。但值得注意的是,当 waitForConfirmsOrDie 方法返回 false 时,则会关闭 channel。
-
correlated:执行 ack 时还会携带消息元数据。
-
none:缺省值,禁用发布确认模式。
创建监听器类:
@Component public class RabbitConfirmListener implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { // 将监听器注入到rabbitTemplate rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { // 消息投递成功 } else { // 消息投递失败,可进行后续处理,例如存入redis等 } } }
-
注:confirm 机制只能保证生产者与交换机之间消息传递的可靠性,而无法确认交换机与队列之间的可靠性。
Return 机制
Return 机制保证了交换机与队列之间的消息可靠性,在 SpringBoot 中使用 Return 机制,首先需要在配置文件中进行开启:
spring.rabbitmq.publisher.returns=true
同时,与消息确认机制类似,需要创建一个实现RabbitTemplate.ReturnCallback
接口的监听器类:
@Component
public class RabbitConfirmListener implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 将监听器注入到rabbitTemplate
rabbitTemplate.setReturnCallback(this);
}
/**
* 当消息没有传递到队列时回调此方法
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 消息未能成功投递到队列
}
}
而队列到消费者之间的消息可靠性,则是在消费者端通过手动 ack 实现(前文已有叙述)。
ack 与 nack
-
不进行 ack:消费者在接收到消息后,队列中的消息将进入
Unacked
状态,直到该消费者失去会话,消息会从新进入Ready
状态。 -
ack:确认签收,已签收的消息会从队列中出队。
-
reject:拒签消息,一次只支持处理一条消息。此时若 requeue 为 false 时,将会进入死信队列;若 request 为 true,消息则会在当前会话中重新进入队列中。
-
nack:拒签消息,与 reject 相似,不同点在于 nack 支持批量处理。
消息元数据
生产者发布消息时,可以对消息的元数据进行封装,以下是一个简单的封装示例:
Map<String, Object> map = new HashMap<>();
map.put("name", "Jack");
map.put("job", "Developer");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2) // 消息是否支持持久化:1-不支持,2-支持
.messageId(UUID.randomUUID().toString()) // 自定义消息的业务ID
.expiration("30000") // 自定义消息过期时间,过期则会成为死信
.headers(map)
.build();
String message = "heallo";
channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, properties, message.getBytes());
注:消息的持久化与交换机和队列的持久化并不是一并指定的,而需要各自单独声明。
消息的消费者端则可以通过如下方式获取元数据:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Map<String, Object> map = properties.getHeaders();
String id = properties.getMessageId();
}
};
channel.basicConsume(queueName, true, consumer);
如何防止重复消费
为了防止 RabbitMQ 中的消息被重复消费,可以采用以下几种方法:
-
消息去重:在消费者端记录已经消费过的消息 ID,每次消费消息前先检查该消息是否已经被消费过,如果已经被消费过,则不再进行消费。需要注意的是,该方法需要在消费者端进行处理,可能会增加消费者端的复杂度。
-
消息标记:在消息体中添加一个唯一标识符,例如 UUID,然后在消费者端记录已经消费过的标识符,每次消费消息前先检查该消息的标识符是否已经被消费过,如果已经被消费过,则不再进行消费。该方法需要在生产者端进行处理,在消息发布时添加唯一标识符。
-
消息 ACK:在消费者端消费消息时,如果消费成功,则向 RabbitMQ 发送 ACK 确认消息,告诉 RabbitMQ 该消息已经被消费。如果消费者端在消费时发生异常,可以不发送 ACK 消息,让 RabbitMQ 将该消息重新入队列,等待下一次消费。需要注意的是,该方法需要开启 RabbitMQ 的 ACK 机制。
-
TTL 机制:在生产者端发布消息时,可以设置消息的 TTL(Time To Live),表示该消息的有效期。如果消息在有效期内没有被消费,就会被 RabbitMQ 删除。该方法可以用来防止消息过期被重复消费。
死信队列
死信队列(Dead-Letter Exchange,简称 DLX)是一个特殊的交换机,用于处理未能被消费者正确消费的消息。当消息被拒绝、消息过期或者队列达到最大长度时,消息就会被发送到死信队列中。使用死信队列可以有效地避免消息丢失或者无限循环消费的问题。使用死信队列可以有效地避免消息丢失或者无限循环消费的问题。
使用死信队列的步骤如下:
-
创建一个正常的队列和一个 DLX,将正常队列绑定到 DLX 上。
-
在正常队列上设置 DLX 的名称和路由键,当消息被拒绝、过期或者队列达到最大长度时,该消息就会被发送到 DLX 中。
-
在 DLX 中创建一个队列,用于存储死信消息。
-
将死信队列绑定到 DLX 上,并设置路由键,用于匹配死信消息。
在实际使用中,可以通过设置队列的 x-dead-letter-exchange 和 x-dead-letter-routing-key 属性来实现 DLX 的绑定和路由键设置。需要注意的是,DLX 的设置必须在队列创建时进行,因为队列一旦创建后就无法修改其属性。
使用死信队列可以有效地处理未能被消费者正确消费的消息,避免消息丢失或者无限循环消费的问题。同时,也可以通过设置不同的 DLX 和路由键,将不同类型的死信消息发送到不同的队列中进行处理。
以下是一个死信队列的简单示例:
-
创建正常队列,并设置其 x-dead-letter-exchange 和 x-dead-letter-routing-key 属性:
@Bean public Queue normalQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); args.put("x-dead-letter-routing-key", "dlx.routing.key"); return new Queue("normal.queue", true, false, false, args); }
-
创建 DLX:
@Bean public DirectExchange dlxExchange() { return new DirectExchange("dlx.exchange"); }
-
创建死信队列,并将其绑定到 DLX 上:
@Bean public Queue dlxQueue() { return new Queue("dlx.queue", true); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key"); }
-
创建死信队列消费者:
@Component public class Receiver { @RabbitListener(queues = "dlx.queue") public void process(Message message, Channel channel) throws IOException { System.out.println("DLX message: " + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
通过以上配置,当消息被拒绝、消息过期或者队列达到最大长度时,该消息就会被发送到 DLX 的 dlx.exchange 交换机上,并被路由到 dlx.routing.key 路由键所绑定的死信队列中进行处理。需要注意的是,DLX 和死信队列的处理逻辑需要在消费者端进行实现。
参考
著作権声明
本記事のリンク:https://www.chinmoku.cc/dev/java/advanced/rabbitmq-tutorial/
本博客中的所有内容,包括但不限于文字、图片、音频、视频、图表和其他可视化材料,均受版权法保护。未经本博客所有者书面授权许可,禁止在任何媒体、网站、社交平台或其他渠道上复制、传播、修改、发布、展示或以任何其他方式使用此博客中的任何内容。