前一篇我们构建了一个简单的日志记录系统,能够广播日志消息到所有已绑定的接收者。但是这样有一定的局限性,我们能够按照一定的条件来进行日志分发呢?例如,按照日志级别来分发日志消息,某个消费者只收到error级别的日志?在本章,我们将实现这个功能。
1. Bindings 上一篇,绑定队列的代码如下:
channel.queueBind(queueName, EXCHANGE_NAME, ""); 第三个参数为routingKey,为了便于区分消息发送时的routingKey,我们将队列绑定的routingKey称为bindingKey。
BindingKey允许在队列绑定时设置额外的条件,路由器会按照这个key与消息的routingKey进行条件匹配,成功匹配的消息才会发送到该绑定队列。这个路由器按照bindingKey进行过滤的过程,我们称为消息路由(Routing)。
不同类型的交换器,bindingKey的作用和规则都有所不同。在fanout类型中,bindingKey会被忽略,因为这个交换器类型本身就是为了用于广播消息。在接下来我们要介绍的direct类型的交换器中,设定bindingKey的值有着重要意义。
2. Direct交换器 现在我们对上一篇的日志系统进行扩展,使其可以根据日志的级别进行过滤,假设我们的日志级别有debug、info、warning和error,我们想把error级别的消息单独使用一个消费者来接收,其他的由另外的消费者接收。
上篇中,我们使用的fanout交换器并不能满足上述需求----因为它只会把消息进行简单地广播。
我们将要用到的是 direct类型的交换器,它会按照绑定时给定的bindingKey与消息发布时的routingKey进行 精确匹配。即:当bindingKey与routingKey完成相同时,消息才会被交换器分发给队列。
在开始改造我们的日志程序前,我们先看看多重绑定。
3. 多重绑定 多重绑定,即将相同的bindingKey绑定到多个队列上。RabbitMQ允许这么做,这样与fanout交换器作用类似,可以将消息发送到多个队列。例如:
如图所示,Q1和Q2队列都绑定了black key,那么所有与black匹配的消息都会分发到Q1和Q2中,看起来与消息广播类似。
4. 程序改造 现在,我们开始来改造我们的日志程序,来实现上述的需求。
4.1. 生产者 创建direct类型的交换器:
// 创建direct交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 准备日志数据:
static Log[] logs = { new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("warning", "this is a warning log."), new Log("info", "this is an info log."), new Log("info", "this is an info log."), new Log("info", "this is an info log."), new Log("debug", "this is a debug log."), new Log("debug", "this is a debug log.") }; ...