RabbitMQ基础(五)——topic交换器

在上一篇,我们将日志系统做了改造,按照日志级别进行消息路由。我们还认识了direct类型的交换器,它是直接按照bindingKey与routingKey进行精确匹配,这两者分别在队列绑定和消息发送时进行设置。 Direct交换器每次仅能匹配一个精确的条件(bindingKey),如果要实现按照多个条件进行路由,或者按照条件进行模糊匹配,那么它就无能为力了。例如:前一篇你的日志程序,我们既要按照日志级别进行采集,还要根据打印日志的类来进行过滤,使用direct或者fanout都难以实现。 Topic交换器就是专门来处理这种场景的。在本篇,我们不再使用日志的例子,而是以动物为例,来了解topic交换器。 1. Topic交换器 发送到topic交换器的消息所设定的routingKey必须是一系列的单词列表,他们使用"."分隔。通常,这些单词会根据消息内容进行特殊定义,最大长度为255字节,举例:“stock.usd.nyse”、"syse.vmw"、"quick.range.rabbit"。 BindingKey也必须拥有相同的格式和遵循相同的规则。topic交换器和direct交换器处理逻辑上类似:带有特定routingKey的消息将被分发到绑定了匹配bindingKey的所有队列。但是,bindingKey还有两种特殊的通配符: *:能够模糊匹配一个单词 #:能够模糊匹配零个或多个单词 简单而言,topic交换器能够将消息的routingKey和队列绑定的bindingKey进行模糊匹配。如果不使用上边的两种通配符,那么topic交换器跟direct交换器没什么区别。 通配符举例: *.test.*:仅能匹配中间为test的三个单词的routingKey,例如mq.test.topic。 lazy.#:能够匹配以lazy开头的所有routingKey,单词个数不限,例如:lazy能匹配,lazy.test也能匹配 2. 示例 现在,我们来编写一个能够按照动物信息进行消息分发的程序。我们从速度、颜色和种类三个维度来描述动物信息,这里我们的key也是由这三个词语的具体描述组成,格式为“速度.颜色.种类”整体结构大致如下: 首选创建了一个类型为topic的交换器;然后我们定义了三个key,用于绑定到Q1和Q2两个队列,Q1绑定的key为*.orange.,Q2绑定的key为..rabbit和lazy.;消费者C1希望从Q1获取消息,而C2则希望从Q2获取消息。 整个程序的含义如下: 1、C1对颜色为orange(橙色)的动物感兴趣,希望获取它们的信息; 2、C2除了希望接收物种为rabbit(兔子)的消息外,还希望订阅速度为lazy(缓慢)的所有动物的信息。 很明显,C1、C2获取到的消息肯定存在重复的,它们接收消息的维度不同。 2.1. 生产者 创建交换器 // 创建交换器 channel.exchangeDeclare(exchangeName, "topic"); 模拟数据 String[] msgs = { "quick.orange.rabbit", "lazy.orange.elephant", "lazy.brown.fox", // 能匹配 "lazy.black.male.cat", // 四个单词也可以匹配 "orange", "quick.orange.male.rabbit" // 不能匹配,消息被丢弃 }; 发送消息 for (String msg : msgs) { System.out.println("发送:" + msg); channel.basicPublish(exchangeName, msg, null, msg.getBytes("utf-8")); } 为了简便,我将消息内容直接作为routingKey。实际上,routingKey需要根据消息内容进行特殊定制。 2.2. 消费者 创建随机队列 ...

2018-03-23 · 1 min · 130 words · Hank

RabbitMQ基础(三)——fanout交换器和发布/订阅

前一篇,我们创建了工作队列,并将任务发布到队列,每一项任务都会发送给一个worker。接下来,我们将使用发布/订阅模式,将消息分发给多个消费者。 为了说明这一模式,我们将构建一个简单的日志记录系统。它将由两个程序组成----第一个将发出日志消息(生产者),第二个将接收并打印它们(消费者)。运行多个消费者,它们都可以接收消息。 1. 交换器 前边的部分我们都是通过消息队列来发布和接收消息,现在让我们看下RabbitMQ的全消息模型。让我们快速回顾一下 前边的内容: producer:用来发送消息 queue:用来缓存消息 consumer:用来接收消息 交换器,即Exchange,交换器是消息到达的第一站,所有的消息都先发送给交换器,交换器再按照不同的规则进行消息分发。RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到任何队列。相反,生产者只能发送消息给交换器。 交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换器必须知道如何处理它接收到的消息。是否应该分发到特定的队列?是否应该分发到多个队列?或者应该被抛弃?其实,这些规则是由交换类型(exchange type)定义的。 交换类型:根据交换器的功能、用途和适用场景,将交换器进行类型定义,每种类型有各自的功能和适用场景。常见的交换类型有:direct、topic、headers、fanout。 接下来,我们学习下fanout类型的交换器。 fanout:将接收到的消息分发到所有能匹配的队列(广播)。简单而言,即:所有订阅了这些消息的队列,都能够收到消息。 创建名为logs的交换器,其类型为fanout: channel.exchangeDeclare("logs", "fanout"); 发送消息: String message = "中文日志信息"; channel.basicPublish("logs", "", null, message.getBytes("utf-8")); 注意,上边的代码队列名称为空。 2. 临时队列 对于队列,很重要的一点是,我们需要为其命名,因为生产者和消费者必须通过队列名称来定位到具体的队列,从中发送和获取消息(前几篇的"hello"队列和"task_queue"队列名称)。上边发送消息的代码中,我们并没有给队列命名,而是使用了""。 对于我们的日志记录系统,如果要每个消费者都能获取所有的日志消息,那么我们必须完成两点: 连接到RabbitMQ时,我们需要创建一个全新的队列,里边没有任何消息; 当消费者与RabbitMQ断开连接,那么队列应该被自动删除。 对于第1点,我们可以利用随机值来命名队列;对于第2点,我们只能检测断开连接后删除队列。但是,这都不是很好的做法。 其实,我们要做的是创建一个非持久的、独占的、自动删除的队列,这个队列的名称随机。RabbitMQ已经为我们提供了这个功能: String queueName = channel.queueDeclare().getQueue(); 如上边代码所示,我们创建了一个默认的队列,这队列具有上述特性,并且名称是随机生成的,格式为amq.gen-xxxx,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg,我们称之为临时队列。 关键队列的非持久、独占和自动删除: 非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除 3. 绑定 本文开头,我们创建一个名称为"logs"、类型为"fanout"的交换器。那么,交换器需要分发消息给队列,如果将队列与交换器进行关联呢?我们把交换器和队列之间的关联关系称为绑定(binding)。 绑定的代码如下: channel.queueBind(queueName, "logs", ""); queueName即为前边创建的临时队列,第三个参数为routingKey,这里为空。 4. 完整代码 生产者LogSender: public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发消息 String message = "中文日志信息"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); channel.close(); connection.close(); } ...

2018-03-22 · 2 min · 216 words · Hank