RabbitMQ基础(二)——工作队列Work Queues

上一篇,我们编写了从一个指定的队列发送和接收消息的程序。在本文中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作人员。 Work Queues,称为工作队列(也称Task Queues,任务队列),其主旨在于避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在后来执行。对此,我们将任务封装为消息并将其发送到队列中。在后台运行的工作进程会获取任务并最终执行任务。当你运行许多工作进程时,任务将在他们之间共享。 1. 轮询调度 使用任务队列的优点之一是能够轻松地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务消费者,这样就可以很容易地扩大规模。默认情况下,RabbitMQ将依次将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息,这种分发消息的方式称为轮询。 2. 消息确认 完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始了一项很长的任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦RabbitMQ向客户传递消息,它立即标记为删除。在这种情况下,如果你关闭一个工作进程,我们将失去它正在处理的信息。我们也将丢失所有发送给这个特定工作进程并且还未处理消息。 通常,我们希望一个工作进程挂掉后,将其任务交给其他工作进程完成。为了保证消息不丢失,RabbitMQ支持消息确认机制:当消费者处理完消息后,其反馈给RabbitMQ,表明消息已经被接收和处理,RabbitMQ可以自由删除该消息。 如果一个消费者挂掉(其通道关闭,连接关闭,或者TCP连接丢失),而没有发送ack,RabbitMQ将会知道一条消息没有被完全处理,并且将重新排队。如果同时有其他的消费者在线,那么它将很快把消息重新交给另一个消费者处理,这样就保证不会丢失任何信息。 消息不会超时:当消费者进程挂掉时,RabbitMQ将重新传递消息,即使处理消息需要很长时间。 RabbitMQ默认是开启手动消息确认的,我们可以通过autoAck=true标记明确地关闭,即采用系统自动确认消息。如果需要手动确认消息,那么将autoAck设置为false,一旦我们完成了任务,需要向工作进程发出确认消息,代码如下: // 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失 channel.basicAck(envelope.getDeliveryTag(), false); 示例的部分关键代码如下: NewTask.java /** * 发布5条消息,每条消息的一个“.”表示消息执行时间需要1秒。 * * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); String[] msgs = new String[]{ "First Message.", // 1s "Second Message..", // 2s "Third Message...", // 3s "Fourth Message....", // 4s "Fifth Message....." // 5s }; Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); for (String msg : msgs) { // 发布消息 channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("utf-8")); System.out.println("[x] Sent '" + msg + "'"); } channel.close(); connection.close(); } ...

2018-01-24 · 3 min · 431 words · Hank

RabbitMQ基础(一)——基本概念和HelloWorld

1. 基本概念 RabbitMQ是一个消息代理,是一个erlang开发的AMQP(Advanced Message Queue )的开源实现。 RabbitMQ是轻量级的,易于部署在premises和云中。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足高级别、高可用性需求。 其主要思想非常简单:它接受并转发消息。你可以把它想象成邮局:当你把邮件寄到邮箱时,你很确定邮差先生最终会把邮件寄给你的收件人。使用这个比喻,RabbitMQ是一个邮筒,一个邮局和一个邮差。 RabbitMQ与邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制数据。 官网地址: http://www.rabbitmq.com 2. AMQP AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。 目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。通过AMQP,让消息中间件的能力最终被网络本身所具有,并且通过消息中间件的广泛使用发展出一系列有用的应用程序。 Broker: 中间件。接收和分发消息的应用,RabbitMQ Server就是Message Broker。 Virtual host: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。 Connection: 连接。publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。 Channel: 渠道。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 Exchange: 路由。message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 Queue: 队列。消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。 Binding: 绑定。exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。 3. RabbitMQ术语 3.1. Producter 即生产者。Producing就是发送,发送消息的程序是生产者(Producter)。用P表示,如下图: 3.2. Exchange 交换器,RabbitMQ中,其实消息不会直接相队列发送,而是发送给交换器,然后交换器在按照一定的规则转发给不同的队列。交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换必须知道如何处理它接收到的消息。是否应该附加到特定的队列?是否应该附加到许多队列?或者应该被抛弃。这些规则由交换类型(exchange type)定义。 3.3. Exchange Type 交换器类型,在创建交换器时指定,用于区分交换器的不同作用,实现不同的功能。RabbitMQ定义了四种交换器类型:direct、topic、headers、fanout,每种类型都有特定的应用场景(见后续文章的详细介绍)。 direct:bindingKey和routingKey进行精确匹配,适用于精确将消息发送给指定队列; topic:bindingKey和routingKey可以进行模糊匹配,通过使用通配符"*"和"#"分别来模糊匹配一个单词和多个单词;适用于将消息按照一定的规则发送到匹配的一个或多个队列; fanout:广播,这种交换器可以将消息广播给所有订阅的交换器; header:不常用,有兴趣的话可以自行了解。 3.4. Queue 即队列,队列是邮箱的名称,它处于RabbitMQ内部。尽管消息流通过RabbitMQ和您的应用程序,但它们只能存储在队列中。队列不受任何限制,它可以根据你的需要存储尽可能多的消息----它本质上是一个无限的缓冲区。许多生产者都可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。队列上有它的名称,如下图表示: 3.5. Consumer 即消费者,Consuming跟receiving的含义类似。Consumer通常为等待接收消息的应用程序 。注意,生产者、消费者和消息代理不需要处于同一台主机上,事实上,在大多数应用场景都是如此。 ...

2018-01-18 · 2 min · 354 words · Hank