1. 基本概念
RabbitMQ是一个消息代理,是一个erlang开发的AMQP(Advanced Message Queue )的开源实现。
RabbitMQ是轻量级的,易于部署在premises和云中。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足高级别、高可用性需求。
其主要思想非常简单:它接受并转发消息。你可以把它想象成邮局:当你把邮件寄到邮箱时,你很确定邮差先生最终会把邮件寄给你的收件人。使用这个比喻,RabbitMQ是一个邮筒,一个邮局和一个邮差。
RabbitMQ与邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制数据。
官网地址: http://www.rabbitmq.com
2. AMQP
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。通过AMQP,让消息中间件的能力最终被网络本身所具有,并且通过消息中间件的广泛使用发展出一系列有用的应用程序。
Broker: 中间件。接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual host: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
Connection: 连接。publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
Channel: 渠道。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Exchange: 路由。message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 队列。消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
Binding: 绑定。exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
3. RabbitMQ术语
3.1. Producter
即生产者。Producing就是发送,发送消息的程序是生产者(Producter)。用P表示,如下图:
3.2. Exchange
交换器,RabbitMQ中,其实消息不会直接相队列发送,而是发送给交换器,然后交换器在按照一定的规则转发给不同的队列。交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换必须知道如何处理它接收到的消息。是否应该附加到特定的队列?是否应该附加到许多队列?或者应该被抛弃。这些规则由交换类型(exchange type)定义。
3.3. Exchange Type
交换器类型,在创建交换器时指定,用于区分交换器的不同作用,实现不同的功能。RabbitMQ定义了四种交换器类型:direct、topic、headers、fanout,每种类型都有特定的应用场景(见后续文章的详细介绍)。
direct:bindingKey和routingKey进行精确匹配,适用于精确将消息发送给指定队列;
topic:bindingKey和routingKey可以进行模糊匹配,通过使用通配符"*"和"#"分别来模糊匹配一个单词和多个单词;适用于将消息按照一定的规则发送到匹配的一个或多个队列;
fanout:广播,这种交换器可以将消息广播给所有订阅的交换器;
header:不常用,有兴趣的话可以自行了解。
3.4. Queue
即队列,队列是邮箱的名称,它处于RabbitMQ内部。尽管消息流通过RabbitMQ和您的应用程序,但它们只能存储在队列中。队列不受任何限制,它可以根据你的需要存储尽可能多的消息----它本质上是一个无限的缓冲区。许多生产者都可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。队列上有它的名称,如下图表示:
3.5. Consumer
即消费者,Consuming跟receiving的含义类似。Consumer通常为等待接收消息的应用程序 。注意,生产者、消费者和消息代理不需要处于同一台主机上,事实上,在大多数应用场景都是如此。
4. 下载和安装
这里我下载windows的安装版本( http://www.rabbitmq.com/install-windows.html),在这之前,首先需要下载并按照erlang语言安装包。
由于我下载的是3.7.2版本的RabbitMQ Server,所以erlang语言需要19.3到20.2.x之间的版本。下载地址: http://www.erlang.org/downloads,按照步骤一步步安装即可。
erlang安装完成后,可以继续按照RabbitMQ server了,同样是一步步安装,较为简单,不在赘述。
按照完成后,开始菜单多了几个选项:
除了命令行工具外,服务的启动、停止、重新安装、移除都提供了响应的工具。在windows服务中,同样多了RabbitMQ的服务,同样通过服务进行启停。
5. Java客户端的Hello World
接下来,我们来用Java实现两个程序,一个发送简单消息,另一个负责接收消息并将它们打印到控制台上。这里我们暂时不深究Java API的细枝末节,仅仅简单了解RabbitMQ是如何工作的。
在下图中,P是我们的生产者,C是我们的消费者。中间的框是一个队列----RabbitMQ代表消费者保存的消息缓冲区。
首先,我们需要引入RabbitMQ的java客户端程序,为了方便,我使用Maven构建,只需引入如下依赖:
groupId:com.rabbitmq artifactId:amqp-client version:4.0.3
接下来,我们创建两个类,一个为Sender.java,表示消息发送者,另一个为Receiver.java,表示消息接收者。
package com.belonk.rmq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by sun on 2018/1/17.
*
* @author sunfuchang03@126.com
* @version 1.0
* @since 1.0
*/
public class Sender {
/**
* 队列名称
*/
public static final String QUEUE_NAME = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 连接到本地server
connectionFactory.setHost("localhost");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建通道,API通过通道完成相关任务
Channel channel = connection.createChannel();
// 创建队列,该队列非持久(服务器重启后依然存在)、非独占(非仅用于此链接)、非自动删除(服务器将不再使用的队列删除)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello, rabbit mq. 你好,rabbit mq.";
// 发布消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("utf-8"));
System.out.println("Sent "" + msg + "".");;
channel.close();
connection.close();
}
}
package com.belonk.rmq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by sun on 2018/1/17.
*
* @author sunfuchang03@126.com
* @version 1.0
* @since 1.0
*/
public class Receiver {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 连接到本地server
connectionFactory.setHost("localhost");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建通道,API通过通道完成相关任务
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare(Sender.QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建消费者,阻塞接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag : " + consumerTag);
System.out.println("exchange : " + envelope.getExchange());
System.out.println("routing key : " + envelope.getRoutingKey());
String msg = new String(body, "utf-8");
System.out.println("Received "" + msg + "".");
}
};
channel.basicConsume(Sender.QUEUE_NAME, consumer);
// channel.close();
// connection.close();
}
}
首先运行Receiver.java,可以看到程序阻塞等待接收消息;然后运行Sender.java,此时发送者发送出一条消息,Receiver控制已经成功接收消息并且已经打印出来。
注意:由于RabbitMQ消息为byte[]数组,在消息传递时注意中文乱码问题,这里将发送消息和接收消息都转为UTF-8编码。
6. 查询消息队列和数量
这里间简单使用命令行工具,使用rabbitmqctl.bat list_queues命令,如下图:
命令行工具路径如图所示。
7. 总结
本文仅按照官方的java指南,做了初步的消息发送和接收程序,对RabbitMQ的工作机制有了初步的认识。不必对具体细节太过纠结,接下来我们来一步步深入学习。