1. 前言

很早之前写过几篇关于RabbitMQ的一些基础文章,在本篇中,我们将来学习Spring AMQP。

Spring AMQP 是 Spring 对 AMQP( http://www.amqp.org) 协议的封装和扩展,它提供了模板来对消息的发送和接收进行高级抽象,还提供了基于消息驱动的POJO的支持(类似JMS,Java消息服务)。

在开始之前,首先需要了解一些 RabbitMQ 和 AMQP 的一些基础概念,并在你的机器上安装 RabbitMQ, 本文使用的spring-amqp的版本为1.7.6。

2. Spring AMQP抽象

Spring AMQP 是 Spring 对 AMQP 协议的封装和扩展,提供了消息发送和接收的模板。Spring AMQP项目将核心Spring概念应用于基于 AMQP 的消息传递解决方案的开发,以便更容易和简单的管理AMQO资源。

Spring AMQP由spring-amqp和spring-rabbit两个模块组成。spring-amqp模块位于org.springframework.amqp.core包,它的目标是提供不依赖于任何特定AMQP代理实现或客户端库的通用抽象;而spring-rabbit是spring-amqp通用抽象的具体实现,目前仅提供了rabbitmq的实现。

058c29dce41846aaa4dddcda5163b6ff

Spring AMQP包括一些基本的抽象定义(上边说过,他们位于org.springframework.amqp.coreb包中,而非AMQP协议本身定义):

2.1. Message

在0-9-1版本的 AMQP 规范中没有定义 Message 类或接口,当执行诸如 basicPublish() 操作时,内容作为字节数组参数传递,而其他属性作为单独的参数传递。Spring AMQP将 Message 类定义为更通用的AMQP域模型表示的一部分。Message该类的目的是将主体和属性封装在单个实例中,以便API可以更简单。以下示例显示了Message类定义:

public class Message {
    private final MessageProperties messageProperties;
    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

MessageProperties 接口定义了几个常见属性,例如 messageId,timestamp,contentType 等等,还可以通过调用 setHeader(String key, Object value) 方法使用用户定义的 headers 扩展这些属性。

2.2. Exchange

Exchange接口代表AMQP Exchange,它的消息由Message Producer发送,RabbitMQ 的虚拟主机(vhost)中,每个Exchange都具有唯一的名称以及一些其他属性:

public interface Exchange {
    String getName();
    String getExchangeType();
    boolean isDurable();
    boolean isAutoDelete();
    Map<String, Object> getArguments();
}

Exchange 接口中的 exchangeType 由 ExchangeTypes 抽象类定义,基本类型有:Direct,Topic,Fanout,和Headers,每种类型都对应有接口实现,不同的类型在处理绑定到队列的方式方面的行为各不相同。每种交换器类型可以看文首的几篇关于 Exchange 的文章。

2.3. Queue

同 AMQP 的队列一样,Spring AMQP 的 Queue 类表示从其中一个消息用户接收消息的组件。

public class Queue  {
    private final String name;
    private volatile boolean durable;
    private volatile boolean exclusive;
    private volatile boolean autoDelete;
    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
    this(name, durable, exclusive, autoDelete, null);
    }

    // Getters and Setters omitted for brevity
}

构造函数需要指定 Queue 名称,默认创建的 Queue 可持久化(durable为true)、非独占(exclusive为false)、非自动删除(autoDelete为false)。

2.3.1. 匿名队列

如果要创建临时性的非持久化、独占的、自动删除的队列,可以使用 Queue 类的子类 AnonymousQueue :

public class AnonymousQueue extends Queue {
    public AnonymousQueue() {
        this((Map<String, Object>) null);
    }

    public AnonymousQueue(Map<String, Object> arguments) {
        super(UUID.randomUUID().toString(),  false, true, true, arguments);
    }

    public AnonymousQueue(NamingStrategy namingStrategy) {
        this(namingStrategy, null);
    }
}

匿名队列是一个非持久化的、独占、自动删除的队列,默认的队列名称为UUID,可以使用AnonymousQueue.NamingStrategy来指定其名称,例如:

@Bean
public Queue anon1() {
    // 使用默认的Base64命名策略,它会增加spring.gen-前缀,例如spring.gen-MRBv9sqISkuCiPfOYfpo4g
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
}

@Bean
public Queue anon2() {
    // 自定义前缀foo-
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
}

NamingStrategy是一个接口,默认实现类为Base64UrlNamingStrategy,它会用一个Base64的字符串来命名,Spring默认为其添加spring.gen-前缀,可以通过构造函数自己指定,可以实现接口来自定义命名策略。

2.4. Binding

绑定类描述了 ExchangeQueue 间的绑定关系:

public class Binding extends AbstractDeclarable {
    public enum DestinationType {
        QUEUE, EXCHANGE;
    }

    private final String destination;
    private final String exchange;
    private final String routingKey;
    private final Map<String, Object> arguments;
    private final DestinationType destinationType;

    public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
            Map<String, Object> arguments) {
        this.destination = destination;
        this.destinationType = destinationType;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.arguments = arguments;
    }
}

