使用Springboot开发websocket程序(四)——使用RabbitMQ作为STOMP消息代理

上一篇,我们在介绍了Spring中如何使用websocket的子协议stomp,并使用简单的基于内存的stomp消息代理来编写了一个web聊天室实例。基于内存的stomp消息代理,虽然能够满足基本需求,但还是存在一些不足,比如由于stomp代理在应用内部,多个外部websocket应用需要消息互通,那么就难以满足了。在本篇,我们来学习如何使用RabbitMQ作为stomp代理。 1. 为何要使用外部消息代理 简单消息代理,能够满足单websocket应用的需要,但是如果有多个websocket应用,他们之间需要进行消息共享,那么就需要做大量的工作才能实现了。其实,MQ一个最重要的作用就在于能个在各个系统间解耦。引入外部MQ作为stomp消息代理,很好的解决了多系统消息共享的问题,只要其支持stomp协议。RabbitMQ本身提供了对STOMP的支持,加上后结构变化如下: 前边的是单应用时的结构,后边为怎么了RabbitMQ过后,多个应用程序结构。 2. RabbitMQ对STOMP的支持 RabbitMQ对stomp协议的支持是通过插件的方式,默认stomp插件是关闭的,我们需要先启用之。 2.1. 启用插件 进入rabbitmq所在服务器,然后控制台输入如下命令来启用stomp插件: ``rabbitmq-plugins enable rabbitmq_stomp`` 然后可以查看插件是否启用成功: ``rabbitmq-plugins list`` 2.2. 插件配置 默认情况下,STOMP将会监听61613端口,默认的用户名和密码都为guest。通过配置文件来配置: ubuntu下rabbitmq的配置文件在/etc/rabbitmq/rabbitmq.conf,找到stomp开头的选项,就可以进行配置了 比如配置STOMP监听端口: ``stomp.listeners.tcp.1 = 12345`` RabbitMQ中STOMP适配器连接时如果用户名和密码使用默认的guest/guest,则可以忽略,如果需要修改,则配置如下: stomp.default_user = guest stomp.default_pass = guest 2.3. Destinations STOMP规范并没有规定消息代理来支持什么样的目的地(destination),只是根据消息头的destination的值来判断消息发送的目的地,一般由消息代理自定义支持,RabbitMQ中定义了几种destination类型: #exchange[/exchange]: 发送到任意的routing key和订阅任意的binding key #queue[/queue]: 发送和订阅队列,该队列由STOMP管理 #amqqueue[/amq/queue]: 发送和订阅外部创建的队列 #topic[/topic]: 发送和订阅到topic #temptopic[/temp-queue/]: 创建临时的队列(使用reply-to请求头) 现在,我们结合代码来看看Spring中对RabbitMQ的这几类destination是如何支持的。 3. Spring中使用RabbitMQ消息代理 我们通过一个demo来看看如何在Spring中使用RabbitMQ支持的这几个destination,整体界面如下; 下边的示例仅贴上部分关键代码,完整的代码可以参看文末的源码。 首先,我们创建一个名为03-websocket-stomp-rabbitmq的springboot工程,引入如下依赖: <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-net</artifactId> <version>2.0.5.RELEASE</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.33.Final</version> </dependency> <dependencies> ...

2019-11-12 · 3 min · 509 words · Hank

Spring AMQP消息转换

