上一篇,我们介绍了如果使用Spring AMQP注解来实现消息发送和监听,示例都是使用的默认的消息转换器,即SimpleMessageConverter
,它只能处理byte[]
、String
、java序列化对象(实现了Serializable
接口的对象)。
通常,不推荐使用Java序列化,因为它存在与Java对象强耦合、依赖java语言等缺点,Spring AMQP也提供了其他的消息转换方式,在本篇,我们将重点来看看如果将消息序列化为JSON
格式。
1. MessageConverter
Spring AMQP消息转换定义了顶层接口MessageConverter
,它的定义如下:
public interface MessageConverter {
// 将对象转换为Message对象,支持自定义消息属性
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
// 将Message转换为对象
Object fromMessage(Message message) throws MessageConversionException;
}
它定义了两个方法:将对象转换为Message
,将Message
转换为对象。
同时,在AmqpTemplate中定义了便捷的消息转换和发送的方法:
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
同样,还定义消息接收并转换为对象的方法:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
这些方法并不直接处理封装的Message
对象,而是根据设定的MessageConverter
来处理POJO,例如发送时将POJO转换为Message
再发送,又或者接收Message
并将body
转换为POJO,当然,它还会设置合适的MessageProperties
。
MessageConverter有几种实现,我们来简单看一下。
2. SimpleMessageConverter
这是MessageConverter的默认实现,它只能处理byte[]
、String
、java序列化对象(实现了Serializable
接口的对象)。
2.1. Message转换为POJO
消息底层存储的都是字节数组,从字节数组转换为Java对象有以下几种情况:
字符串:消息的
content-type
为text开头,则转换为字符串,默认使用的字符串编码为UTF-8
,可以通过defaultCharset
指定Java序列化:消息的
content-type
为application/x-java-serialized-object
,则SimpleMessageConverter
尝试将字节数组反序列化为Java对象。虽然这对于简单的原型开发可能很有用,但是通常不建议依赖Java序列化,因为它会导致生产者和消费者之间的紧密耦合
2.2. POJO转换为Message
同样,SimpleMessageConverter
将对象转换为Message
时,本质上是将其转换为字节数组,支持字符串、Java可序列化对象、字节数组,除此之外的数据类型Message的body将被设置为null。
3. SerializerMessageConverter
这个转换器类似于SimpleMessageConverter
,但它依赖spring框架的Serializer
和Deserializer
接口,而不是java的Serializable
接口。
4. Jackson2JsonMessageConverter
文章开头提过,通常不推荐使用Java序列化,因为它存在与Java对象强耦合、依赖java语言等缺点,一种替代方案是使用JSON
序列化,Spring AMQP提供了Jackson2JsonMessageConverter
转换器。
4.1. POJO转为Message
由于RabbitTemplate
默认使用的SimpleMessageConverter
,所以我们需要将其替换为Jackson2JsonMessasgeConverter
,代码如下:
@Bean
public RabbitTemplate jsonRabbitTemplate() {
RabbitTemplate jsonRabbitTemplate = new RabbitTemplate(connectionFactory());
jsonRabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return jsonRabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return new Jackson2JsonMessageConverter();
}
Jackson2JsonMessasgeConverter
会根据接收消息的POJO类型来进行自动推断转换类型,同时,也支持手动指定转换类型,只需为其设置一个名为ClassMapper
的实体对象,用来明确标记JSON转换的对象,它的定义如下:
public interface ClassMapper {
// 将给定的clazz放入消息头
void fromClass(Class<?> clazz, MessageProperties properties);
// 根据消息头获取转换目标class
Class<?> toClass(MessageProperties properties);
}
它是一个策略接口,表明根据MessageProperties
中的特定消息头来获取转换的Class的策略,默认的策略是DefaultClassMapper
,它的特定消息为TypeId
,一个设定ClassMapper
的例子:
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 申明json转换器,及其转换类型,默认为SimpleMessageConverter
Jackson2JsonMessageConverter messageConverter = (Jackson2JsonMessageConverter) messageConverter();
messageConverter.setClassMapper(classMapper());
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("foo", Foo.class);
idClassMapping.put("bar", Bar.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
发送消息时需要为MessageProperties
指定type
属性:
public void runDemo() throws Exception {
String json = "{\"foo\" : \"value\" }";
Message jsonMsg = MessageBuilder.withBody(json.getBytes(Charset.forName("utf-8")))
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build())
.build();
jsonMsg.getMessageProperties().setHeader(DefaultClassMapper.DEFAULT_CLASSID_FIELD_NAME, "foo");
this.jsonRabbitTemplate.send(JSON_MESSAGE_QUEUE, jsonMsg);
}
@RabbitListener(queues = JSON_MESSAGE_QUEUE)
public void listenForJsonMessage(Foo foo) {
System.out.println("listenForJsonMessage : Expected a Foo, got a " + foo);
}
4.2. Message转为POJO
Spring AMQP根据消息的头信息来转换POJO对象,这需要消息的属性MessageProperties
中Content-Type
中指定了JSON的格式,例如application/json
等。1.6之前的版本,如果头信息中的type信息(一般为type
)缺失,将会转换失败;从1.6版本开始,将采用jackson默认的转换行为(一般来说,是将json转换为map),并且,将会根据@RabbitListener
标注的方法参数类型来进行自动推断并转换。默认情况下,推断的类型会覆盖请求头设定的(根据type
)类型。
还有一些其他的转换器:
MarshallingMessageConverter:根据Spring OXM的Marshaller
、Unmarshaller
接口来转换消息,代理Spring OXM的实现;
ContentTypeDelegatingMessageConverter:根据MessageProperties的contentType
代理不同的转换器。
5. JSON转换示例
接下来,我们来编写一个基于注解的消息转换示例:
1、根据上篇的示例工程,在RabbitConfiguration添加jsonRabbitTemplate
等设置:
@Bean
public RabbitTemplate jsonRabbitTemplate() {
RabbitTemplate jsonRabbitTemplate = new RabbitTemplate(connectionFactory());
jsonRabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return jsonRabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return new Jackson2JsonMessageConverter();
}
这里申明了一个名为jsonRabbitTemplate
的Bean和jackson2JsonMessageConverter
的转换器,并没有为json转换器设置ClassMapper
。
2、编写JsonMessageDemo
测试代码:
@Component
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_5)
public class JsonMessageDemo {
@Resource
private RabbitTemplate jsonRabbitTemplate;
public void send(Object obj) {
Printer.p(this, "Send : " + obj);
this.jsonRabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_5, obj);
}
@RabbitHandler
public void receive(String json, @Header("contentType") String header) {
Printer.p(this, "content-type: " + header);
Printer.p(this, "Received : " + json);
}
@RabbitHandler
public void receive(Dept dept, @Header("contentType") String header) {
Printer.p(this, "content-type: " + header);
Printer.p(this, "Received : " + dept);
}
@RabbitHandler
public void receive(User user, @Header("contentType") String header) {
Printer.p(this, "content-type: " + header);
Printer.p(this, "Received : " + user);
}
}
这里输出了消息的ConentType
,便于验证是否是json格式的消息。
3、启动类Main方法添加测试代码:
// demo6
JsonMessageDemo jsonMessageDemo = context.getBean(JsonMessageDemo.class);
User user = new User("赵六");
jsonMessageDemo.send(user);
jsonMessageDemo.send(new Dept("技术部", user));
jsonMessageDemo.send("this is not json");
由于这里声明了Json转换器,之前的测试代码不能运行,所以需要先注释掉,运行之前的示例只需要注释掉json转换器申明的代码即可。
4、运行main方法,可以看到如下输出:
[JsonMessageDemo] Send : User(name=赵六)
[JsonMessageDemo] Send : Dept(name=技术部, manager=User(name=赵六))
[JsonMessageDemo] Send : this is not json
[JsonMessageDemo] content-type: application/json
[JsonMessageDemo] Received : User(name=赵六)
[JsonMessageDemo] content-type: application/json
[JsonMessageDemo] Received : Dept(name=技术部, manager=User(name=赵六))
[JsonMessageDemo] content-type: application/json
[JsonMessageDemo] Received : this is not json
消息的ContentType都是application/json,并且成功发送,也成功接收。
我们知道(说明在 这里
),@RabbitListener
注解标注的方法,会使用RabbitListenerContainerFactory
为其创建一个容器MessageListenerContainer
,容器通过设定的MessageListener
来进行异步监听,默认的容器是SimpleMessageListenerContainer
,要查看原始的消息内容,我们可以跟踪其executeListener方法,在其父类AbstractMessageListenerContainer
中,获取的一个消息如下:
(Body:'{"name":"赵六"}' MessageProperties [headers={TypeId=com.belonk.domain.User}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=spring.amqp.anonymous.queue5, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-ww7B814EdZCwae0T6bWw6g, consumerQueue=spring.amqp.anonymous.queue5])
可以看到,它的body是json字符串,并且自动设置了TypeId
消息头为User对象。
6. 总结
本篇先简单介绍到这里,总结一下:
1、Spring AMQP支持多种消息转换方式,最常用的是默认的和JSON,要设置消息转换,需要为RabbitTemplate配置消息转换器;
2、基于注解使用JSON消息转换器,可以根据参数进行转换类型推导,而无需设置消息头的ContentType
和TypeId
;
3、JSON消息转换器还支持配置ClassMapper来定义转换类型。
本文实例代码见 github。