前一篇,我们创建了工作队列,并将任务发布到队列,每一项任务都会发送给一个worker。接下来,我们将使用发布/订阅模式,将消息分发给多个消费者。
为了说明这一模式,我们将构建一个简单的日志记录系统。它将由两个程序组成----第一个将发出日志消息(生产者),第二个将接收并打印它们(消费者)。运行多个消费者,它们都可以接收消息。
1. 交换器
前边的部分我们都是通过消息队列来发布和接收消息,现在让我们看下RabbitMQ的全消息模型。让我们快速回顾一下 前边的内容:
producer:用来发送消息
queue:用来缓存消息
consumer:用来接收消息
交换器,即Exchange,交换器是消息到达的第一站,所有的消息都先发送给交换器,交换器再按照不同的规则进行消息分发。RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到任何队列。相反,生产者只能发送消息给交换器。
交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换器必须知道如何处理它接收到的消息。是否应该分发到特定的队列?是否应该分发到多个队列?或者应该被抛弃?其实,这些规则是由交换类型(exchange type)定义的。
交换类型:根据交换器的功能、用途和适用场景,将交换器进行类型定义,每种类型有各自的功能和适用场景。常见的交换类型有:direct、topic、headers、fanout。
接下来,我们学习下fanout类型的交换器。
fanout:将接收到的消息分发到所有能匹配的队列(广播)。简单而言,即:所有订阅了这些消息的队列,都能够收到消息。
创建名为logs的交换器,其类型为fanout:
channel.exchangeDeclare("logs", "fanout");
发送消息:
String message = "中文日志信息";
channel.basicPublish("logs", "", null, message.getBytes("utf-8"));
注意,上边的代码队列名称为空。
2. 临时队列
对于队列,很重要的一点是,我们需要为其命名,因为生产者和消费者必须通过队列名称来定位到具体的队列,从中发送和获取消息(前几篇的"hello"队列和"task_queue"队列名称)。上边发送消息的代码中,我们并没有给队列命名,而是使用了""。
对于我们的日志记录系统,如果要每个消费者都能获取所有的日志消息,那么我们必须完成两点:
连接到RabbitMQ时,我们需要创建一个全新的队列,里边没有任何消息;
当消费者与RabbitMQ断开连接,那么队列应该被自动删除。
对于第1点,我们可以利用随机值来命名队列;对于第2点,我们只能检测断开连接后删除队列。但是,这都不是很好的做法。
其实,我们要做的是创建一个非持久的、独占的、自动删除的队列,这个队列的名称随机。RabbitMQ已经为我们提供了这个功能:
String queueName = channel.queueDeclare().getQueue();
如上边代码所示,我们创建了一个默认的队列,这队列具有上述特性,并且名称是随机生成的,格式为amq.gen-xxxx,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg,我们称之为临时队列。
关键队列的非持久、独占和自动删除:
非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除
3. 绑定
本文开头,我们创建一个名称为"logs"、类型为"fanout"的交换器。那么,交换器需要分发消息给队列,如果将队列与交换器进行关联呢?我们把交换器和队列之间的关联关系称为绑定(binding)。
绑定的代码如下:
channel.queueBind(queueName, "logs", "");
queueName即为前边创建的临时队列,第三个参数为routingKey,这里为空。
4. 完整代码
生产者LogSender:
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 分发消息
String message = "中文日志信息";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
channel.close();
connection.close();
}
消费者LogReceiver:
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Channel channel = connectionFactory.newConnection().createChannel();
channel.exchangeDeclare(LogSender.EXCHANGE_NAME, "fanout");
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
System.out.println("queue name : " + queueName);
// 绑定队列
channel.queueBind(queueName, LogSender.EXCHANGE_NAME, "");
System.out.println("等待接收消息……");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("收到消息:" + message);
}
});
}
这个过程如下图所示:
LogSender创建交换器并发送消息到交换器;LogReceiver创建同名的交换器,并每次创建一个临时队列,该队列名称为RabbitMQ自动生成,而且是非持久、独占和自动删除的,然后将队列和交换器进行绑定,这样就可以从这些队列中获取消息。
启动多个LogReceiver,运行LogSender发送日志消息,可以看到每个消费者都能收到消息。
5. 补充说明
5.1. 匿名交换器
在前几篇的例子中,我们整体上演示了RabbitMQ的工作模式,并没有创建交换器,那么我们违背了RabbitMQ的核心思想吗?其实不然。前几篇的例子中,我们已经在使用交换器发送消息了,只是我们使用的是默认名称为""的交换器,即交换器的名称为空,我们称之为匿名交换器:
channel.basicPublish("", "hello", null, message.getBytes("utf-8"));
第一个参数就是交换器的名称,消息都是通过这个未命名的交换器路由到消息队列hello(通过传递一个routingKey参数)。
5.2. 交换器查询
通过rabbitmqctl list_exchanges命令可以查询交换器:
非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除
可以看到,名称为logs的交换器,类型为fanout,以amq开头的这些交换器是RabbitMQ默认创建的。
5.3. 绑定查询
通过rabbitmqctl list_bindings命令可以查询绑定关系:
非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除
可以看到,名为logs的交换器绑定了三个队列。
6. 总结
RabbitMQ中,消息都是发送给交换器exchange,然后在由交换器分发给其下绑定的队列;
fanout交换器类型:进行消息广播,所有绑定的队列都能收到消息,适用于发布/订阅模式;
系统创建的临时队列,具备独占、非持久化和自动删除特性,而且名称是随机的;
名称为""的交换器并不是说没有使用交换器,而是使用了匿名交换器;
通过rabbitmqctl list_exchanges和rabbitmqctl list_bindings命令可以分别查询交换器列表和队列与交换器的绑定关系。
下一篇,我们将了解direct类型的交换器,实现路由功能。