上一篇,我们介绍了如果使用Spring AMQP注解来实现消息发送和监听,示例都是使用的默认的消息转换器,即SimpleMessageConverter,它只能处理byte[]、String、java序列化对象(实现了Serializable接口的对象)。 通常,不推荐使用Java序列化,因为它存在与Java对象强耦合、依赖java语言等缺点,Spring AMQP也提供了其他的消息转换方式,在本篇,我们将重点来看看如果将消息序列化为JSON格式。 1. MessageConverter Spring AMQP消息转换定义了顶层接口MessageConverter,它的定义如下: public interface MessageConverter { // 将对象转换为Message对象,支持自定义消息属性 Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException; // 将Message转换为对象 Object fromMessage(Message message) throws MessageConversionException; } 它定义了两个方法:将对象转换为Message,将Message转换为对象。 同时,在AmqpTemplate中定义了便捷的消息转换和发送的方法: void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; ...

2019-06-17 · 3 min · 488 words · Hank

Spring AMQP注解的使用

上一篇 "Spring AMQP简介和使用",我们介绍了Spring AMQP的一些基本要素和概念,也通过一些示例代码介绍了消息的发送和接收,但是都是使用的原始编码方式来实现,并不依赖Spring环境。其实,Spring AMQP也支持使用注解的方式来进行异步接收消息,极大的简化了编码。 1. hello world 要使用注解,首先需要在Spring应用环境中,我们看一个最简单的demo: 1、重新新建一个Spring boot工程,添加如下依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.7.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.6.RELEASE</version> </dependency> 2、新建一个Spring配置类RabbitConfiguration,用来申明Bean: @Configuration public class RabbitConfiguration { public static final String ANONYMOUS_QUEUE_NAME = "spring.amqp.anonymous.queue"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672); cachingConnectionFactory.setUsername("admin"); cachingConnectionFactory.setPassword("123456"); return cachingConnectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } @Bean public Queue anonymousQueue() { // 匿名队列 return new AnonymousQueue(() -> ANONYMOUS_QUEUE_NAME); } } ...

2019-06-13 · 5 min · 990 words · Hank

Spring AMQP简介和使用

1. 前言 很早之前写过几篇关于RabbitMQ的一些基础文章,在本篇中,我们将来学习Spring AMQP。 Spring AMQP 是 Spring 对 AMQP( http://www.amqp.org) 协议的封装和扩展,它提供了模板来对消息的发送和接收进行高级抽象,还提供了基于消息驱动的POJO的支持(类似JMS,Java消息服务)。 在开始之前,首先需要了解一些 RabbitMQ 和 AMQP 的一些基础概念,并在你的机器上安装 RabbitMQ, 本文使用的spring-amqp的版本为1.7.6。 2. Spring AMQP抽象 Spring AMQP 是 Spring 对 AMQP 协议的封装和扩展,提供了消息发送和接收的模板。Spring AMQP项目将核心Spring概念应用于基于 AMQP 的消息传递解决方案的开发,以便更容易和简单的管理AMQO资源。 Spring AMQP由spring-amqp和spring-rabbit两个模块组成。spring-amqp模块位于org.springframework.amqp.core包,它的目标是提供不依赖于任何特定AMQP代理实现或客户端库的通用抽象;而spring-rabbit是spring-amqp通用抽象的具体实现,目前仅提供了rabbitmq的实现。 Spring AMQP包括一些基本的抽象定义(上边说过,他们位于org.springframework.amqp.coreb包中,而非AMQP协议本身定义): 2.1. Message 在0-9-1版本的 AMQP 规范中没有定义 Message 类或接口,当执行诸如 basicPublish() 操作时,内容作为字节数组参数传递,而其他属性作为单独的参数传递。Spring AMQP将 Message 类定义为更通用的AMQP域模型表示的一部分。Message该类的目的是将主体和属性封装在单个实例中,以便API可以更简单。以下示例显示了Message类定义: public class Message { private final MessageProperties messageProperties; private final byte[] body; public Message(byte[] body, MessageProperties messageProperties) { this.body = body; this.messageProperties = messageProperties; } public byte[] getBody() { return this.body; } public MessageProperties getMessageProperties() { return this.messageProperties; } } ...

2019-06-05 · 8 min · 1703 words · Hank

RabbitMQ服务器管理(二)——权限管理

RabbitMQ有用一套专门的权限控制系统,用来控制不同用户对不同虚拟主机的访问控制。基本思路同大多系统一样:先创建用户,然后为用户授予权限。 大多数系统并没有严格区分认证和鉴权,但是在RabbitMQ中,对这两个概念做了明确的区分: 认证:识别用户身份,即:识别当前用户,认证其身份信息 鉴权:明确了用户身份,那么授权就是要检查该用户是否拥有相应的权限,即:检查授予用户的权限 1. 虚拟主机和guest用户 在 RabbitMQ基础(七)----虚拟主机vhost一篇,我们已经详细介绍了虚拟主机。我们说过,首次安装好RabbitMQ时,会拥有一个默认的虚拟主机“/”。同时,还有一个默认的用户“guest”,密码也为“guest”,该用户有用默认虚拟主机的全部权限。 如果你得RabbitMQ需要公网访问,出于安全性考虑,官方建议删掉该用户或者修改其密码。默认情况下,RabbitMQ禁止guest用户远程访问,只可以访问本地的mq服务。这个是通过loopback_users配置项决定的,如果需要取消该限制,仅需将该选项配置为none即可: loopback_users = none 如果是3.7之前的版本,还不支持这样key=value的配置格式,那么你需要配置成这样: ``[{rabbit, [{loopback_users, []}]}].`` 该配置想的意思是,取消所有用户的本地访问限制。按照 官方的示例配置文件,如果仅需要取消guest用户本地访问限制,那么进行以下配置: loopback_users.guest = false 否则设置为true即可。 关于MQ的配置文件,我们将在后续博文中详细讨论,官方文档见 这里。 2. 权限工作机制 当客户端与服务端建立连接时,第一级权限控制将会执行:服务器会检查连接的用户是否有用访问其连接的虚拟主机的权限,没有则会拒绝连接,否则连接建立成功。 用户操作虚拟主机的资源(路由器、队列、绑定等)时,RabbitMQ会启用第二级权限控制,验证用户是否具有访问虚拟机的资源的权限。 2.1. 权限定义 具体而言,在RabbitMQ中,对资源的操作定义了三种权限: 配置:创建和删除资源,或者改变它们的行为; 写:发布消息到资源; 读:从资源获取消息; 下表显示了对执行权限检查的所有AMQP命令所需的资源类型的权限: AMQP 0-9-1 Operation configure write read exchange.declare (passive=false) exchange exchange.declare (passive=true) exchange.declare (with AE) exchange exchange (AE) exchange exchange.delete exchange queue.declare (passive=false) queue queue.declare (passive=true) queue.declare (with DLX) queue exchange (DLX) queue queue.delete queue exchange.bind exchange (destination) exchange (source) exchange.unbind exchange (destination) exchange (source) queue.bind queue exchange queue.unbind queue exchange basic.publish exchange basic.get queue basic.consume queue queue.purge queue 说明: ...

2018-04-12 · 3 min · 504 words · Hank

RabbitMQ服务器管理(一)——ubuntu上安装MQ

在本篇,我们将在ubuntu上安装RabbitMQ,其他操作系统类似。 我们知道,RabbitMQ使用Erlang语言开发,所以需要先安装Erlang语言,在实践过程中,linux安装RabbitMQ还比较麻烦,涉及到很多依赖包的安装,官方的安装文档见 这里。 1. 正确的安装步骤 简单而言,正确的安装步骤如下: 1、安装erlang依赖包 wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb 编辑/etc/apt/sources.list,添加以下地址的任意一个 deb https://packages.erlang-solutions.com/ubuntu trusty contrib deb https://packages.erlang-solutions.com/ubuntu saucy contrib deb https://packages.erlang-solutions.com/ubuntu precise contrib 然后执行: wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb 2、安装erlang sudo apt-get update sudo apt-get install esl-erlang(或erlang) 3、下载RabbitMQ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.12/rabbitmq-server_3.7.12-1_all.deb 4、安装RabbitMQ依赖包 apt-get install socat get-get install init-system-helpers get-get install adduser get-get install logrotate 5、安装RabbitMQ dpkg -i rabbitmq-server_3.7.12-1_all.deb 具体我的安装流程以及遇到的问题记录如下: 2. 安装过程 2.1. 下载安装包 ubuntu系统基于debian,所以我们要下载官方给的deb安装包: wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.4/rabbitmq-server_3.7.4-1_all.deb 等待下载完成。 2.2. 尝试安装 下载安装后,尝试直接安装: ...

2018-04-12 · 2 min · 255 words · Hank

RabbitMQ基础(七)——虚拟主机vhost

1. 简介 RabbitMQ是一个多租户系统:连接、交换器、队列、绑定、用户权限、策略和其他的东西都属于虚拟主机(virtual hosts,v_host),他们是整个RabbitMQ的逻辑分组。 虚拟主机类似于 Apache的虚拟主机和 Nginx的server块,最重要的区别是:Apache的虚拟主机通过配置文件定义,然而在RabbitMQ中,RabbitMQ只能通过rabbitmqctl控制台工具或者HTTP API来创建。 1.1. 逻辑和物理分隔 如果没有虚拟主机,当RabbitMQ中的数据越来越庞大,队列越来越多,随之而来的是令人头痛的管理问题,比如队列、交换器命名冲突,它们相互影响等等。虚拟主机能够解决这些问题,而不需要我们部署多个RabbitMQ来负责不同的业务。 虚拟主机提供了资源的逻辑分组和分隔,每一个虚拟主机本质上是mini版的RabbitMQ服务器,他们有用自己的连接、队列、绑定、交换器,更重要的是有用自己的权限机制,这有点类似服务器和运行在服务器上的虚拟机一样。 1.2. 客户端连接 通队列、交换器一样,虚拟主机必须定义名称。当AMQP 0-9-1的客户端连接到RabbitMQ时,需要指定虚拟主机名称,同时还需要提供用户名和密码,只有用户具有相关的权限才能建立连接(关于权限控制的详细信息可以看 这里)。 通常,一个虚拟主机的连接只能操作属于该虚拟主机的交换器、队列和绑定等等内容。但是,当客户端同时连接了多个虚拟主机时,可能产生多个虚拟主机内的交换器、队列相互连接的情况,这种情景典型的例子是虚拟主机处于不同的RabbitMQ集群或者同一个集群中, RabbitMQ Shovel plugin就是一个这种场景应用的实例。 1.3. 虚拟主机和STOMP、MQTT 同AMQP 0-9-1一样,STOMP协议也有虚拟主机的概念,具体参见 这里。 相反,MQTT并不支持虚拟主机,MQTT连接默认使用单一的RabbitMQ主机,有MQTT特定的约定和特性,使客户机能够连接到特定的虚拟主机,而无需修改任何客户端lib库。有关详细信息,请参阅 MQTT指南。 2. 虚拟主机管理 RabbitMQ包含一个默认的虚拟主机:“/”,我们默认操作的都是这个虚拟主机,其用户名和密码默认都是guest,为了安全起见我们应该修改其密码(后续文章将详细介绍权限管理)。 2.1. 查询 命令: rabbitmqctl list_vhosts 该命令会查询当前RabbitMQ服务中所有的虚拟机,由于我没有创建任何虚拟机,所以只能看到默认的: rabbitmqctl list_vhosts 2.2. 创建 命令: rabbitmqctl list_vhosts 创建时必须指定名称: rabbitmqctl list_vhosts 创建完成后,查询: rabbitmqctl list_vhosts 可以看到刚创建的虚拟主机。 2.3. 删除 命令: rabbitmqctl list_vhosts 同样必须指定要删除的vhost的名称: rabbitmqctl list_vhosts 查询: rabbitmqctl list_vhosts ...

2018-04-04 · 1 min · 140 words · Hank

RabbitMQ基础(六)——实现RPC

第二篇中我们学习了如何在多个worker中使用工作队列来分配耗时的任务。现在,假设我们需要运行一个远程计算机上的一个方法并等待其返回结果,那么我们怎么实现?通常,这个过程被称为RPC(远程过程调用Remote Procedure Call)。 百度百科对RPC的介绍: RPC(Remote Procedure Call)--远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。 RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。 有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)。 这里我们不过多讨论RPC相关的内容,有兴趣的可以查阅相关资料。 现在,我们将要使用RabbitMQ来构建一个RPC系统,包括可扩展的RPC客户端和服务端。由于我们并没有耗时的任务,所以们将模拟一些RPC服务。 1. Message properties Message properties即消息属性,AMQP-0-9-1预定义了14项消息属性,但是大部分都很少使用,常用的属性有如下几项: deliveryMode:标记消息是否持久化,值为2则持久化,其他为瞬态消息; contentType:描述编码的mime类型(mime-type),例如常用的JSON编码格式:application/json; replyTo:命名回调队列 correlationId:用于关联RPC的请求和响应 接下来,我们看看如何通过消息属性来设置回调队列和关联ID(correlationId)。 2. 回调队列 通常,在RabbitMQ上实现RPC很简单,客户端发送消息,而服务端响应消息即可。为了接收响应信息,客户端需要在请求中发送回调队列地址给服务端,告诉服务端我使用这个队列来接收消息,你将返回消息发送到这个队列即可。我们可以使用默认的队列(在java客户端中是独占的)。 callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); 3. Correlation Id 上边的方法中,我们为每一个RPC客户端创建了回调队列,这是非常低效的。有没有更好的方式:为每一个客户端创建一个共享的回调队列?这样势必又带来新的问题:共享一个回调队列,请求和响应之间的对应关系并不明确。 CorrelationId属性解决了这个问题。 大致思路是这样:为每个请求设置一个 唯一的标识,然后通过回调队列接收响应消息时,获取这个唯一标识,如果这个标识和之前设定的相同,那么说明响应的确是当前请求的响应,可以获取请求的响应结果了。如果获取到的唯一标示并非我们所设定,说明它并不属于我们的任何请求,那么就可以丢弃这个消息。 为什么我们应该忽略回调队列中的未知消息,而不是进行失败处理?这是由于服务器端可能出现竞态条件。可能发生这样的情况,RPC服务器发送了响应消息后挂掉了,但是请求确认消息(ack)还没有发送,那么,重启后的RPC服务器将再次处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,保持RPC的幂等性。 消息定义correlationId代码: // 设置消息属性,响应后发送到回调队列中 final String correlationId = UUID.randomUUID().toString(); // 随机生成唯一标识 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .correlationId(correlationId) // 每次请求都设定唯一标识,该标识用于将请求和响应进行匹配 .replyTo(callbackQueueName) // 回调队列 .build(); ...

