上一篇 "Spring AMQP简介和使用",我们介绍了Spring AMQP的一些基本要素和概念,也通过一些示例代码介绍了消息的发送和接收,但是都是使用的原始编码方式来实现,并不依赖Spring环境。其实,Spring AMQP也支持使用注解的方式来进行异步接收消息,极大的简化了编码。

1. hello world

要使用注解,首先需要在Spring应用环境中,我们看一个最简单的demo:

1、重新新建一个Spring boot工程,添加如下依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>

2、新建一个Spring配置类RabbitConfiguration,用来申明Bean:

@Configuration
public class RabbitConfiguration {
    public static final String ANONYMOUS_QUEUE_NAME = "spring.amqp.anonymous.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
        cachingConnectionFactory.setUsername("admin");
        cachingConnectionFactory.setPassword("123456");
        return cachingConnectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue anonymousQueue() {
        // 匿名队列
        return new AnonymousQueue(() -> ANONYMOUS_QUEUE_NAME);
    }
}

配置类中,创建了ConnectionFactoryAmqpAdminAnonymousQueueRabbitTemplate等Bean,Spring容器启动后就可以使用它们了。

3、创建测试类HelloWorldDemo

@Component
public class HelloWorldDemo {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        rabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME, msg);
        System.err.println("Send : " + msg);
    }

    @RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME)
    public void receive(String msg) {
        System.err.println("Received : " + msg);
    }
}

重点来了,首先该类被标注了@Component注解,表示其受Spring容器管理;其次,通过@Resource注解注入了rabbitTemplate Bean,并使用它来发送String类型的消息;第三,在消息接收的方法receive上标注了@RabbitListener注解,稍后再来看这个注解。

4、创建Spring boot工程启动类SpringAmqpApplication

@SpringBootApplication
@EnableRabbit
public class SpringAmqpApplication {

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(SpringAmqpApplication.class, args);

        HelloWorldDemo helloWorldDemo = context.getBean(HelloWorldDemo.class);
        helloWorldDemo.send("hello world!");
        helloWorldDemo.send("hi, belonk!");
        helloWorldDemo.send("张三");
    }
}

启动应用,可以看到成功接收了消息。

要启动注解的消息监听,需要在配置类上加上@EnableRabbit注解。

2. RabbitListener

通过一个例子来了解一下@RabbitListener的作用:

@Component
public class MyService {
    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }
}

上边的实例使用了 @RabbitListener注解来监听名为“myQueue”的队列,只要该队列有消息可用,则会交给 processOrder方法处理,但是要确保该队列存在并绑定到了exchange上。使用该注解标记的方法或类,都会使用 RabbitListenerContainerFactory为其创建一个容器,上篇提过,异步消息监听的容器 MessageListenerContainer是由改类创建的。可见,Spring AMQP的注解消息监听是采用异步的方式, @RabbitListener注解是由 RabbitListenerAnnotationBeanPostProcessor处理的。

@RabbitListener注解标记的方法可以支持集中类型的参数:

  • com.rabbitmq.client.Channel:访问Channel

  • org.springframework.amqp.core.Message:接收的消息对象

  • 实体对象:消息中对应的负载的实体对象,自动推导负载实体

  • org.springframework.messaging.Message:Spring-messaging中的消息对象

  • @Payload:标记在消息负载实体上,明确指定消息的负载对象

  • @Header:获取特定一个消息头内容

  • @Headers:标注在一个Map上,用来获取所有消息头

  • MessageHeaders:spring-messaging的消息头

还有几个就不在一一列举了,我们看一个获取Channel和header的例子:

public void send(String msg) { (1)
    rabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_1, (Object) msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().getHeaders().put("custom_header", "this is a custom header.");
            return message;
        }
    });
    System.err.println("Send : " + msg);
}

@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_1)
public void receive(String msg, Channel channel, @Header("custom_header") String header) { (2)
    System.err.println("Received : " + msg);
    System.err.println("Header   : " + header);
    System.err.println("Channel  : " + channel.getChannelNumber());
}
1send方法设置了一个名为custom_header的自定义消息头
2receive方法通过@Header来获取,并添加Channel对象。

再看一个@Payload标注的例子:

