上一篇 "Spring AMQP简介和使用",我们介绍了Spring AMQP的一些基本要素和概念,也通过一些示例代码介绍了消息的发送和接收,但是都是使用的原始编码方式来实现,并不依赖Spring环境。其实,Spring AMQP也支持使用注解的方式来进行异步接收消息,极大的简化了编码。
1. hello world
要使用注解,首先需要在Spring应用环境中,我们看一个最简单的demo:
1、重新新建一个Spring boot工程,添加如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
2、新建一个Spring配置类RabbitConfiguration,用来申明Bean:
@Configuration
public class RabbitConfiguration {
public static final String ANONYMOUS_QUEUE_NAME = "spring.amqp.anonymous.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
cachingConnectionFactory.setUsername("admin");
cachingConnectionFactory.setPassword("123456");
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue anonymousQueue() {
// 匿名队列
return new AnonymousQueue(() -> ANONYMOUS_QUEUE_NAME);
}
}
配置类中,创建了ConnectionFactory
、AmqpAdmin
、AnonymousQueue
、RabbitTemplate
等Bean,Spring容器启动后就可以使用它们了。
3、创建测试类HelloWorldDemo
:
@Component
public class HelloWorldDemo {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String msg) {
rabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME, msg);
System.err.println("Send : " + msg);
}
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME)
public void receive(String msg) {
System.err.println("Received : " + msg);
}
}
重点来了,首先该类被标注了@Component
注解,表示其受Spring
容器管理;其次,通过@Resource
注解注入了rabbitTemplate
Bean,并使用它来发送String
类型的消息;第三,在消息接收的方法receive
上标注了@RabbitListener
注解,稍后再来看这个注解。
4、创建Spring boot工程启动类SpringAmqpApplication
:
@SpringBootApplication
@EnableRabbit
public class SpringAmqpApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(SpringAmqpApplication.class, args);
HelloWorldDemo helloWorldDemo = context.getBean(HelloWorldDemo.class);
helloWorldDemo.send("hello world!");
helloWorldDemo.send("hi, belonk!");
helloWorldDemo.send("张三");
}
}
启动应用,可以看到成功接收了消息。
要启动注解的消息监听,需要在配置类上加上@EnableRabbit
注解。
2. RabbitListener
通过一个例子来了解一下@RabbitListener
的作用:
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
上边的实例使用了 @RabbitListener
注解来监听名为“myQueue”的队列,只要该队列有消息可用,则会交给 processOrder
方法处理,但是要确保该队列存在并绑定到了exchange上。使用该注解标记的方法或类,都会使用 RabbitListenerContainerFactory
为其创建一个容器,上篇提过,异步消息监听的容器 MessageListenerContainer
是由改类创建的。可见,Spring AMQP的注解消息监听是采用异步的方式, @RabbitListener
注解是由 RabbitListenerAnnotationBeanPostProcessor
处理的。
被@RabbitListener
注解标记的方法可以支持集中类型的参数:
com.rabbitmq.client.Channel:访问Channel
org.springframework.amqp.core.Message:接收的消息对象
实体对象:消息中对应的负载的实体对象,自动推导负载实体
org.springframework.messaging.Message:Spring-messaging中的消息对象
@Payload:标记在消息负载实体上,明确指定消息的负载对象
@Header:获取特定一个消息头内容
@Headers:标注在一个Map上,用来获取所有消息头
MessageHeaders:spring-messaging的消息头
还有几个就不在一一列举了,我们看一个获取Channel
和header的例子:
public void send(String msg) { (1)
rabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_1, (Object) msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("custom_header", "this is a custom header.");
return message;
}
});
System.err.println("Send : " + msg);
}
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_1)
public void receive(String msg, Channel channel, @Header("custom_header") String header) { (2)
System.err.println("Received : " + msg);
System.err.println("Header : " + header);
System.err.println("Channel : " + channel.getChannelNumber());
}
1 | send 方法设置了一个名为custom_header 的自定义消息头 |
2 | receive 方法通过@Header 来获取,并添加Channel 对象。 |
再看一个@Payload标注的例子:
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_2)
public void receive(@Payload User user, Channel channel, @Header("custom_header") String header) { (1)
System.err.println("Received : " + user);
System.err.println("Header : " + header);
System.err.println("Channel : " + channel.getChannelNumber());
}
1 | 这里的User 参数可以根据消息自动推导,可以不加上@Payload 注解。 |
注意 User 必须实现Serializable 接口,上一篇已经提到过,默认的消息转换器是使用的SimpleMessageConverter ,它只能处理java序列化对象、String 、byte[] 。 |
定义绑定和Exchange
@RabbitListener
有多个属性,可以用来指定监听的队列、绑定关系,例如:
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(String data) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(String data) {
...
}
}
第一个方法,指定了要监听的队列、Exchange以及routing key,queue和exchange会按需自动申明并绑定;第二个方法,将会申明非持久化、独占的、自动删除的匿名队列并绑定到exchange。
2.1. 监听多个队列
使用queues
属性时,可以指定关联容器可以侦听多个队列。也可以使用@Header
注解来获取接受消息的队列名称:
@Component
public class MyService {
@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
也支持spEL(从1.5开始):
@Component
public class MyService {
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
3. 消息转换
在调用Listener之前,有两个消息转换步骤:
首先,使用 MessageConverter
将传入的 Spring AMQP 的 Message
转换为 Spring messaing 的 Message
,此时,MessageConverter
默认使用的是SimpleMessageConverter
实现,它仅仅处理 byte[]
数组与 String
或 java.io.Serializable
之间的相互转换
其次,在调用目标方法时将消息转换为方法参数的类型。此时,MessageConverter
默认使用的是 GenericMessageConverter
实现,它将转换委托给一个转换服务(DefaultFormattingConversionService
的实例)。
设置消息转换器:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 使用jacson消息转换器
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}
在1.6版本之前,消息与类型的转换需要指定消息头(type
)或者ClassMapper
,从1.6开始,如果@RabbitListener用在方法上,那么可以根据方法参数类型进行自动推断。
以下代码示例可以定义自己的消息转换器:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
...
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
return factory;
}
@Bean
public ConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
...
}
4. 多方法监听
Spring AMQP 支持同一个监听器调用多个方法,此时@RabbiListener
注解标注在类上,多个被调用的方法上标注@RabbitHandler
注解。
例如:
@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {
@RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
...
}
@RabbitHandler
public String baz(Baz baz) {
...
}
@RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
...
}
}
上边的三个方法都被标注@RabbitHandler
注解,表示分别监听消息被转换的负载实体是Bar
、Baz
或Qux
。需要注意的是,Spring AMQP 必须能够区分根据负载实体来区分不同的方法,即是说,每一个被@RabbitHandler
标注的方法必须具有不同的负载类型,要么被@Payload
标注出明确的负载实体类型,要么根据参数类型自动推断。
5. 消息回复
如果 ``@RabbitListener``监听的方法返回不为空的值,会根据发送者的消息头的 ``ReplyToAddress``的地址进行消息返回,这是由 ``MessageListenerAdapter``来处理的,如果没有设置,则可以添加 ``@SendTo``注解来定义消息返回的地址。使用 ``@SendTo``注解来表示返回结果需要转换为 ``Message``并发送到指定的回复地址(exchange和routing key)上:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
……
return status;
}
直接返回Message:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
……
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
@SendTo
注解的value值用来表示exchange和routing key,格式为:exchange/routingKey
,例如:
foo/bar - 回复的exchange为foo,routing key为bar
foo/ - 回复的exchange为foo,routing key为默认(空的)
bar or /bar -回复的outingKey为bar,exchange为默认.
/ or empty - 回复的exchange和routing key都为默认.
@SendTo
也支持spEL:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
引用的方法必须返回String。
现在,我们来编写一个消息确认和回复的demo,结合 上一篇的示例来看看基于注解如何工作:
1、修改RabbitConfiguration
:
设置消息确认和返回:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
cachingConnectionFactory.setUsername("admin");
cachingConnectionFactory.setPassword("123456");
// 消息确认
cachingConnectionFactory.setPublisherConfirms(true);
// 消息返回
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}
创建设置了回调的RabbitTemplate
:
@Bean
public RabbitTemplate callbackRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
// 消息返回
rabbitTemplate.setMandatory(true);
// 设置消息返回回调,一个RabbitTemplate只能设置一次返回回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
Printer.p("Message returned : " + replyCode + ", " + replyText);
}
});
// 消息确认回调,一个RabbitTemplate只能设置一次确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
Printer.p("Message confirmed : " + ack + ", " + cause + ", " + correlationData);
}
});
return rabbitTemplate;
}
再申明一个匿名的队列,代码就不贴了。
2、新建一个ConfirmAndReturnDemo
:
@Component
public class ConfirmAndReturnDemo {
@Resource
private RabbitTemplate callbackRabbitTemplate;
private Sender sender = new Sender();
public void send(User user) {
sender.send(user);
Printer.p("Send : " + user);
}
@Component
public class Sender {
public void send(User msg) {
callbackRabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4, (Object) msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("custom_header", "this is a custom header.");
return message;
}
});
Printer.p(this, "Send : " + msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4, durable = "false", autoDelete = "true", exclusive = "true"),
exchange = @Exchange("exist.exchange"), key = "exist.routingKey"
)
})
public void receiveReply(User user) {
Printer.p(this, "Received reply : " + user);
}
}
@Component
class Consumer {
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4)
// 回复到默认的队列
// @SendTo
// 回复到不存在的exchange和routingkey
// @SendTo("dontExist.exchange/dontExist.routingKey")
// 回复到存在的exchange和routingkey
@SendTo("exist.exchange/exist.routingKey")
// 回复到默认的exchange和不存在的routingkey
// @SendTo("dontExist.routingKey")
public User receive(@Payload User user) {
Printer.p(this, "Received : " + user);
user.setName("王五改名字了");
return user;
}
}
}
这里用了两个内部类来创建生产者和消费者,生产者除了发送消息,还将监听回复的消息,只是exchange
和routing key
不同;消费者接收消息,并修改了User
的name
属性,然后返回,接收方法上标注了@SendTo
,指定exchange和routing key,这里可以设置多种情况来验证回调方法的执行情况。
3、启动类编写demo执行代码:
ConfirmAndReturnDemo confirmAndReturnDemo = context.getBean(ConfirmAndReturnDemo.class);
confirmAndReturnDemo.send(new User("王五"));
4、运行程序,可以看到控制台输出如下:
[Sender] Send : User(name=王五)
Send : User(name=王五)
Message confirmed : true, null, null
[Consumer] Received : User(name=王五)
[Sender] Received reply : User(name=王五改名字了)
Sender已经成功接收了回复消息。
6. 定义元注解
@RabbitListener可以用在注解上,来自定义元注解信息:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}
可以看到,只需要在自定义注解上适用Spring AMQP 的注解即可,使用时:
public class MetaListener {
@MyAnonFanoutListener
public void handle1(String foo) {
...
}
@MyAnonFanoutListener
public void handle2(String foo) {
...
}
}
7. 总结
本文的示例代码见 github,总结一下:
1、@RabbitListener
是消息异步监听的基本注解,可以定义监听的队列、队列绑定关系
2、@RabbitHandler
用在需要监听多个方法时,不同的方法接收不同的消息实体,必须能够明确区分不同的实体,否则消息不能监听成功
3、@SendTo
用于设定消息回复,标注的方法需要返回非空的回复实体对象
4、@EanableRabbit
用来启用注解消息监听
5、@Header
、@Headers
、@Payload
等用在方法签名上,用来获取消息头信息或者明确表明消息的负载实体(也可以自动推导)。