2018-03-28 · 2 min · 355 words · Hank

RabbitMQ基础(五)——topic交换器

在上一篇,我们将日志系统做了改造,按照日志级别进行消息路由。我们还认识了direct类型的交换器,它是直接按照bindingKey与routingKey进行精确匹配,这两者分别在队列绑定和消息发送时进行设置。 Direct交换器每次仅能匹配一个精确的条件(bindingKey),如果要实现按照多个条件进行路由,或者按照条件进行模糊匹配,那么它就无能为力了。例如:前一篇你的日志程序,我们既要按照日志级别进行采集,还要根据打印日志的类来进行过滤,使用direct或者fanout都难以实现。 Topic交换器就是专门来处理这种场景的。在本篇,我们不再使用日志的例子,而是以动物为例,来了解topic交换器。 1. Topic交换器 发送到topic交换器的消息所设定的routingKey必须是一系列的单词列表,他们使用"."分隔。通常,这些单词会根据消息内容进行特殊定义,最大长度为255字节,举例:“stock.usd.nyse”、"syse.vmw"、"quick.range.rabbit"。 BindingKey也必须拥有相同的格式和遵循相同的规则。topic交换器和direct交换器处理逻辑上类似:带有特定routingKey的消息将被分发到绑定了匹配bindingKey的所有队列。但是,bindingKey还有两种特殊的通配符: *:能够模糊匹配一个单词 #:能够模糊匹配零个或多个单词 简单而言,topic交换器能够将消息的routingKey和队列绑定的bindingKey进行模糊匹配。如果不使用上边的两种通配符,那么topic交换器跟direct交换器没什么区别。 通配符举例: *.test.*:仅能匹配中间为test的三个单词的routingKey,例如mq.test.topic。 lazy.#:能够匹配以lazy开头的所有routingKey,单词个数不限,例如:lazy能匹配,lazy.test也能匹配 2. 示例 现在,我们来编写一个能够按照动物信息进行消息分发的程序。我们从速度、颜色和种类三个维度来描述动物信息,这里我们的key也是由这三个词语的具体描述组成,格式为“速度.颜色.种类”整体结构大致如下: 首选创建了一个类型为topic的交换器;然后我们定义了三个key,用于绑定到Q1和Q2两个队列,Q1绑定的key为*.orange.,Q2绑定的key为..rabbit和lazy.;消费者C1希望从Q1获取消息,而C2则希望从Q2获取消息。 整个程序的含义如下: 1、C1对颜色为orange(橙色)的动物感兴趣,希望获取它们的信息; 2、C2除了希望接收物种为rabbit(兔子)的消息外,还希望订阅速度为lazy(缓慢)的所有动物的信息。 很明显,C1、C2获取到的消息肯定存在重复的,它们接收消息的维度不同。 2.1. 生产者 创建交换器 // 创建交换器 channel.exchangeDeclare(exchangeName, "topic"); 模拟数据 String[] msgs = { "quick.orange.rabbit", "lazy.orange.elephant", "lazy.brown.fox", // 能匹配 "lazy.black.male.cat", // 四个单词也可以匹配 "orange", "quick.orange.male.rabbit" // 不能匹配,消息被丢弃 }; 发送消息 for (String msg : msgs) { System.out.println("发送:" + msg); channel.basicPublish(exchangeName, msg, null, msg.getBytes("utf-8")); } 为了简便,我将消息内容直接作为routingKey。实际上,routingKey需要根据消息内容进行特殊定制。 2.2. 消费者 创建随机队列 ...

