前一篇,我们创建了工作队列,并将任务发布到队列,每一项任务都会发送给一个worker。接下来,我们将使用发布/订阅模式,将消息分发给多个消费者。

为了说明这一模式,我们将构建一个简单的日志记录系统。它将由两个程序组成----第一个将发出日志消息(生产者),第二个将接收并打印它们(消费者)。运行多个消费者,它们都可以接收消息。

1. 交换器

前边的部分我们都是通过消息队列来发布和接收消息,现在让我们看下RabbitMQ的全消息模型。让我们快速回顾一下 前边的内容

  • producer:用来发送消息

  • queue:用来缓存消息

  • consumer:用来接收消息

交换器,即Exchange,交换器是消息到达的第一站,所有的消息都先发送给交换器,交换器再按照不同的规则进行消息分发。RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到任何队列。相反,生产者只能发送消息给交换器

交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换器必须知道如何处理它接收到的消息。是否应该分发到特定的队列?是否应该分发到多个队列?或者应该被抛弃?其实,这些规则是由交换类型(exchange type)定义的。

d873123c216d4a06abcfca232285227f

交换类型:根据交换器的功能、用途和适用场景,将交换器进行类型定义,每种类型有各自的功能和适用场景。常见的交换类型有:direct、topic、headers、fanout。

接下来,我们学习下fanout类型的交换器。

fanout:将接收到的消息分发到所有能匹配的队列(广播)。简单而言,即:所有订阅了这些消息的队列,都能够收到消息。

创建名为logs的交换器,其类型为fanout:

channel.exchangeDeclare("logs", "fanout");

发送消息:

String message = "中文日志信息";
channel.basicPublish("logs", "", null, message.getBytes("utf-8"));

注意,上边的代码队列名称为空。

2. 临时队列

对于队列,很重要的一点是,我们需要为其命名,因为生产者和消费者必须通过队列名称来定位到具体的队列,从中发送和获取消息(前几篇的"hello"队列和"task_queue"队列名称)。上边发送消息的代码中,我们并没有给队列命名,而是使用了""。

对于我们的日志记录系统,如果要每个消费者都能获取所有的日志消息,那么我们必须完成两点:

  1. 连接到RabbitMQ时,我们需要创建一个全新的队列,里边没有任何消息;

  2. 当消费者与RabbitMQ断开连接,那么队列应该被自动删除。

对于第1点,我们可以利用随机值来命名队列;对于第2点,我们只能检测断开连接后删除队列。但是,这都不是很好的做法。

其实,我们要做的是创建一个非持久的、独占的、自动删除的队列,这个队列的名称随机。RabbitMQ已经为我们提供了这个功能:

String queueName = channel.queueDeclare().getQueue();

如上边代码所示,我们创建了一个默认的队列,这队列具有上述特性,并且名称是随机生成的,格式为amq.gen-xxxx,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg,我们称之为临时队列

关键队列的非持久、独占和自动删除:

非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除

3. 绑定

本文开头,我们创建一个名称为"logs"、类型为"fanout"的交换器。那么,交换器需要分发消息给队列,如果将队列与交换器进行关联呢?我们把交换器和队列之间的关联关系称为绑定(binding)。

绑定的代码如下:

channel.queueBind(queueName, "logs", "");

queueName即为前边创建的临时队列,第三个参数为routingKey,这里为空。

4. 完整代码

生产者LogSender:

public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 分发消息
        String message = "中文日志信息";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
        channel.close();
        connection.close();
    }

消费者LogReceiver:

public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Channel channel = connectionFactory.newConnection().createChannel();

        channel.exchangeDeclare(LogSender.EXCHANGE_NAME, "fanout");
        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("queue name : " + queueName);
        // 绑定队列
        channel.queueBind(queueName, LogSender.EXCHANGE_NAME, "");

        System.out.println("等待接收消息……");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("收到消息:" + message);
            }
        });
    }

这个过程如下图所示:

baa8b9553c3d422e9ab9497b31f60098

LogSender创建交换器并发送消息到交换器;LogReceiver创建同名的交换器,并每次创建一个临时队列,该队列名称为RabbitMQ自动生成,而且是非持久、独占和自动删除的,然后将队列和交换器进行绑定,这样就可以从这些队列中获取消息。

启动多个LogReceiver,运行LogSender发送日志消息,可以看到每个消费者都能收到消息。

5. 补充说明

5.1. 匿名交换器

在前几篇的例子中,我们整体上演示了RabbitMQ的工作模式,并没有创建交换器,那么我们违背了RabbitMQ的核心思想吗?其实不然。前几篇的例子中,我们已经在使用交换器发送消息了,只是我们使用的是默认名称为""的交换器,即交换器的名称为空,我们称之为匿名交换器:

channel.basicPublish("", "hello", null, message.getBytes("utf-8"));

第一个参数就是交换器的名称,消息都是通过这个未命名的交换器路由到消息队列hello(通过传递一个routingKey参数)。

5.2. 交换器查询

通过rabbitmqctl list_exchanges命令可以查询交换器:

非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除

d65a2fccb2d445bfaa501ffff76d31f9

可以看到,名称为logs的交换器,类型为fanout,以amq开头的这些交换器是RabbitMQ默认创建的。

5.3. 绑定查询

通过rabbitmqctl list_bindings命令可以查询绑定关系:

非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除

f5d3650366cb4473a792c97789faf357

可以看到,名为logs的交换器绑定了三个队列。

6. 总结

  1. RabbitMQ中,消息都是发送给交换器exchange,然后在由交换器分发给其下绑定的队列;

  2. fanout交换器类型:进行消息广播,所有绑定的队列都能收到消息,适用于发布/订阅模式;

  3. 系统创建的临时队列,具备独占、非持久化和自动删除特性,而且名称是随机的;

  4. 名称为""的交换器并不是说没有使用交换器,而是使用了匿名交换器;

  5. 通过rabbitmqctl list_exchanges和rabbitmqctl list_bindings命令可以分别查询交换器列表和队列与交换器的绑定关系。

下一篇,我们将了解direct类型的交换器,实现路由功能。


相关阅读