RabbitMQ基础(四)——direct交换器与路由

前一篇我们构建了一个简单的日志记录系统,能够广播日志消息到所有已绑定的接收者。但是这样有一定的局限性,我们能够按照一定的条件来进行日志分发呢?例如,按照日志级别来分发日志消息,某个消费者只收到error级别的日志?在本章,我们将实现这个功能。 1. Bindings 上一篇,绑定队列的代码如下: channel.queueBind(queueName, EXCHANGE_NAME, ""); 第三个参数为routingKey,为了便于区分消息发送时的routingKey,我们将队列绑定的routingKey称为bindingKey。 BindingKey允许在队列绑定时设置额外的条件,路由器会按照这个key与消息的routingKey进行条件匹配,成功匹配的消息才会发送到该绑定队列。这个路由器按照bindingKey进行过滤的过程,我们称为消息路由(Routing)。 不同类型的交换器,bindingKey的作用和规则都有所不同。在fanout类型中,bindingKey会被忽略,因为这个交换器类型本身就是为了用于广播消息。在接下来我们要介绍的direct类型的交换器中,设定bindingKey的值有着重要意义。 2. Direct交换器 现在我们对上一篇的日志系统进行扩展,使其可以根据日志的级别进行过滤,假设我们的日志级别有debug、info、warning和error,我们想把error级别的消息单独使用一个消费者来接收,其他的由另外的消费者接收。 上篇中,我们使用的fanout交换器并不能满足上述需求----因为它只会把消息进行简单地广播。 我们将要用到的是 direct类型的交换器,它会按照绑定时给定的bindingKey与消息发布时的routingKey进行 精确匹配。即:当bindingKey与routingKey完成相同时,消息才会被交换器分发给队列。 在开始改造我们的日志程序前,我们先看看多重绑定。 3. 多重绑定 多重绑定,即将相同的bindingKey绑定到多个队列上。RabbitMQ允许这么做,这样与fanout交换器作用类似,可以将消息发送到多个队列。例如: 如图所示,Q1和Q2队列都绑定了black key,那么所有与black匹配的消息都会分发到Q1和Q2中,看起来与消息广播类似。 4. 程序改造 现在,我们开始来改造我们的日志程序,来实现上述的需求。 4.1. 生产者 创建direct类型的交换器: // 创建direct交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 准备日志数据: static Log[] logs = { new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("warning", "this is a warning log."), new Log("info", "this is an info log."), new Log("info", "this is an info log."), new Log("info", "this is an info log."), new Log("debug", "this is a debug log."), new Log("debug", "this is a debug log.") }; ...

2018-03-23 · 2 min · 225 words · Hank

RabbitMQ基础(三)——fanout交换器和发布/订阅

前一篇,我们创建了工作队列,并将任务发布到队列,每一项任务都会发送给一个worker。接下来,我们将使用发布/订阅模式,将消息分发给多个消费者。 为了说明这一模式,我们将构建一个简单的日志记录系统。它将由两个程序组成----第一个将发出日志消息(生产者),第二个将接收并打印它们(消费者)。运行多个消费者,它们都可以接收消息。 1. 交换器 前边的部分我们都是通过消息队列来发布和接收消息,现在让我们看下RabbitMQ的全消息模型。让我们快速回顾一下 前边的内容: producer:用来发送消息 queue:用来缓存消息 consumer:用来接收消息 交换器,即Exchange,交换器是消息到达的第一站,所有的消息都先发送给交换器,交换器再按照不同的规则进行消息分发。RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到任何队列。相反,生产者只能发送消息给交换器。 交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换器必须知道如何处理它接收到的消息。是否应该分发到特定的队列?是否应该分发到多个队列?或者应该被抛弃?其实,这些规则是由交换类型(exchange type)定义的。 交换类型:根据交换器的功能、用途和适用场景,将交换器进行类型定义,每种类型有各自的功能和适用场景。常见的交换类型有:direct、topic、headers、fanout。 接下来,我们学习下fanout类型的交换器。 fanout:将接收到的消息分发到所有能匹配的队列(广播)。简单而言,即:所有订阅了这些消息的队列,都能够收到消息。 创建名为logs的交换器,其类型为fanout: channel.exchangeDeclare("logs", "fanout"); 发送消息: String message = "中文日志信息"; channel.basicPublish("logs", "", null, message.getBytes("utf-8")); 注意,上边的代码队列名称为空。 2. 临时队列 对于队列,很重要的一点是,我们需要为其命名,因为生产者和消费者必须通过队列名称来定位到具体的队列,从中发送和获取消息(前几篇的"hello"队列和"task_queue"队列名称)。上边发送消息的代码中,我们并没有给队列命名,而是使用了""。 对于我们的日志记录系统,如果要每个消费者都能获取所有的日志消息,那么我们必须完成两点: 连接到RabbitMQ时,我们需要创建一个全新的队列,里边没有任何消息; 当消费者与RabbitMQ断开连接,那么队列应该被自动删除。 对于第1点,我们可以利用随机值来命名队列;对于第2点,我们只能检测断开连接后删除队列。但是,这都不是很好的做法。 其实,我们要做的是创建一个非持久的、独占的、自动删除的队列,这个队列的名称随机。RabbitMQ已经为我们提供了这个功能: String queueName = channel.queueDeclare().getQueue(); 如上边代码所示,我们创建了一个默认的队列,这队列具有上述特性,并且名称是随机生成的,格式为amq.gen-xxxx,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg,我们称之为临时队列。 关键队列的非持久、独占和自动删除: 非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除 3. 绑定 本文开头,我们创建一个名称为"logs"、类型为"fanout"的交换器。那么,交换器需要分发消息给队列,如果将队列与交换器进行关联呢?我们把交换器和队列之间的关联关系称为绑定(binding)。 绑定的代码如下: channel.queueBind(queueName, "logs", ""); queueName即为前边创建的临时队列,第三个参数为routingKey,这里为空。 4. 完整代码 生产者LogSender: public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发消息 String message = "中文日志信息"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); channel.close(); connection.close(); } ...

2018-03-22 · 2 min · 216 words · Hank