2018-03-23 · 1 min · 130 words · Hank

RabbitMQ基础(四)——direct交换器与路由

前一篇我们构建了一个简单的日志记录系统,能够广播日志消息到所有已绑定的接收者。但是这样有一定的局限性,我们能够按照一定的条件来进行日志分发呢?例如,按照日志级别来分发日志消息,某个消费者只收到error级别的日志?在本章,我们将实现这个功能。 1. Bindings 上一篇,绑定队列的代码如下: channel.queueBind(queueName, EXCHANGE_NAME, ""); 第三个参数为routingKey,为了便于区分消息发送时的routingKey,我们将队列绑定的routingKey称为bindingKey。 BindingKey允许在队列绑定时设置额外的条件,路由器会按照这个key与消息的routingKey进行条件匹配,成功匹配的消息才会发送到该绑定队列。这个路由器按照bindingKey进行过滤的过程,我们称为消息路由(Routing)。 不同类型的交换器,bindingKey的作用和规则都有所不同。在fanout类型中,bindingKey会被忽略,因为这个交换器类型本身就是为了用于广播消息。在接下来我们要介绍的direct类型的交换器中,设定bindingKey的值有着重要意义。 2. Direct交换器 现在我们对上一篇的日志系统进行扩展,使其可以根据日志的级别进行过滤,假设我们的日志级别有debug、info、warning和error,我们想把error级别的消息单独使用一个消费者来接收,其他的由另外的消费者接收。 上篇中,我们使用的fanout交换器并不能满足上述需求----因为它只会把消息进行简单地广播。 我们将要用到的是 direct类型的交换器,它会按照绑定时给定的bindingKey与消息发布时的routingKey进行 精确匹配。即:当bindingKey与routingKey完成相同时,消息才会被交换器分发给队列。 在开始改造我们的日志程序前,我们先看看多重绑定。 3. 多重绑定 多重绑定,即将相同的bindingKey绑定到多个队列上。RabbitMQ允许这么做,这样与fanout交换器作用类似,可以将消息发送到多个队列。例如: 如图所示,Q1和Q2队列都绑定了black key,那么所有与black匹配的消息都会分发到Q1和Q2中,看起来与消息广播类似。 4. 程序改造 现在,我们开始来改造我们的日志程序,来实现上述的需求。 4.1. 生产者 创建direct类型的交换器: // 创建direct交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 准备日志数据: static Log[] logs = { new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("error", "this is an error log."), new Log("warning", "this is a warning log."), new Log("info", "this is an info log."), new Log("info", "this is an info log."), new Log("info", "this is an info log."), new Log("debug", "this is a debug log."), new Log("debug", "this is a debug log.") }; ...

