前一篇我们构建了一个简单的日志记录系统,能够广播日志消息到所有已绑定的接收者。但是这样有一定的局限性,我们能够按照一定的条件来进行日志分发呢?例如,按照日志级别来分发日志消息,某个消费者只收到error级别的日志?在本章,我们将实现这个功能。

1. Bindings

上一篇,绑定队列的代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

第三个参数为routingKey,为了便于区分消息发送时的routingKey,我们将队列绑定的routingKey称为bindingKey

BindingKey允许在队列绑定时设置额外的条件,路由器会按照这个key与消息的routingKey进行条件匹配,成功匹配的消息才会发送到该绑定队列。这个路由器按照bindingKey进行过滤的过程,我们称为消息路由(Routing)。

不同类型的交换器,bindingKey的作用和规则都有所不同。在fanout类型中,bindingKey会被忽略,因为这个交换器类型本身就是为了用于广播消息。在接下来我们要介绍的direct类型的交换器中,设定bindingKey的值有着重要意义。

2. Direct交换器

现在我们对上一篇的日志系统进行扩展,使其可以根据日志的级别进行过滤,假设我们的日志级别有debug、info、warning和error,我们想把error级别的消息单独使用一个消费者来接收,其他的由另外的消费者接收。

上篇中,我们使用的fanout交换器并不能满足上述需求----因为它只会把消息进行简单地广播。

我们将要用到的是 direct类型的交换器,它会按照绑定时给定的bindingKey与消息发布时的routingKey进行 精确匹配。即:当bindingKey与routingKey完成相同时,消息才会被交换器分发给队列。

在开始改造我们的日志程序前,我们先看看多重绑定。

3. 多重绑定

多重绑定,即将相同的bindingKey绑定到多个队列上。RabbitMQ允许这么做,这样与fanout交换器作用类似,可以将消息发送到多个队列。例如:

22f70d3f1e7742e59b0d0d713baa7d71

如图所示,Q1和Q2队列都绑定了black key,那么所有与black匹配的消息都会分发到Q1和Q2中,看起来与消息广播类似。

4. 程序改造

现在,我们开始来改造我们的日志程序,来实现上述的需求。

4.1. 生产者

创建direct类型的交换器:

// 创建direct交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

准备日志数据:

static Log[] logs = {
        new Log("error", "this is an error log."),
        new Log("error", "this is an error log."),
        new Log("error", "this is an error log."),
        new Log("error", "this is an error log."),
        new Log("warning", "this is a warning log."),
        new Log("info", "this is an info log."),
        new Log("info", "this is an info log."),
        new Log("info", "this is an info log."),
        new Log("debug", "this is a debug log."),
        new Log("debug", "this is a debug log.")
};

分发日志:

// 分发日志
for (Log log : logs) {
    System.out.println("log : " + log.getMsg());
    channel.basicPublish(EXCHANGE_NAME, log.getLevel(), null, log.getMsg().getBytes("utf-8"));
}

生产者的代码非常简单,定义了一个日志数据数组,循环之并发送消息,发送消息时将日志的级别作为routingKey。

4.2. 消费者

创建交换器:

// 创建direct交换器
channel.exchangeDeclare(LogSenderDirect.EXCHANGE_NAME, "direct");

绑定队列:

// 创建随机队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列
// 队列1
// System.out.println("routingKey: " + "error");
// channel.queueBind(queueName, LogSenderDirect.EXCHANGE_NAME, "error");
// 队列2
System.out.println("routingKey: " + "debug");
channel.queueBind(queueName, LogSenderDirect.EXCHANGE_NAME, "debug");
System.out.println("routingKey: " + "info");
channel.queueBind(queueName, LogSenderDirect.EXCHANGE_NAME, "info");
System.out.println("routingKey: " + "warning");
channel.queueBind(queueName, LogSenderDirect.EXCHANGE_NAME, "warning");

接收消息:

// 处理日志
channel.basicConsume(queueName, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("log : " + new String(body, "utf-8"));
    }
});

队列1绑定了一个队列,bindingKey为error;队列2绑定了一个队列,绑定了debug、info、warning三个日志级别。分别注释队列2和队列1的绑定代码,启动两个消费者,然后启动生产者,可以看到,消费者1仅收到error日志,而消费者2收到了debug、info和warning的日志。

4.3. 过程分析

整个日志分发的流程如下图所示:

058c29dce41846aaa4dddcda5163b6ff

Producer生产各种级别的日志信息,发送给type为direct的交换器Exchange;在消费者端,队列Queue1绑定的bindingKey为error,队列Queue2则绑定了多个key,分别为debug、info和warning;最终,Consumer1从Queue1获取消息,而Consumer2从Queue2获取消息,实现了日志的按级别转发和接收。

完成的代码见: LogSenderDirect.javaLogReceiverDirect.java

5. 总结

1、绑定队列时,可以设置bindingKey,发送消息时可以设置消息的routingKey;

2、direct交换器,按照消息的routingKey和bindingKey进行精确匹配,完全相同才会进行转发;

3、相同的key可以与多个队列进行绑定,每个队列都可以收到与key匹配的消息,这称为多重绑定。

在本篇,我们简单了解了direct类型的交换器,它适用于需要精确匹配并转发消息的场景;在下一篇,我们将会了解topic类型的交换器,以及其作用及适用场景。

=


相关阅读