例如,使用固定的路由键(routingKey)将队列绑定到 DirectExchange

``new Binding(someQueue, someDirectExchange, "foo.bar")``

如果要绑定到 TopicExchange : 

``new Binding(someQueue, someTopicExchange, "foo.*")``

绑定到 FanoutExchange

``new Binding(someQueue,someFanoutExchange)``

Spring 还提供了构建器来构建绑定:

``Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");``

3. 连接和资源管理

管理与 RabbitMQ 连接的核心组件是 ConnectionFactory 接口,它提供 org.springframework.amqp.rabbit.connection.Connection 的实例,Connection 是对 com.rabbitmq.client.Connection 的包装。

public interface ConnectionFactory {
    Connection createConnection() throws AmqpException;
    String getHost();
    int getPort();
    String getVirtualHost();
    String getUsername();
    void addConnectionListener(ConnectionListener listener);
    boolean removeConnectionListener(ConnectionListener listener);
    void clearConnectionListeners();
}
public interface Connection {
    Channel createChannel(boolean transactional) throws AmqpException;
    void close() throws AmqpException;
    boolean isOpen();
    int getLocalPort();
}

CachingConnectionFactory 实现了 ConnectionFactory 接口,它建立一个可由应用程序共享的单个代理连接(即Connection,可以共享的原因在于,AMQP 的工作单元是在连接的Channel中,一个连接有多个Channel),Connection提供了 createChannel 方法来创建 Channel,而CachingConnectionFactory 能够缓存这些 Channel,当然,可以调用setChannelCacheSize() 方法来设置缓存的Channel的数量(默认是25个)。

同样,CachingConnectionFacotry 也能够将缓存模式设置为CONNECTION来缓存 Connection,此时,不支持自动创建队列、交换器等。可以通过 connectionLimit 属性来设置缓存连接的数量,如果达到最大限制,那么 Channel 将会等待连接空闲,如果等待 channelCheckoutTimeout 设置的超时时间仍然没有空闲,会抛出 AmqpTimeoutException 异常。

连接RabbitMQ的一个例子:

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

xml配置时:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

消息确认和返回

通过将 CachingConnectionFactorypublisherConfirmspublisherReturns属性分别设置为true 来支持确认和返回的消息。

设置这些选项后,ConnectionFactory 创建的Channel 包装在 PublisherCallbackChannel中,用于方便回调。当获得这样的信道时,客户端可以注册PublisherCallbackChannel.Listener。该PublisherCallbackChannel实现包含将确认/返回路由到适当的侦听器的逻辑。

自动恢复

当处理失败时,Spring AMQP 提供了自动恢复机制,当连接重新建立后,RabbitAdmin 将会重新申明基础的队列、交换器等信息。从4.0开始,amqp-client 也提供了默认的自动恢复机制,如果想使用Spring AMQP 的自动恢复机制,则只需将 amqp-client 的 ConnectionFactory 的 automaticRecoveryEnabled 属性设置为 false

自定义连接属性

``connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");``

4. AmqpTemplate