2018-03-23 · 2 min · 225 words · Hank

RabbitMQ基础(三)——fanout交换器和发布/订阅

前一篇,我们创建了工作队列,并将任务发布到队列,每一项任务都会发送给一个worker。接下来,我们将使用发布/订阅模式,将消息分发给多个消费者。 为了说明这一模式,我们将构建一个简单的日志记录系统。它将由两个程序组成----第一个将发出日志消息(生产者),第二个将接收并打印它们(消费者)。运行多个消费者,它们都可以接收消息。 1. 交换器 前边的部分我们都是通过消息队列来发布和接收消息,现在让我们看下RabbitMQ的全消息模型。让我们快速回顾一下 前边的内容: producer:用来发送消息 queue:用来缓存消息 consumer:用来接收消息 交换器,即Exchange,交换器是消息到达的第一站,所有的消息都先发送给交换器,交换器再按照不同的规则进行消息分发。RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到任何队列。相反,生产者只能发送消息给交换器。 交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换器必须知道如何处理它接收到的消息。是否应该分发到特定的队列?是否应该分发到多个队列?或者应该被抛弃?其实,这些规则是由交换类型(exchange type)定义的。 交换类型:根据交换器的功能、用途和适用场景,将交换器进行类型定义,每种类型有各自的功能和适用场景。常见的交换类型有:direct、topic、headers、fanout。 接下来,我们学习下fanout类型的交换器。 fanout:将接收到的消息分发到所有能匹配的队列(广播)。简单而言,即:所有订阅了这些消息的队列,都能够收到消息。 创建名为logs的交换器,其类型为fanout: channel.exchangeDeclare("logs", "fanout"); 发送消息: String message = "中文日志信息"; channel.basicPublish("logs", "", null, message.getBytes("utf-8")); 注意,上边的代码队列名称为空。 2. 临时队列 对于队列,很重要的一点是,我们需要为其命名,因为生产者和消费者必须通过队列名称来定位到具体的队列,从中发送和获取消息(前几篇的"hello"队列和"task_queue"队列名称)。上边发送消息的代码中,我们并没有给队列命名,而是使用了""。 对于我们的日志记录系统,如果要每个消费者都能获取所有的日志消息,那么我们必须完成两点: 连接到RabbitMQ时,我们需要创建一个全新的队列,里边没有任何消息; 当消费者与RabbitMQ断开连接,那么队列应该被自动删除。 对于第1点,我们可以利用随机值来命名队列;对于第2点,我们只能检测断开连接后删除队列。但是,这都不是很好的做法。 其实,我们要做的是创建一个非持久的、独占的、自动删除的队列,这个队列的名称随机。RabbitMQ已经为我们提供了这个功能: String queueName = channel.queueDeclare().getQueue(); 如上边代码所示,我们创建了一个默认的队列,这队列具有上述特性,并且名称是随机生成的,格式为amq.gen-xxxx,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg,我们称之为临时队列。 关键队列的非持久、独占和自动删除: 非持久(non-durable):队列中的消息不会持久化 独占(exclusive):队列为私有队列,只有当前应用程序能够消费队列 自动删除(autodelete ):最后一个消费者取消订阅队列时,队列自动删除 3. 绑定 本文开头,我们创建一个名称为"logs"、类型为"fanout"的交换器。那么,交换器需要分发消息给队列,如果将队列与交换器进行关联呢?我们把交换器和队列之间的关联关系称为绑定(binding)。 绑定的代码如下: channel.queueBind(queueName, "logs", ""); queueName即为前边创建的临时队列,第三个参数为routingKey,这里为空。 4. 完整代码 生产者LogSender: public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发消息 String message = "中文日志信息"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); channel.close(); connection.close(); } ...