@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_2)
public void receive(@Payload User user, Channel channel, @Header("custom_header") String header) { (1)
    System.err.println("Received : " + user);
    System.err.println("Header   : " + header);
    System.err.println("Channel  : " + channel.getChannelNumber());
}
1这里的User参数可以根据消息自动推导,可以不加上@Payload注解。
注意
User必须实现Serializable接口,上一篇已经提到过,默认的消息转换器是使用的SimpleMessageConverter,它只能处理java序列化对象、Stringbyte[]

定义绑定和Exchange

@RabbitListener有多个属性,可以用来指定监听的队列、绑定关系,例如:

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(String data) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(String data) {
    ...
  }

}

第一个方法,指定了要监听的队列、Exchange以及routing key,queue和exchange会按需自动申明并绑定;第二个方法,将会申明非持久化、独占的、自动删除的匿名队列并绑定到exchange。

2.1. 监听多个队列

使用queues属性时,可以指定关联容器可以侦听多个队列。也可以使用@Header注解来获取接受消息的队列名称:

@Component
public class MyService {

    @RabbitListener(queues = { "queue1", "queue2" } )
    public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        ...
    }
}

也支持spEL(从1.5开始):

@Component
public class MyService {

    @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
    public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        ...
    }

}

3. 消息转换

在调用Listener之前,有两个消息转换步骤:

首先,使用 MessageConverter 将传入的 Spring AMQP 的 Message 转换为 Spring messaing 的 Message,此时,MessageConverter 默认使用的是SimpleMessageConverter 实现,它仅仅处理 byte[] 数组与 Stringjava.io.Serializable 之间的相互转换

其次,在调用目标方法时将消息转换为方法参数的类型。此时,MessageConverter 默认使用的是 GenericMessageConverter 实现,它将转换委托给一个转换服务(DefaultFormattingConversionService的实例)。

设置消息转换器:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // 使用jacson消息转换器
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    ...
    return factory;
}

在1.6版本之前,消息与类型的转换需要指定消息头(type)或者ClassMapper,从1.6开始,如果@RabbitListener用在方法上,那么可以根据方法参数类型进行自动推断。

以下代码示例可以定义自己的消息转换器:

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

    ...

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
        return factory;
    }

    @Bean
    public ConversionService myConversionService() {
        DefaultConversionService conv = new DefaultConversionService();
        conv.addConverter(mySpecialConverter());
        return conv;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    ...

}

4. 多方法监听

Spring AMQP 支持同一个监听器调用多个方法,此时@RabbiListener注解标注在类上,多个被调用的方法上标注@RabbitHandler 注解。

例如:

@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {
    @RabbitHandler
    @SendTo("my.reply.queue")
    public String bar(Bar bar) {
        ...
    }

    @RabbitHandler
    public String baz(Baz baz) {
        ...
    }

    @RabbitHandler
    public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
        ...
    }
}

上边的三个方法都被标注@RabbitHandler注解,表示分别监听消息被转换的负载实体是BarBazQux。需要注意的是,Spring AMQP 必须能够区分根据负载实体来区分不同的方法,即是说,每一个被@RabbitHandler标注的方法必须具有不同的负载类型,要么被@Payload标注出明确的负载实体类型,要么根据参数类型自动推断。

5. 消息回复

如果 ``@RabbitListener``监听的方法返回不为空的值,会根据发送者的消息头的 ``ReplyToAddress``的地址进行消息返回,这是由 ``MessageListenerAdapter``来处理的,如果没有设置,则可以添加 ``@SendTo``注解来定义消息返回的地址。使用 ``@SendTo``注解来表示返回结果需要转换为 ``Message``并发送到指定的回复地址(exchange和routing key)上:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    ……
    return status;
}

直接返回Message:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    ……
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

@SendTo注解的value值用来表示exchange和routing key,格式为:exchange/routingKey,例如:

  • foo/bar - 回复的exchange为foo,routing key为bar

  • foo/ - 回复的exchange为foo,routing key为默认(空的)

  • bar or /bar -回复的outingKey为bar,exchange为默认.

  • / or empty - 回复的exchange和routing key都为默认.

@SendTo也支持spEL:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

引用的方法必须返回String。

现在,我们来编写一个消息确认和回复的demo,结合 上一篇的示例来看看基于注解如何工作:

