1. 前言
很早之前写过几篇关于RabbitMQ的一些基础文章,在本篇中,我们将来学习Spring AMQP。
Spring AMQP 是 Spring 对 AMQP( http://www.amqp.org) 协议的封装和扩展,它提供了模板来对消息的发送和接收进行高级抽象,还提供了基于消息驱动的POJO的支持(类似JMS,Java消息服务)。
在开始之前,首先需要了解一些 RabbitMQ 和 AMQP 的一些基础概念,并在你的机器上安装 RabbitMQ, 本文使用的spring-amqp的版本为1.7.6。
2. Spring AMQP抽象
Spring AMQP 是 Spring 对 AMQP 协议的封装和扩展,提供了消息发送和接收的模板。Spring AMQP项目将核心Spring概念应用于基于 AMQP 的消息传递解决方案的开发,以便更容易和简单的管理AMQO资源。
Spring AMQP由spring-amqp和spring-rabbit两个模块组成。spring-amqp模块位于org.springframework.amqp.core
包,它的目标是提供不依赖于任何特定AMQP代理实现或客户端库的通用抽象;而spring-rabbit是spring-amqp通用抽象的具体实现,目前仅提供了rabbitmq的实现。
Spring AMQP包括一些基本的抽象定义(上边说过,他们位于org.springframework.amqp.coreb
包中,而非AMQP协议本身定义):
2.1. Message
在0-9-1版本的 AMQP 规范中没有定义 Message
类或接口,当执行诸如 basicPublish()
操作时,内容作为字节数组参数传递,而其他属性作为单独的参数传递。Spring AMQP将 Message
类定义为更通用的AMQP域模型表示的一部分。Message该类的目的是将主体和属性封装在单个实例中,以便API可以更简单。以下示例显示了Message类定义:
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
MessageProperties
接口定义了几个常见属性,例如 messageId,timestamp,contentType 等等,还可以通过调用 setHeader(String key, Object value)
方法使用用户定义的 headers 扩展这些属性。
2.2. Exchange
Exchange
接口代表AMQP Exchange,它的消息由Message Producer发送,RabbitMQ 的虚拟主机(vhost)中,每个Exchange都具有唯一的名称以及一些其他属性:
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
Exchange
接口中的 exchangeType 由 ExchangeTypes
抽象类定义,基本类型有:Direct,Topic,Fanout,和Headers,每种类型都对应有接口实现,不同的类型在处理绑定到队列的方式方面的行为各不相同。每种交换器类型可以看文首的几篇关于 Exchange 的文章。
2.3. Queue
同 AMQP 的队列一样,Spring AMQP 的 Queue 类表示从其中一个消息用户接收消息的组件。
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, null);
}
// Getters and Setters omitted for brevity
}
构造函数需要指定 Queue 名称,默认创建的 Queue
可持久化(durable为true)、非独占(exclusive为false)、非自动删除(autoDelete为false)。
2.3.1. 匿名队列
如果要创建临时性的非持久化、独占的、自动删除的队列,可以使用 Queue
类的子类 AnonymousQueue
:
public class AnonymousQueue extends Queue {
public AnonymousQueue() {
this((Map<String, Object>) null);
}
public AnonymousQueue(Map<String, Object> arguments) {
super(UUID.randomUUID().toString(), false, true, true, arguments);
}
public AnonymousQueue(NamingStrategy namingStrategy) {
this(namingStrategy, null);
}
}
匿名队列是一个非持久化的、独占、自动删除的队列,默认的队列名称为UUID,可以使用AnonymousQueue.NamingStrategy
来指定其名称,例如:
@Bean
public Queue anon1() {
// 使用默认的Base64命名策略,它会增加spring.gen-前缀,例如spring.gen-MRBv9sqISkuCiPfOYfpo4g
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
}
@Bean
public Queue anon2() {
// 自定义前缀foo-
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
}
NamingStrategy
是一个接口,默认实现类为Base64UrlNamingStrategy
,它会用一个Base64的字符串来命名,Spring默认为其添加spring.gen-
前缀,可以通过构造函数自己指定,可以实现接口来自定义命名策略。
2.4. Binding
绑定类描述了 Exchange
和 Queue
间的绑定关系:
public class Binding extends AbstractDeclarable {
public enum DestinationType {
QUEUE, EXCHANGE;
}
private final String destination;
private final String exchange;
private final String routingKey;
private final Map<String, Object> arguments;
private final DestinationType destinationType;
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
Map<String, Object> arguments) {
this.destination = destination;
this.destinationType = destinationType;
this.exchange = exchange;
this.routingKey = routingKey;
this.arguments = arguments;
}
}
例如,使用固定的路由键(routingKey)将队列绑定到 DirectExchange
:
``new Binding(someQueue, someDirectExchange, "foo.bar")``
如果要绑定到 TopicExchange
:
``new Binding(someQueue, someTopicExchange, "foo.*")``
绑定到 FanoutExchange
:
``new Binding(someQueue,someFanoutExchange)``
Spring 还提供了构建器来构建绑定:
``Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");``
3. 连接和资源管理
管理与 RabbitMQ 连接的核心组件是 ConnectionFactory
接口,它提供 org.springframework.amqp.rabbit.connection.Connection
的实例,Connection
是对 com.rabbitmq.client.Connection
的包装。
public interface ConnectionFactory {
Connection createConnection() throws AmqpException;
String getHost();
int getPort();
String getVirtualHost();
String getUsername();
void addConnectionListener(ConnectionListener listener);
boolean removeConnectionListener(ConnectionListener listener);
void clearConnectionListeners();
}
public interface Connection {
Channel createChannel(boolean transactional) throws AmqpException;
void close() throws AmqpException;
boolean isOpen();
int getLocalPort();
}
CachingConnectionFactory
实现了 ConnectionFactory
接口,它建立一个可由应用程序共享的单个代理连接(即Connection
,可以共享的原因在于,AMQP 的工作单元是在连接的Channel
中,一个连接有多个Channel
),Connection
提供了 createChannel
方法来创建 Channel
,而CachingConnectionFactory
能够缓存这些 Channel
,当然,可以调用setChannelCacheSize()
方法来设置缓存的Channel
的数量(默认是25个)。
同样,CachingConnectionFacotry
也能够将缓存模式设置为CONNECTION来缓存 Connection
,此时,不支持自动创建队列、交换器等。可以通过 connectionLimit
属性来设置缓存连接的数量,如果达到最大限制,那么 Channel
将会等待连接空闲,如果等待 channelCheckoutTimeout
设置的超时时间仍然没有空闲,会抛出 AmqpTimeoutException
异常。
连接RabbitMQ的一个例子:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
xml配置时:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
消息确认和返回
通过将 CachingConnectionFactory
的 publisherConfirms
和publisherReturns
属性分别设置为true
来支持确认和返回的消息。
设置这些选项后,ConnectionFactory
创建的Channel
包装在 PublisherCallbackChannel
中,用于方便回调。当获得这样的信道时,客户端可以注册PublisherCallbackChannel.Listener
。该PublisherCallbackChannel
实现包含将确认/返回路由到适当的侦听器的逻辑。
自动恢复
当处理失败时,Spring AMQP 提供了自动恢复机制,当连接重新建立后,RabbitAdmin
将会重新申明基础的队列、交换器等信息。从4.0开始,amqp-client 也提供了默认的自动恢复机制,如果想使用Spring AMQP 的自动恢复机制,则只需将 amqp-client 的 ConnectionFactory
的 automaticRecoveryEnabled
属性设置为 false
。
自定义连接属性
``connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");``
4. AmqpTemplate
AmqpTempalte
是Spring AMQP 提供的高级抽象接口,它定义了消息发送和接收的通用方法,具体实现依赖于不同的客户端,目前仅RabbitMQ提供了实现类RabbitTemplate
。
重试功能
可以为 RabbitTemplate
添加 RetryTemplate
来进行失败重试,RetryTemplat
来自 spring-retry 项目,例如下边的配置使用指数重试策略,它将失败后重试3次,之后再抛出异常:
xml配置:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
Java代码配置:
@Bean
public AmqpTemplate rabbitTemplate();
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
除了RetryTemplate
的方式外,还支持设置重试回调,这样就不需要使用RetryTemplate
:
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
发布者确认和返回
RabbitTemplate
支持消息返回,前提是 CachingConnectionFactory
的publisherReturns
属性设置为true
,并且需要将RabbitTemplate
的 mandatory
设置为 true
,或者 mandatoryExpression
表达式的值为 true
。
RabbitTemplate.ReturnCallback
通过调用注册来将返回发送到客户端setReturnCallback(ReturnCallback callback)
。回调必须实现此方法:
void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey);
需要注意的时候,一个 RabbitTemplate
只能设置一次 ReturnCallback
。
消息超时
消息回复默认的超时时间为5秒,可通过接收消息带replyTimeout参数的方法来设置超时时间。
要实现消息确认,那么需要将 CachingConnectionFactory
的 publisherConfirms
属性设置为true
。RabbitTemplate.ConfirmCallback
通过调用setConfirmCallback(ConfirmCallback callback)
来实现消息确认回调,必须实现此方法:
``void confirm(CorrelationData correlationData, boolean ack, String cause);``
CorrelationData
是客户端在发送消息是绑定的关联对象,ack
代表是否确认了消息,如果其为 false
,那么可能由于某些异常原因导致的不能确认,参数 cause
就代表了这个原因。
需要注意的是:
发送消息时,如果exchange不存在,那么confirmCallback会被执行,但是returnCallback不会被执行,confirmCallback的ack为false,cause为异常信息
发送消息时,如果routingkey不存在,那么confirmCallback、returnCallback都会被执行,但是confirmCallback的ack是true,cause为null
具体可以阅读 #confirmAndReturn[示例代码].
5. 消息的发送
AmqpTeplate
提供了几个发送消息的基本方法:
// 发送消息到默认的Exchange,使用默认的routing key
void send(Message message) throws AmqpException;
// 使用指定的routing key发送消息到默认的exchange
void send(String routingKey, Message message) throws AmqpException;
// 使用指定的routing key发送消息到指定的exchange
void send(String exchange, String routingKey, Message message) throws AmqpException;
一个发送消息的示例:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
new Message("12.34".getBytes(), someProperties));
可以在模板上设置exchange,或者设置routing key:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
Spring AMQP 也挺消息构建器来构建消息:
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
AmqpTemplate 还提供了面向POJO(而不是Message)的消息发送方法,同时还支持使用 MessagePostProcessor 在消息发送之前(转换器执行之后)修改消息:
// 转换POJO为Message,并使用默认的routing key发送到默认的exchange
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// 支持消息发送之前通过MessagePostProcessor来修改消息
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
RabbitTemplate
添加了几个发送消息的方法:
public void convertAndSend(String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
convertAndSend(this.exchange, routingKey, object, correlationData);
}
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
这些方法支持在发送消息的时候传递CorrelationData
,它是一个关联对象,在消息确认回调时传递,用来区分不同的消息,以便消息重发,具体可以看 这里对correlation id的介绍。
6. 消息的接收
接收消息有两个常见的方式:Consumer 轮询和异步监听,后者要优于前者。
轮询
AmqpTemplate 提供了常用的接收消息的方法:
// 从默认的Queue中接收消息,立即返回(不阻塞),没有则返回null
Message receive() throws AmqpException;
// 从指定的queue从接收消息,立即返回(不阻塞),没有则返回null
Message receive(String queueName) throws AmqpException;
// 从默认的queue中接收消息,等待timeoutMillis时间后返回,
// timeoutMillis为0表示不阻塞立即返回,负数表示无限期阻塞等待消息
Message receive(long timeoutMillis) throws AmqpException;
// 从指定的queue接收消息,等到timeoutMillis时间
Message receive(String queueName, long timeoutMillis) throws AmqpException;
同发送一样,也支持面向POJO的对象接收:
// 从默认的queue中接收消息并转换为结果对象,立即返回
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
// 从默认的queue中接收消息并转换为结果对象,等到指定时间
Message receiveAndConvert(long timeoutMillis) throws AmqpException;
Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
当消息需要返回时,AmqpTemplate 也提供了接收和回复的方法,只需要提供消息回复回调:
// 从默认queue中接收消息,并调用提供的回调来发送回复消息,前提是回调方法需要返回一个回复对象,如果返回为null,那么作用同receive()方法一样
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
……
// 从指定的queue中接收消息,并指定回复回调,reppyToAddressCallback用于设定消息回复的地址
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
具体可以看 #messageReply[消息回复示例]。
7. 异步消息
Spring AMQP 设计了专门的用于一部消息接收的组件,包含消息监听器(MessageListener
)、消息监听适配器(MessageListenerAdapter
)、消息容器(Container
)等。
MessageListener
MessageListener
专用用来监听接收的消息,用于设置回调:
public interface MessageListener {
void onMessage(Message message);
}
如果需要用到AMQP 的Channel
,那么可以使用ChannelAwareMessageListener
,它是单独定义的可以访问Channel
的接口:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
一个典型的使用Channel的场景是我们手动来确认消息:
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("Received : " + message);
// 手动确认,不确认会导致消息一直重发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
MessageListenerAdapter
如果要严格区分业务逻辑和消息传递API(被称为“消息驱动POJO”),可以使用消息监听适配器,它的作用就是在于将POJO适配为消息处理对象。我们可以指定POJO消息处理的方法,不指定则使用默认的handleMessage
方法,另外,也可以集成消息监听是配器,并覆盖getListenerMethodName
方法以便根据消息来动态选择不同的方法。
getListenerMethodName
方法实现:
protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception {
if (this.queueOrTagToMethodName.size() > 0) {
MessageProperties props = originalMessage.getMessageProperties();
String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
if (methodName == null) {
methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
}
if (methodName != null) {
return methodName;
}
}
return getDefaultListenerMethod();
}
它根据原始消息的consumerQueue和consumerTag来获取消息处理的方法名称。
Container
有了消息监听器和消息监听适配器还不行,他们不能运行,此时需要用到容器,容器是一个生命周期组件,可以启动和停止。一个典型的实现是SimpleMessageListenerContainer
,
示例
手动确认消息:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
// 手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(RabbitConfiguration.QUEUE_NAME);
// 这里使用ChannelAwareMessageListener以获得Channel来进行手动确认
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("Received : " + message);
// 手动确认,不确认会导致消息一直重发
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
container.start();
8. 示例代码
8.1. Hello world
上边介绍的都是一些理论,现在来编写一个 Spring AMQP 的demo工程,看看他是如何工作的。
1、创建一个spring-amap的spring boot 工程,引入如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
2、编写测试类SimplestDemo
:
public static void main(String[] args) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 通过AmqpAdmin来申明队列
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
// 匿名队列,默认的命名为UUID
Queue queue = new AnonymousQueue();
// 使用Base64命名策略,添加自定义前缀
// Queue queue = new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("demo-"));
admin.declareQueue(queue);
System.err.println("Queue name is : " + queue.getName());
AmqpTemplate template = new RabbitTemplate(connectionFactory);
String str = "this is foo.";
template.convertAndSend(queue.getName(), str);
System.err.println("Send : " + str);
String foo = (String) template.receiveAndConvert(queue.getName());
System.err.println("Received : " + foo);
User user = new User("李四");
System.err.println("Send : " + user);
template.convertAndSend(queue.getName(), user);
// 接收消息并直接转换为对象,最大等待2秒
user = (User) template.receiveAndConvert(queue.getName(), 1000 * 2);
System.err.println("Received : " + user);
}
前边已经介绍过 #AnymousQueue[匿名队列],上述代码使用的就是匿名队列,使用完了会自动删除。示例中,首先创建一个 CachingConnectionFacotry,并设置了 RabbitMQ 的地址和端口以及操作的用户名、密码;其次,创建了一个 AmqpAdmin
对象,用它来创建Exchange
和Queue
等;然后,创建了一个 AmqpTemplate
,并调用其 convertAndSend
方法发送消息,receiveAndConvert
方法接收消息。
3、运行main方法,可以看到控制台成功打印了发送的信息和接收的信息,说明消息的发送和接收成功,一个最简单的demo完成。
8.2. 消息回复
接下来,我们再编写一个消息回复的例子。生产者发送消息给消费者,消费者可以对消息进行回复。假设有一个User
对象,生产者发送该对象,消费者拿到对象进行处理,然后回复给生产者,我们看看代码如何实现:
1、接着上边的helloworld工程,新建一个User
对象:
public class User implements Serializable {
private String name;
public User() {
}
public User(String name) {
this.name = name;
}
// 省略getter、setter
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
'}';
}
]
2、新建测试demo类ReceiveAndReplyDemo
,编写如下代码:
public class ReceiveAndReplyDemo {
public static void main(String[] args) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 通过AmqpAdmin来申明队列
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
Queue queue = new AnonymousQueue();
admin.declareQueue(queue);
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 发送消息,生产者
template.convertAndSend(queue.getName(), new User("王五"));
// 接收消息并直接转换为对象,并进行回复,消费者
boolean received = template.receiveAndReply(queue.getName(),
// 回复回调
new ReceiveAndReplyCallback<User, User>() {
@Override
public User handle(User payload) {
System.err.println("Handle : " + payload);
// 将消息payload修改改一下
payload.setName("王五改名了");
return payload;
}
},
// 设置回复地址
new ReplyToAddressCallback<User>() {
@Override
public Address getReplyToAddress(Message request, User reply) {
// 改完后的消息回复给发送方
System.err.println("Replying : " + reply);
return new Address("", queue.getName());
}
}
);
// 收到消费者的回复,生产者
User replyUser = (User) template.receiveAndConvert(queue.getName(), 1000 * 3);
System.err.println("Replied user : " + replyUser);
System.err.println("Received : " + received);
}
}
3、运行Main方法,可以看到生产者发送User对象给消费者,消费者受到消息并更改了用户名,然后回复给生产者,生产者再成功接收到了回复的消息。
8.3. 消息的确认和返回
#publisherConfirmAndReturn[发布者确认和返回]一节提到,AmqpTemplate支持发布者消息确认和返回,我们来看看如何实现。
1、接着helloworld的工程,我们新建一个ConfirmAndReturnDemo
的类,编写如下代码:
public class ConfirmAndReturnDemo {
public static void main(String[] args) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 开启消息确认和回调
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
Queue queue = new AnonymousQueue();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(queue);
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置为true,则消息改为手动返回
template.setMandatory(true);
// 设置消息返回回调,一个RabbitTemplate只能设置一次返回回调
// 当消息不能成功投递,会抛出AmqpMessageReturnedException,该异常包含ReturnCallback所需的参数信息,此时会执行回调
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.err.println("Return callback : ");
System.err.println(" message : " + message);
System.err.println(" reply : " + replyCode + ", " + replyText);
});
// 消息确认回调,一个RabbitTemplate只能设置一次确认回调
template.setConfirmCallback((correlationData, ack, cause) -> {
System.err.println("Confirm callback : ");
System.err.println(" correlationData : " + correlationData);
System.err.println(" ack : " + ack);
System.err.println(" cause : " + cause);
});
String str = "this is foo.";
template.convertAndSend(queue.getName(), (Object) str, new CorrelationData(UUID.randomUUID().toString()));
System.err.println("Send : " + str);
String foo = (String) template.receiveAndConvert(queue.getName());
System.err.println("Received : " + foo);
User user = new User("zhangsan");
template.convertAndSend(queue.getName(), user, new CorrelationData(UUID.randomUUID().toString()));
System.err.println("Send : " + user);
User receivedUser = (User) template.receiveAndConvert(queue.getName());
System.err.println("Received : " + receivedUser);
// 不能成功投递消息
System.err.println("Mock message body can't be sent.");
str = "can not be sent.";
// routing key无法匹配,则confirmCallback和returnCallback都会被触发,ack仍然为true
template.convertAndSend("dontExist.exchange", (Object) str, new CorrelationData(UUID.randomUUID().toString()));
// exchange找不到,则会触发confirmCallback,但是returnCallback不会被触发,ack为false,cause包含异常信息,此时channel会关闭
template.convertAndSend("dontExist.exchange", "dontExist.routingKey", str, new CorrelationData(UUID.randomUUID().toString()));
}
}
代码首先创建的连接工厂,CachingConnectionFactory
的publisherConfirms
、publisherReturns
属性都设置为true,以启用发布者回调和返回;然后申明了匿名队列,然后使用RabbitTemplate来发送和接收消息,它的mandatory
属性设置为true
来启用消息返回,然后别分为其指定了消息返回回调和消息确认回调,注意它们分别只能设置一个;最后,正常发送两个消息,然后分别模拟一个routingkey不存在、exchange不存在的两次消息发送,以观察打印的结果。
2、执行main方法,查看打印信息如下:
Send : this is foo.
Confirm callback :
correlationData : CorrelationData [id=71ec4137-8fe6-43dd-9552-147a7807b686]
ack : true
cause : null
Received : this is foo.
Confirm callback :
correlationData : CorrelationData [id=662ef2e6-41f9-4461-8c7f-d654ab463ded]
ack : true
Send : User{name='zhangsan'}
cause : null
Received : User{name='zhangsan'}
Mock message body can't be sent.
Return callback :
message : (Body:'can not be sent.' MessageProperties [headers={}, timestamp=null, ……
reply : 312, NO_ROUTE
Confirm callback :
correlationData : CorrelationData [id=23386712-9922-4682-b92f-70e42bb28182]
ack : true
cause : null
Confirm callback :
correlationData : CorrelationData [id=bbdb39ed-5ad8-4818-bd96-eab58c8d5dad]
ack : false
cause : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'cant_sent_exchange' in vhost '/', class-id=60, method-id=40)
更多示例代码可以看github上的源代码。
9. 总结
本章内容暂时到这里,更多详细内容,后续再来逐一学习,总结一下:
1、Spring AMQP 对 AMQP 规范做了进一步封装,并提供了自身的封装实体,诸如 Message
、Exchange
、Queue
、Binding
、AmqpTemplate
等
2、在使用Spring AMQP时,我们重心在于处理如何收发消息的业务逻辑,而不必关心连接RabbitMQ等其他细节
3、AmqpTemplate
是高层级的抽象,定义消息收发的通用方法,其实现RabbitTemplate
是对于RabbitMQ客户端的具体实现,并且进行了扩展
4、AmqpAdmin
用来管理RabbitMQ,但是需要用户具备相应的权限
5、消息接收最好的方式是异步
6、发布者消息确认和返回时,注意条件,满足条件才能进行确认和返回,并且一个RabbitTemplate只能设置一个确认回调和返回回调
本文的实例代码见: github