2018-03-22 · 2 min · 216 words · Hank

RabbitMQ基础(二)——工作队列Work Queues

上一篇,我们编写了从一个指定的队列发送和接收消息的程序。在本文中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作人员。 Work Queues,称为工作队列(也称Task Queues,任务队列),其主旨在于避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在后来执行。对此,我们将任务封装为消息并将其发送到队列中。在后台运行的工作进程会获取任务并最终执行任务。当你运行许多工作进程时,任务将在他们之间共享。 1. 轮询调度 使用任务队列的优点之一是能够轻松地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务消费者,这样就可以很容易地扩大规模。默认情况下,RabbitMQ将依次将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息,这种分发消息的方式称为轮询。 2. 消息确认 完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始了一项很长的任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦RabbitMQ向客户传递消息,它立即标记为删除。在这种情况下,如果你关闭一个工作进程,我们将失去它正在处理的信息。我们也将丢失所有发送给这个特定工作进程并且还未处理消息。 通常,我们希望一个工作进程挂掉后,将其任务交给其他工作进程完成。为了保证消息不丢失,RabbitMQ支持消息确认机制:当消费者处理完消息后,其反馈给RabbitMQ,表明消息已经被接收和处理,RabbitMQ可以自由删除该消息。 如果一个消费者挂掉(其通道关闭,连接关闭,或者TCP连接丢失),而没有发送ack,RabbitMQ将会知道一条消息没有被完全处理,并且将重新排队。如果同时有其他的消费者在线,那么它将很快把消息重新交给另一个消费者处理,这样就保证不会丢失任何信息。 消息不会超时:当消费者进程挂掉时,RabbitMQ将重新传递消息,即使处理消息需要很长时间。 RabbitMQ默认是开启手动消息确认的,我们可以通过autoAck=true标记明确地关闭,即采用系统自动确认消息。如果需要手动确认消息,那么将autoAck设置为false,一旦我们完成了任务,需要向工作进程发出确认消息,代码如下: // 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失 channel.basicAck(envelope.getDeliveryTag(), false); 示例的部分关键代码如下: NewTask.java /** * 发布5条消息,每条消息的一个“.”表示消息执行时间需要1秒。 * * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); String[] msgs = new String[]{ "First Message.", // 1s "Second Message..", // 2s "Third Message...", // 3s "Fourth Message....", // 4s "Fifth Message....." // 5s }; Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); for (String msg : msgs) { // 发布消息 channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("utf-8")); System.out.println("[x] Sent '" + msg + "'"); } channel.close(); connection.close(); } ...

