在上一篇,我们将日志系统做了改造,按照日志级别进行消息路由。我们还认识了direct类型的交换器,它是直接按照bindingKey与routingKey进行精确匹配,这两者分别在队列绑定和消息发送时进行设置。
Direct交换器每次仅能匹配一个精确的条件(bindingKey),如果要实现按照多个条件进行路由,或者按照条件进行模糊匹配,那么它就无能为力了。例如:前一篇你的日志程序,我们既要按照日志级别进行采集,还要根据打印日志的类来进行过滤,使用direct或者fanout都难以实现。
Topic交换器就是专门来处理这种场景的。在本篇,我们不再使用日志的例子,而是以动物为例,来了解topic交换器。
1. Topic交换器
发送到topic交换器的消息所设定的routingKey必须是一系列的单词列表,他们使用"."分隔。通常,这些单词会根据消息内容进行特殊定义,最大长度为255字节,举例:“stock.usd.nyse”、"syse.vmw"、"quick.range.rabbit"。
BindingKey也必须拥有相同的格式和遵循相同的规则。topic交换器和direct交换器处理逻辑上类似:带有特定routingKey的消息将被分发到绑定了匹配bindingKey的所有队列。但是,bindingKey还有两种特殊的通配符:
*
:能够模糊匹配一个单词#
:能够模糊匹配零个或多个单词
简单而言,topic交换器能够将消息的routingKey和队列绑定的bindingKey进行模糊匹配。如果不使用上边的两种通配符,那么topic交换器跟direct交换器没什么区别。
通配符举例:
*.test.*
:仅能匹配中间为test的三个单词的routingKey,例如mq.test.topic。lazy.#
:能够匹配以lazy开头的所有routingKey,单词个数不限,例如:lazy能匹配,lazy.test也能匹配
2. 示例
现在,我们来编写一个能够按照动物信息进行消息分发的程序。我们从速度、颜色和种类三个维度来描述动物信息,这里我们的key也是由这三个词语的具体描述组成,格式为“速度.颜色.种类”整体结构大致如下:
首选创建了一个类型为topic的交换器;然后我们定义了三个key,用于绑定到Q1和Q2两个队列,Q1绑定的key为*.orange.,Q2绑定的key为..rabbit和lazy.;消费者C1希望从Q1获取消息,而C2则希望从Q2获取消息。
整个程序的含义如下:
1、C1对颜色为orange(橙色)的动物感兴趣,希望获取它们的信息;
2、C2除了希望接收物种为rabbit(兔子)的消息外,还希望订阅速度为lazy(缓慢)的所有动物的信息。
很明显,C1、C2获取到的消息肯定存在重复的,它们接收消息的维度不同。
2.1. 生产者
创建交换器
// 创建交换器
channel.exchangeDeclare(exchangeName, "topic");
模拟数据
String[] msgs = {
"quick.orange.rabbit", "lazy.orange.elephant", "lazy.brown.fox", // 能匹配
"lazy.black.male.cat", // 四个单词也可以匹配
"orange", "quick.orange.male.rabbit" // 不能匹配,消息被丢弃
};
发送消息
for (String msg : msgs) {
System.out.println("发送:" + msg);
channel.basicPublish(exchangeName, msg, null, msg.getBytes("utf-8"));
}
为了简便,我将消息内容直接作为routingKey。实际上,routingKey需要根据消息内容进行特殊定制。
2.2. 消费者
创建随机队列
``String queueName = channel.queueDeclare().getQueue();``
绑定队列,启动消费者时,需要分别注释Q2和Q1的代码
// Q1
// String bindingKey = "*.orange.*";
// channel.queueBind(queueName, exchangeName, bindingKey);
// Q2
String bindingKey = "*.*.rabbit";
channel.queueBind(queueName, exchangeName, bindingKey);
bindingKey = "lazy.#";
channel.queueBind(queueName, exchangeName, bindingKey);
接收消息
channel.basicConsume(queueName, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("收到消息:" + msg);
}
});
打开Q1注释Q2代码,启动一个客户端,再注释Q1,打开Q2代码,启动另一个客户端;然后启动生产者发送消息,可以看到消息的分发情况与上边预期一致。
3. 总结
Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,然后按照条件进行模糊匹配。bindingKey的定义需要遵循一定的规则,长度不能超过255字节。