上一篇,我们在介绍了Spring中如何使用websocket的子协议stomp,并使用简单的基于内存的stomp消息代理来编写了一个web聊天室实例。基于内存的stomp消息代理,虽然能够满足基本需求,但还是存在一些不足,比如由于stomp代理在应用内部,多个外部websocket应用需要消息互通,那么就难以满足了。在本篇,我们来学习如何使用RabbitMQ作为stomp代理。
1. 为何要使用外部消息代理
简单消息代理,能够满足单websocket应用的需要,但是如果有多个websocket应用,他们之间需要进行消息共享,那么就需要做大量的工作才能实现了。其实,MQ一个最重要的作用就在于能个在各个系统间解耦。引入外部MQ作为stomp消息代理,很好的解决了多系统消息共享的问题,只要其支持stomp协议。RabbitMQ本身提供了对STOMP的支持,加上后结构变化如下:
前边的是单应用时的结构,后边为怎么了RabbitMQ过后,多个应用程序结构。
2. RabbitMQ对STOMP的支持
RabbitMQ对stomp协议的支持是通过插件的方式,默认stomp插件是关闭的,我们需要先启用之。
2.1. 启用插件
进入rabbitmq所在服务器,然后控制台输入如下命令来启用stomp插件:
``rabbitmq-plugins enable rabbitmq_stomp``
然后可以查看插件是否启用成功:
``rabbitmq-plugins list``
2.2. 插件配置
默认情况下,STOMP将会监听61613端口,默认的用户名和密码都为guest。通过配置文件来配置:
ubuntu下rabbitmq的配置文件在/etc/rabbitmq/rabbitmq.conf
,找到stomp开头的选项,就可以进行配置了
比如配置STOMP监听端口:
``stomp.listeners.tcp.1 = 12345``
RabbitMQ中STOMP适配器连接时如果用户名和密码使用默认的guest/guest
,则可以忽略,如果需要修改,则配置如下:
stomp.default_user = guest
stomp.default_pass = guest
2.3. Destinations
STOMP规范并没有规定消息代理来支持什么样的目的地(destination),只是根据消息头的destination的值来判断消息发送的目的地,一般由消息代理自定义支持,RabbitMQ中定义了几种destination类型:
#exchange[
/exchange
]: 发送到任意的routing key和订阅任意的binding key#queue[
/queue
]: 发送和订阅队列,该队列由STOMP管理#amqqueue[
/amq/queue
]: 发送和订阅外部创建的队列#topic[
/topic
]: 发送和订阅到topic#temptopic[
/temp-queue/
]: 创建临时的队列(使用reply-to请求头)
现在,我们结合代码来看看Spring中对RabbitMQ的这几类destination是如何支持的。
3. Spring中使用RabbitMQ消息代理
我们通过一个demo来看看如何在Spring中使用RabbitMQ支持的这几个destination,整体界面如下;
下边的示例仅贴上部分关键代码,完整的代码可以参看文末的源码。
首先,我们创建一个名为03-websocket-stomp-rabbitmq的springboot工程,引入如下依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-net</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.33.Final</version>
</dependency>
<dependencies>
这里需要引入reactor库,注意版本的对应,启用响应式编程的支持,否则会出现如下错误:
Caused by: java.lang.ClassNotFoundException: reactor.io.codec.Codec
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_171]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_171]
... 29 common frames omitted
Spring启用Stomp外部消息代理很简单,配置类跟上一篇大致相同,注册stomp端点:
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
然后,把简单stomp代理换成外部stomp代理,只需要修改一下配置类:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry
// .setPathMatcher(new AntPathMatcher("."))
.setApplicationDestinationPrefixes("/app");
registry.enableStompBrokerRelay("/exchange", "/topic", "/queue", "/amq/queue")
.setRelayHost("192.168.0.27")
.setRelayPort(61613)
// 配置发送消息到stomp代理的系统共享连接的账号密码,默认是guest/guest
.setSystemLogin("admin")
.setSystemPasscode("123456")
// 配置客户端连接到stomp代理的账号和密码,默认是guest/guest
.setClientLogin("admin")
.setClientPasscode("123456")
;
}
我们这里仅测试/exchange
、/topic
、/queue
、/amq/queue
几个destination。
首先,在配置中调用registry.enableStompBrokerRelay(……)
方法来配置支持的代理消息destination的前缀,这些前缀由外部消息代理自定义,不同的外部MQ都有自己对STOMP特殊的支持,比如RabbitMQ,它就支持开篇说的这几种destination前缀。
然后,我们分别配置了服务端和客户端的管理账号和密码,其实就是RabbitMQ可用来管理的账号和密码,例如创建队列、交换机等。
工程准备好,我们再看看rabbitmq的这几种stomp destination支持。
3.1. Exchange Destinations
这种是面向交换机的,其destination的格式为:/exchange/<name>[/<pattern>]
这种destination会为每一个订阅者创建一个新的队列并使用给定的<pattern>作为routing key来绑定到exchange,但是stomp不会自动创建exchange,需要在外部自己创建。
订阅者:
创建一个独占的、自动删除的队列,绑定到名称为<name>的交换机,如果<pattern>有值,则使用其作为key绑定到exchange
订阅创建的queue
发送者:使用
<routing-key>
作为路由key发送消息到名为<name>
的交换机
我们先回顾一下RabbitMQ中exchange的常用几种类型:
direct:消息发送者的routing key和绑定到exchange的queue的routing key必须精确匹配,意思就是必须一致
topic:可以支持通配符匹配,表示匹配所有,表示匹配一个单词,如lazy.、*.orange.*
fanout:对所有绑定的队列进行广播
<p class="x-box-title x-padding-top-10">如果对rabbitmq还不了解的,可以看这几篇:
ok,现在我们来模拟一个场景:有两个客户端,分别订阅了stomp的/exchange/<name>/animal.#和/exchange/<name>/animal.* 两个destination,客户端分别发送消息到服务端,然后服务端转发到具体的destination上,分别来测试客户端收到的消息情况。
我们来看看该场景的测试代码(省略基础代码,见文末源码)。
首先,在RabbitMQ中创建一个topic类型的exchange,其他类型的同理。
然后,在websocket服务端,我们定义如下两个消息处理方法:
@MessageMapping("/send2mifei")
@SendTo("/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.rabbit.mifei")
public String exchange1(String content) {
return "destination : " + "/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.rabbit.mifei" + ", content : " + content;
}
@MessageMapping("/send2peppa")
@SendTo("/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.pig")
public String exchange2(String content) {
return "destination : " + "/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.pig" + ", content : " + content;
}
exchange1
和exchange2
方法用来接收客户端发出来的消息,并转发到具体的destination上,这里为animal.rabbit.mifei
和animal.pig
,正常情况下,订阅了animal.#
的客户端都能收到消息,而订阅了animal.*
的客户端只能收到exchange2转发的消息。
客户端关键的代码如下:
订阅地址设置:
<label>订阅地址:</label>
<select id="subscribe-uri">
<option></option>
<option value="/exchange/ws.rabbit.exchange.topic/animal.#">
/exchange/ws.rabbit.exchange.topic/animal.#
</option>
<option value="/exchange/ws.rabbit.exchange.topic/animal.*">
/exchange/ws.rabbit.exchange.topic/animal.*
</option>
</select>
<button id="subscribe">订阅</button>
订阅和消息发送:
div.find('#subscribe').click(function () {
let su = div.find('#subscribe-uri').val();
if (su) {
showMessage('已经订阅:' + su);
// 订阅聊天内容
ws.subscribe(su, function (data) {
showMessage(data.body);
});
}
});
div.find('#send1').click(function () {
if (div.find('#content').val()) {
if (type === 1) {
ws.send('/app/send2mifei', {}, div.find('#content').val());
} else if (type === 4) {
ws.send('/app/topic/debug', {}, div.find('#content').val());
}
}
});
div.find('#send2').click(function () {
if (div.find('#content').val()) {
if (type === 1) {
ws.send('/app/send2peppa', {}, div.find('#content').val());
} else if (type === 4) {
ws.send('/app/topic/info', {}, div.find('#content').val());
}
}
});
测试结果跟我们预期相同。
一句话概括:/exchange类型的destination,外部必须先有exchange,stomp不会创建,在客户端订阅时,stomp都会为之创建一个临时的、自动删除的队列,并根据routing key绑定到exchange上。
3.2. Queue Destinations
面向队列,消息发送到默认的交换机(名称为"")。
格式:/queue/<name>
Queue destinations会发送消息给至少一个订阅者,如果没有订阅者,那么消息会一直排队等待订阅者消费。
订阅者:订阅时如果队列不存在则创建名为<name>的共享队列,并订阅,队列默认是持久化、非独占、非自动删除的,多个订阅者会轮流接收消息
发送者:发送消息时如果队列不存在则创建,消息会通过默认的exchange发送到共享队列
来看看测试代码。
服务端:
@MessageMapping("/queue")
@SendTo("/queue/" + QUEUE_NAME)
public String queue(String content) {
return "destination : " + "/queue/" + QUEUE_NAME + ", content : " + content;
}
客户端:
启动多个客户端,它们都订阅/queue/<name>,然后发送任意消息到服务端。可以看到,客户端之间轮流接收消息。
一句话概括:/queue destination必须发送消息到一个客户端,没有客户端订阅则消息排队,消息发送者、订阅者都可以创建持久化的、非自动删除的队列,只要它不存在就创建,存在则用之,多个客户端轮流接收消息。
3.3. AMQ Queue Destinations
这种destination跟/queue相似,唯一的不同是,stomp订阅者和发送者都不负责创建队列,没有队列则出错,主要用于发送和订阅已经存在的队列.
格式:/amq/queue/<name>
发送者:消息通过默认的exchange发送到队列,
订阅者:通过STOMP创建对该队列的订阅,多个订阅者会轮流接收消息
现在rabbitmq创建一个队列。
服务端:
@MessageMapping("/amq/queue")
@SendTo("/amq/queue/" + AMQ_QUEUE_NAME)
public String amqQueue(String content) {
return "destination : " + "/amq/queue/" + AMQ_QUEUE_NAME + ", content : " + content;
}
客户端订阅/amq/queue/<name>。
启动多个客户端,发送任意消息,消息同样是轮流发送给各个客户端的。如果队列不存在,后台出现错误信息:
2019-11-12 14:36:52.859 ERROR 40283 --- [eactor-tcp-io-3] o.s.m.s.s.StompBrokerRelayMessageHandler : Received ERROR {message=[not_found], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[68]} session=m0dxmmi0 text/plain payload=NOT_FOUND - no queue 'ws.rabbit.amq.queue.destination' in vhost '/'
一句话概括:/amq/queue用来处理队列已经存在的情况,必须发送消息到一个客户端,没有客户端订阅则消息排队,消息发送者、订阅者不会创建队列,队列不存在则报错,消息同样是轮流发给各个客户端
3.4. Topic Destinations
STOMP最常用的destination类型,发送者和订阅者可以通过routing key进行匹配。如果发出的消息没有订阅者,消息会被丢弃。
格式:/topic/<name>
,<name>为绑定的routing key
发送者:消息通过<name>的routing key发送到rabbitmq的
amq.topic
交换机订阅者:创建并订阅自动删除的、非独占的队列,该队列按照<name>的routing key绑定到amq.topic交换机上
默认的amq.topic交换机可以通过配置更改为自定义名称:
``stomp.default_topic_exchange = some.exchange``
来看测试场景:客户端订阅不同的destination,后台接收消息并转发到不同的地址上,测试消息接收情况。
服务端:
@MessageMapping("/topic/debug")
@SendTo("/topic/" + "*.debug.*")
public String topicTest1(String content) {
return "destination : " + "/topic *.debug.*, content : " + content;
}
@MessageMapping("/topic/info")
@SendTo("/topic/" + "*.info.*")
public String topicTest2(String content) {
return "destination : " + "/topic *.info.*, content : " + content;
}
客户端自主选择订阅/topic/.debug.
或/topic/.info.
启动多个客户端,然后发送信息到topicTest1和topicTest2,订阅了/topic/.debug.
能收到topicTest1转发d的消息,而另外一个能收到topicTest2发送的消息。
/topic自动为每个订阅者创建临时队列,如果消息没有订阅者接收则自动丢弃。
topic应用最广泛,可以根据订阅者和消息发送者的routing key进行匹配。另外,topic也支持持久化的订阅,客户端断开连接后消息不会丢失,不过需要通过stomp请求头增加头信息来实现,这里就不做介绍了。
3.5. Temp Queue Destinations
临时队列目标允许在SEND frame的reply-to
请求头来定义临时destination。临时队列受消息代理管理,他们通过各自的session来唯一区分,所以队列名称可以相同。
要是用临时队列,只需在SEND frame时设置reply-to请求头,规定该值必须以/temp-queue开头:
SEND
destination:/queue/reply-test
reply-to:/temp-queue/foo
Hello World!
上边的frame会创建一个临时队列(名称自动生成),然后session私有并自动订阅,不同的session会创建不同的队列。
临时队列使用较少,这里也不做深入研究了。
4. 总结
基于内存的stomp消息代理能够满足单应用需求,引入外部stomp消息代理解决了多应用之间的websocket消息传递需求。不同的消息中间件,都会按照stomp规范定义自身的destination支持。开发者需要明白两个点:订阅者订阅了什么destination,发送者发送到什么destination,这两个地址的匹配规则是什么。
源码地址: 见GITHUB