AmqpTempalte 是Spring AMQP 提供的高级抽象接口,它定义了消息发送和接收的通用方法,具体实现依赖于不同的客户端,目前仅RabbitMQ提供了实现类RabbitTemplate

重试功能

可以为 RabbitTemplate 添加 RetryTemplate 来进行失败重试,RetryTemplat 来自 spring-retry 项目,例如下边的配置使用指数重试策略,它将失败后重试3次,之后再抛出异常:

xml配置:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>

Java代码配置:

@Bean
public AmqpTemplate rabbitTemplate();
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

除了RetryTemplate的方式外,还支持设置重试回调,这样就不需要使用RetryTemplate

retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }
    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

发布者确认和返回

RabbitTemplate支持消息返回,前提是 CachingConnectionFactorypublisherReturns 属性设置为true,并且需要将RabbitTemplate 的 mandatory 设置为 true,或者 mandatoryExpression 表达式的值为 true

RabbitTemplate.ReturnCallback 通过调用注册来将返回发送到客户端setReturnCallback(ReturnCallback callback)。回调必须实现此方法:

void returnedMessage(Message message, int replyCode, String replyText,
          String exchange, String routingKey);

需要注意的时候,一个 RabbitTemplate 只能设置一次 ReturnCallback

消息超时

消息回复默认的超时时间为5秒,可通过接收消息带replyTimeout参数的方法来设置超时时间。

要实现消息确认,那么需要将 CachingConnectionFactory 的 publisherConfirms属性设置为trueRabbitTemplate.ConfirmCallback通过调用setConfirmCallback(ConfirmCallback callback)来实现消息确认回调,必须实现此方法:

``void confirm(CorrelationData correlationData, boolean ack, String cause);``

CorrelationData 是客户端在发送消息是绑定的关联对象,ack 代表是否确认了消息,如果其为 false,那么可能由于某些异常原因导致的不能确认,参数 cause 就代表了这个原因。

需要注意的是:

  • 发送消息时,如果exchange不存在,那么confirmCallback会被执行,但是returnCallback不会被执行,confirmCallback的ack为false,cause为异常信息

  • 发送消息时,如果routingkey不存在,那么confirmCallback、returnCallback都会被执行,但是confirmCallback的ack是true,cause为null

具体可以阅读 #confirmAndReturn[示例代码].

5. 消息的发送

AmqpTeplate 提供了几个发送消息的基本方法:

// 发送消息到默认的Exchange,使用默认的routing key
void send(Message message) throws AmqpException;

// 使用指定的routing key发送消息到默认的exchange
void send(String routingKey, Message message) throws AmqpException;

// 使用指定的routing key发送消息到指定的exchange
void send(String exchange, String routingKey, Message message) throws AmqpException;

一个发送消息的示例:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
    new Message("12.34".getBytes(), someProperties));

可以在模板上设置exchange,或者设置routing key:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

Spring AMQP 也挺消息构建器来构建消息:

MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

AmqpTemplate 还提供了面向POJO(而不是Message)的消息发送方法,同时还支持使用 MessagePostProcessor 在消息发送之前(转换器执行之后)修改消息:

// 转换POJO为Message,并使用默认的routing key发送到默认的exchange
void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// 支持消息发送之前通过MessagePostProcessor来修改消息
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
        throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
        throws AmqpException;

RabbitTemplate添加了几个发送消息的方法:

public void convertAndSend(String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
    convertAndSend(this.exchange, routingKey, object, correlationData);
}

public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
    send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}

这些方法支持在发送消息的时候传递CorrelationData,它是一个关联对象,在消息确认回调时传递,用来区分不同的消息,以便消息重发,具体可以看 这里对correlation id的介绍。

6. 消息的接收

接收消息有两个常见的方式:Consumer 轮询和异步监听,后者要优于前者。

轮询

AmqpTemplate 提供了常用的接收消息的方法:

// 从默认的Queue中接收消息,立即返回(不阻塞),没有则返回null
Message receive() throws AmqpException;

// 从指定的queue从接收消息,立即返回(不阻塞),没有则返回null
Message receive(String queueName) throws AmqpException;

