在上一篇,我们将日志系统做了改造,按照日志级别进行消息路由。我们还认识了direct类型的交换器,它是直接按照bindingKey与routingKey进行精确匹配,这两者分别在队列绑定和消息发送时进行设置。

Direct交换器每次仅能匹配一个精确的条件(bindingKey),如果要实现按照多个条件进行路由,或者按照条件进行模糊匹配,那么它就无能为力了。例如:前一篇你的日志程序,我们既要按照日志级别进行采集,还要根据打印日志的类来进行过滤,使用direct或者fanout都难以实现。

Topic交换器就是专门来处理这种场景的。在本篇,我们不再使用日志的例子,而是以动物为例,来了解topic交换器。

1. Topic交换器

发送到topic交换器的消息所设定的routingKey必须是一系列的单词列表,他们使用"."分隔。通常,这些单词会根据消息内容进行特殊定义,最大长度为255字节,举例:“stock.usd.nyse”、"syse.vmw"、"quick.range.rabbit"。

BindingKey也必须拥有相同的格式和遵循相同的规则。topic交换器和direct交换器处理逻辑上类似:带有特定routingKey的消息将被分发到绑定了匹配bindingKey的所有队列。但是,bindingKey还有两种特殊的通配符

  • *:能够模糊匹配一个单词

  • #:能够模糊匹配零个或多个单词

简单而言,topic交换器能够将消息的routingKey和队列绑定的bindingKey进行模糊匹配。如果不使用上边的两种通配符,那么topic交换器跟direct交换器没什么区别。

通配符举例:

  • *.test.*:仅能匹配中间为test的三个单词的routingKey,例如mq.test.topic。

  • lazy.#:能够匹配以lazy开头的所有routingKey,单词个数不限,例如:lazy能匹配,lazy.test也能匹配

2. 示例

现在,我们来编写一个能够按照动物信息进行消息分发的程序。我们从速度、颜色和种类三个维度来描述动物信息,这里我们的key也是由这三个词语的具体描述组成,格式为“速度.颜色.种类”整体结构大致如下:

98af8e2fb72b4f7bbf9b4e26feceac20

首选创建了一个类型为topic的交换器;然后我们定义了三个key,用于绑定到Q1和Q2两个队列,Q1绑定的key为*.orange.,Q2绑定的key为..rabbit和lazy.;消费者C1希望从Q1获取消息,而C2则希望从Q2获取消息。

整个程序的含义如下:

1、C1对颜色为orange(橙色)的动物感兴趣,希望获取它们的信息;

2、C2除了希望接收物种为rabbit(兔子)的消息外,还希望订阅速度为lazy(缓慢)的所有动物的信息。

很明显,C1、C2获取到的消息肯定存在重复的,它们接收消息的维度不同。

2.1. 生产者

创建交换器

// 创建交换器
channel.exchangeDeclare(exchangeName, "topic");

模拟数据

String[] msgs = {
        "quick.orange.rabbit", "lazy.orange.elephant", "lazy.brown.fox", // 能匹配
        "lazy.black.male.cat", // 四个单词也可以匹配
        "orange", "quick.orange.male.rabbit" // 不能匹配,消息被丢弃
};

发送消息

for (String msg : msgs) {
    System.out.println("发送:" + msg);
    channel.basicPublish(exchangeName, msg, null, msg.getBytes("utf-8"));
}

为了简便,我将消息内容直接作为routingKey。实际上,routingKey需要根据消息内容进行特殊定制。

2.2. 消费者

创建随机队列

``String queueName = channel.queueDeclare().getQueue();``

绑定队列,启动消费者时,需要分别注释Q2和Q1的代码

// Q1
// String bindingKey = "*.orange.*";
// channel.queueBind(queueName, exchangeName, bindingKey);
// Q2
String bindingKey = "*.*.rabbit";
channel.queueBind(queueName, exchangeName, bindingKey);
bindingKey = "lazy.#";
channel.queueBind(queueName, exchangeName, bindingKey);

接收消息

channel.basicConsume(queueName, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "utf-8");
        System.out.println("收到消息:" + msg);
    }
});

打开Q1注释Q2代码,启动一个客户端,再注释Q1,打开Q2代码,启动另一个客户端;然后启动生产者发送消息,可以看到消息的分发情况与上边预期一致。

3. 总结

Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,然后按照条件进行模糊匹配。bindingKey的定义需要遵循一定的规则,长度不能超过255字节。


相关阅读