2018-01-24 · 3 min · 431 words · Hank

RabbitMQ基础(一)——基本概念和HelloWorld

1. 基本概念 RabbitMQ是一个消息代理,是一个erlang开发的AMQP(Advanced Message Queue )的开源实现。 RabbitMQ是轻量级的,易于部署在premises和云中。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足高级别、高可用性需求。 其主要思想非常简单:它接受并转发消息。你可以把它想象成邮局:当你把邮件寄到邮箱时,你很确定邮差先生最终会把邮件寄给你的收件人。使用这个比喻,RabbitMQ是一个邮筒,一个邮局和一个邮差。 RabbitMQ与邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制数据。 官网地址: http://www.rabbitmq.com 2. AMQP AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。 目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。通过AMQP,让消息中间件的能力最终被网络本身所具有,并且通过消息中间件的广泛使用发展出一系列有用的应用程序。 Broker: 中间件。接收和分发消息的应用,RabbitMQ Server就是Message Broker。 Virtual host: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。 Connection: 连接。publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。 Channel: 渠道。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 Exchange: 路由。message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 Queue: 队列。消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。 Binding: 绑定。exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。 3. RabbitMQ术语 3.1. Producter 即生产者。Producing就是发送,发送消息的程序是生产者(Producter)。用P表示,如下图: 3.2. Exchange 交换器,RabbitMQ中,其实消息不会直接相队列发送,而是发送给交换器,然后交换器在按照一定的规则转发给不同的队列。交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换必须知道如何处理它接收到的消息。是否应该附加到特定的队列?是否应该附加到许多队列?或者应该被抛弃。这些规则由交换类型(exchange type)定义。 3.3. Exchange Type 交换器类型,在创建交换器时指定,用于区分交换器的不同作用,实现不同的功能。RabbitMQ定义了四种交换器类型:direct、topic、headers、fanout,每种类型都有特定的应用场景(见后续文章的详细介绍)。 direct:bindingKey和routingKey进行精确匹配,适用于精确将消息发送给指定队列; topic:bindingKey和routingKey可以进行模糊匹配,通过使用通配符"*"和"#"分别来模糊匹配一个单词和多个单词;适用于将消息按照一定的规则发送到匹配的一个或多个队列; fanout:广播,这种交换器可以将消息广播给所有订阅的交换器; header:不常用,有兴趣的话可以自行了解。 3.4. Queue 即队列,队列是邮箱的名称,它处于RabbitMQ内部。尽管消息流通过RabbitMQ和您的应用程序,但它们只能存储在队列中。队列不受任何限制,它可以根据你的需要存储尽可能多的消息----它本质上是一个无限的缓冲区。许多生产者都可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。队列上有它的名称,如下图表示: 3.5. Consumer 即消费者,Consuming跟receiving的含义类似。Consumer通常为等待接收消息的应用程序 。注意,生产者、消费者和消息代理不需要处于同一台主机上,事实上,在大多数应用场景都是如此。 ...

2018-01-18 · 2 min · 354 words · Hank