// 从默认的queue中接收消息,等待timeoutMillis时间后返回,
// timeoutMillis为0表示不阻塞立即返回,负数表示无限期阻塞等待消息
Message receive(long timeoutMillis) throws AmqpException;

// 从指定的queue接收消息,等到timeoutMillis时间
Message receive(String queueName, long timeoutMillis) throws AmqpException;

同发送一样,也支持面向POJO的对象接收:

// 从默认的queue中接收消息并转换为结果对象,立即返回
Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

// 从默认的queue中接收消息并转换为结果对象,等到指定时间
Message receiveAndConvert(long timeoutMillis) throws AmqpException;

Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

当消息需要返回时,AmqpTemplate 也提供了接收和回复的方法,只需要提供消息回复回调:

// 从默认queue中接收消息,并调用提供的回调来发送回复消息,前提是回调方法需要返回一个回复对象,如果返回为null,那么作用同receive()方法一样
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;
……

// 从指定的queue中接收消息,并指定回复回调,reppyToAddressCallback用于设定消息回复的地址
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

具体可以看 #messageReply[消息回复示例]。

7. 异步消息

Spring AMQP 设计了专门的用于一部消息接收的组件,包含消息监听器(MessageListener)、消息监听适配器(MessageListenerAdapter)、消息容器(Container)等。

MessageListener

MessageListener专用用来监听接收的消息,用于设置回调:

public interface MessageListener {
     void onMessage(Message message);
}

如果需要用到AMQP 的Channel,那么可以使用ChannelAwareMessageListener,它是单独定义的可以访问Channel的接口:

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}

一个典型的使用Channel的场景是我们手动来确认消息:

@Override
public void onMessage(Message message, Channel channel) throws Exception {
    System.err.println("Received : " + message);
    // 手动确认,不确认会导致消息一直重发
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

MessageListenerAdapter

如果要严格区分业务逻辑和消息传递API(被称为“消息驱动POJO”),可以使用消息监听适配器,它的作用就是在于将POJO适配为消息处理对象。我们可以指定POJO消息处理的方法,不指定则使用默认的handleMessage方法,另外,也可以集成消息监听是配器,并覆盖getListenerMethodName方法以便根据消息来动态选择不同的方法。

getListenerMethodName方法实现:

protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception {
    if (this.queueOrTagToMethodName.size() > 0) {
        MessageProperties props = originalMessage.getMessageProperties();
        String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
        if (methodName == null) {
            methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
        }
        if (methodName != null) {
            return methodName;
        }
    }
    return getDefaultListenerMethod();
}

它根据原始消息的consumerQueue和consumerTag来获取消息处理的方法名称。

Container

有了消息监听器和消息监听适配器还不行,他们不能运行,此时需要用到容器,容器是一个生命周期组件,可以启动和停止。一个典型的实现是SimpleMessageListenerContainer

示例

手动确认消息:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
// 手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(RabbitConfiguration.QUEUE_NAME);
// 这里使用ChannelAwareMessageListener以获得Channel来进行手动确认
container.setMessageListener(new ChannelAwareMessageListener() {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("Received : " + message);
        // 手动确认,不确认会导致消息一直重发
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
});
container.start();

8. 示例代码

8.1. Hello world

上边介绍的都是一些理论,现在来编写一个 Spring AMQP 的demo工程,看看他是如何工作的。

1、创建一个spring-amap的spring boot 工程,引入如下依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>

2、编写测试类SimplestDemo

public static void main(String[] args) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("123456");
    // 通过AmqpAdmin来申明队列
    AmqpAdmin admin = new RabbitAdmin(connectionFactory);
    // 匿名队列,默认的命名为UUID
    Queue queue = new AnonymousQueue();
    // 使用Base64命名策略,添加自定义前缀
    // Queue queue = new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("demo-"));
    admin.declareQueue(queue);
    System.err.println("Queue name is : " + queue.getName());
    AmqpTemplate template = new RabbitTemplate(connectionFactory);
    String str = "this is foo.";
    template.convertAndSend(queue.getName(), str);
    System.err.println("Send : " + str);
    String foo = (String) template.receiveAndConvert(queue.getName());
    System.err.println("Received : " + foo);
    User user = new User("李四");
    System.err.println("Send : " + user);
    template.convertAndSend(queue.getName(), user);
    // 接收消息并直接转换为对象,最大等待2秒
    user = (User) template.receiveAndConvert(queue.getName(), 1000 * 2);
    System.err.println("Received : " + user);
}