1、修改RabbitConfiguration:

设置消息确认和返回:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
    cachingConnectionFactory.setUsername("admin");
    cachingConnectionFactory.setPassword("123456");
    // 消息确认
    cachingConnectionFactory.setPublisherConfirms(true);
    // 消息返回
    cachingConnectionFactory.setPublisherReturns(true);
    return cachingConnectionFactory;
}

创建设置了回调的RabbitTemplate

@Bean
public RabbitTemplate callbackRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    // 消息返回
    rabbitTemplate.setMandatory(true);
    // 设置消息返回回调,一个RabbitTemplate只能设置一次返回回调
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            Printer.p("Message returned : " + replyCode + ", " + replyText);
        }
    });
    // 消息确认回调,一个RabbitTemplate只能设置一次确认回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            Printer.p("Message confirmed : " + ack + ", " + cause + ", " + correlationData);
        }
    });
    return rabbitTemplate;
}

再申明一个匿名的队列,代码就不贴了。

2、新建一个ConfirmAndReturnDemo

@Component
public class ConfirmAndReturnDemo {
    @Resource
    private RabbitTemplate callbackRabbitTemplate;

    private Sender sender = new Sender();

    public void send(User user) {
        sender.send(user);
        Printer.p("Send : " + user);
    }

    @Component
    public class Sender {
        public void send(User msg) {
            callbackRabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4, (Object) msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().getHeaders().put("custom_header", "this is a custom header.");
                    return message;
                }
            });
            Printer.p(this, "Send : " + msg);
        }

        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue(value = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4, durable = "false", autoDelete = "true", exclusive = "true"),
                        exchange = @Exchange("exist.exchange"), key = "exist.routingKey"
                )
        })
        public void receiveReply(User user) {
            Printer.p(this, "Received reply : " + user);
        }
    }

    @Component
    class Consumer {
        @RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4)
        // 回复到默认的队列
        // @SendTo
        // 回复到不存在的exchange和routingkey
        // @SendTo("dontExist.exchange/dontExist.routingKey")
        // 回复到存在的exchange和routingkey
        @SendTo("exist.exchange/exist.routingKey")
        // 回复到默认的exchange和不存在的routingkey
        // @SendTo("dontExist.routingKey")
        public User receive(@Payload User user) {
            Printer.p(this, "Received : " + user);
            user.setName("王五改名字了");
            return user;
        }
    }
}

这里用了两个内部类来创建生产者和消费者,生产者除了发送消息,还将监听回复的消息,只是exchangerouting key不同;消费者接收消息,并修改了Username属性,然后返回,接收方法上标注了@SendTo,指定exchange和routing key,这里可以设置多种情况来验证回调方法的执行情况。

3、启动类编写demo执行代码:

ConfirmAndReturnDemo confirmAndReturnDemo = context.getBean(ConfirmAndReturnDemo.class);
confirmAndReturnDemo.send(new User("王五"));

4、运行程序,可以看到控制台输出如下:

[Sender] Send : User(name=王五)
Send : User(name=王五)
Message confirmed : true, null, null
[Consumer] Received : User(name=王五)
[Sender] Received reply : User(name=王五改名字了)

Sender已经成功接收了回复消息。

6. 定义元注解

@RabbitListener可以用在注解上,来自定义元注解信息:

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}

可以看到,只需要在自定义注解上适用Spring AMQP 的注解即可,使用时:

public class MetaListener {
    @MyAnonFanoutListener
    public void handle1(String foo) {
        ...
    }

    @MyAnonFanoutListener
    public void handle2(String foo) {
        ...
    }
}

7. 总结

本文的示例代码见 github,总结一下:

1、@RabbitListener是消息异步监听的基本注解,可以定义监听的队列、队列绑定关系

2、@RabbitHandler用在需要监听多个方法时,不同的方法接收不同的消息实体,必须能够明确区分不同的实体,否则消息不能监听成功

3、@SendTo用于设定消息回复,标注的方法需要返回非空的回复实体对象

4、@EanableRabbit用来启用注解消息监听

5、@Header@Headers@Payload等用在方法签名上,用来获取消息头信息或者明确表明消息的负载实体(也可以自动推导)。


相关阅读