前边已经介绍过 #AnymousQueue[匿名队列],上述代码使用的就是匿名队列,使用完了会自动删除。示例中,首先创建一个 CachingConnectionFacotry,并设置了 RabbitMQ 的地址和端口以及操作的用户名、密码;其次,创建了一个 AmqpAdmin对象,用它来创建ExchangeQueue等;然后,创建了一个 AmqpTemplate,并调用其 convertAndSend 方法发送消息,receiveAndConvert 方法接收消息。

3、运行main方法,可以看到控制台成功打印了发送的信息和接收的信息,说明消息的发送和接收成功,一个最简单的demo完成。

8.2. 消息回复

接下来,我们再编写一个消息回复的例子。生产者发送消息给消费者,消费者可以对消息进行回复。假设有一个User对象,生产者发送该对象,消费者拿到对象进行处理,然后回复给生产者,我们看看代码如何实现:

1、接着上边的helloworld工程,新建一个User对象:

public class User implements Serializable {
    private String name;

    public User() {
    }

    public User(String name) {
        this.name = name;
    }

    // 省略getter、setter

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                '}';
    }
]

2、新建测试demo类ReceiveAndReplyDemo,编写如下代码:

public class ReceiveAndReplyDemo {
    public static void main(String[] args) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // 通过AmqpAdmin来申明队列
        AmqpAdmin admin = new RabbitAdmin(connectionFactory);
        Queue queue = new AnonymousQueue();
        admin.declareQueue(queue);

        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // 发送消息,生产者
        template.convertAndSend(queue.getName(), new User("王五"));
        // 接收消息并直接转换为对象,并进行回复,消费者
        boolean received = template.receiveAndReply(queue.getName(),
                // 回复回调
                new ReceiveAndReplyCallback<User, User>() {
                    @Override
                    public User handle(User payload) {
                        System.err.println("Handle : " + payload);
                        // 将消息payload修改改一下
                        payload.setName("王五改名了");
                        return payload;
                    }
                },
                // 设置回复地址
                new ReplyToAddressCallback<User>() {
                    @Override
                    public Address getReplyToAddress(Message request, User reply) {
                        // 改完后的消息回复给发送方
                        System.err.println("Replying : " + reply);
                        return new Address("", queue.getName());
                    }
                }
        );

        // 收到消费者的回复,生产者
        User replyUser = (User) template.receiveAndConvert(queue.getName(), 1000 * 3);
        System.err.println("Replied user : " + replyUser);

        System.err.println("Received : " + received);
    }
}

3、运行Main方法,可以看到生产者发送User对象给消费者,消费者受到消息并更改了用户名,然后回复给生产者,生产者再成功接收到了回复的消息。

8.3. 消息的确认和返回

#publisherConfirmAndReturn[发布者确认和返回]一节提到,AmqpTemplate支持发布者消息确认和返回,我们来看看如何实现。

1、接着helloworld的工程,我们新建一个ConfirmAndReturnDemo的类,编写如下代码:

public class ConfirmAndReturnDemo {
    public static void main(String[] args) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // 开启消息确认和回调
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);

        Queue queue = new AnonymousQueue();
        AmqpAdmin admin = new RabbitAdmin(connectionFactory);
        admin.declareQueue(queue);

        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 设置为true,则消息改为手动返回
        template.setMandatory(true);
        // 设置消息返回回调,一个RabbitTemplate只能设置一次返回回调
        // 当消息不能成功投递,会抛出AmqpMessageReturnedException,该异常包含ReturnCallback所需的参数信息,此时会执行回调
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.err.println("Return callback : ");
            System.err.println("    message : " + message);
            System.err.println("    reply   : " + replyCode + ", " + replyText);
        });
        // 消息确认回调,一个RabbitTemplate只能设置一次确认回调
        template.setConfirmCallback((correlationData, ack, cause) -> {
            System.err.println("Confirm callback : ");
            System.err.println("    correlationData : " + correlationData);
            System.err.println("    ack   : " + ack);
            System.err.println("    cause : " + cause);
        });

        String str = "this is foo.";
        template.convertAndSend(queue.getName(), (Object) str, new CorrelationData(UUID.randomUUID().toString()));
        System.err.println("Send : " + str);
        String foo = (String) template.receiveAndConvert(queue.getName());
        System.err.println("Received : " + foo);

        User user = new User("zhangsan");
        template.convertAndSend(queue.getName(), user, new CorrelationData(UUID.randomUUID().toString()));
        System.err.println("Send : " + user);
        User receivedUser = (User) template.receiveAndConvert(queue.getName());
        System.err.println("Received : " + receivedUser);

        // 不能成功投递消息
        System.err.println("Mock message body can't be sent.");
        str = "can not be sent.";
        // routing key无法匹配,则confirmCallback和returnCallback都会被触发,ack仍然为true
        template.convertAndSend("dontExist.exchange", (Object) str, new CorrelationData(UUID.randomUUID().toString()));
        // exchange找不到,则会触发confirmCallback,但是returnCallback不会被触发,ack为false,cause包含异常信息,此时channel会关闭
        template.convertAndSend("dontExist.exchange", "dontExist.routingKey", str, new CorrelationData(UUID.randomUUID().toString()));
    }
}

代码首先创建的连接工厂,CachingConnectionFactorypublisherConfirmspublisherReturns属性都设置为true,以启用发布者回调和返回;然后申明了匿名队列,然后使用RabbitTemplate来发送和接收消息,它的mandatory属性设置为true来启用消息返回,然后别分为其指定了消息返回回调和消息确认回调,注意它们分别只能设置一个;最后,正常发送两个消息,然后分别模拟一个routingkey不存在、exchange不存在的两次消息发送,以观察打印的结果。

2、执行main方法,查看打印信息如下:

Send : this is foo.
Confirm callback :
    correlationData : CorrelationData [id=71ec4137-8fe6-43dd-9552-147a7807b686]
    ack   : true
    cause : null
Received : this is foo.

Confirm callback :
    correlationData : CorrelationData [id=662ef2e6-41f9-4461-8c7f-d654ab463ded]
    ack   : true
Send : User{name='zhangsan'}
    cause : null
Received : User{name='zhangsan'}

Mock message body can't be sent.
Return callback :
    message : (Body:'can not be sent.' MessageProperties [headers={}, timestamp=null, ……
    reply   : 312, NO_ROUTE
Confirm callback :
    correlationData : CorrelationData [id=23386712-9922-4682-b92f-70e42bb28182]
    ack   : true
    cause : null

Confirm callback :
    correlationData : CorrelationData [id=bbdb39ed-5ad8-4818-bd96-eab58c8d5dad]
    ack   : false
    cause : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'cant_sent_exchange' in vhost '/', class-id=60, method-id=40)

更多示例代码可以看github上的源代码。

9. 总结

本章内容暂时到这里,更多详细内容,后续再来逐一学习,总结一下:

1、Spring AMQP 对 AMQP 规范做了进一步封装,并提供了自身的封装实体,诸如 MessageExchangeQueueBindingAmqpTemplate

2、在使用Spring AMQP时,我们重心在于处理如何收发消息的业务逻辑,而不必关心连接RabbitMQ等其他细节

3、AmqpTemplate是高层级的抽象,定义消息收发的通用方法,其实现RabbitTemplate是对于RabbitMQ客户端的具体实现,并且进行了扩展

4、AmqpAdmin用来管理RabbitMQ,但是需要用户具备相应的权限

5、消息接收最好的方式是异步

6、发布者消息确认和返回时,注意条件,满足条件才能进行确认和返回,并且一个RabbitTemplate只能设置一个确认回调和返回回调

本文的实例代码见: github


相关阅读