RabbitMQ基础(二)——工作队列Work Queues

上一篇,我们编写了从一个指定的队列发送和接收消息的程序。在本文中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作人员。 Work Queues,称为工作队列(也称Task Queues,任务队列),其主旨在于避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在后来执行。对此,我们将任务封装为消息并将其发送到队列中。在后台运行的工作进程会获取任务并最终执行任务。当你运行许多工作进程时,任务将在他们之间共享。 1. 轮询调度 使用任务队列的优点之一是能够轻松地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务消费者,这样就可以很容易地扩大规模。默认情况下,RabbitMQ将依次将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息,这种分发消息的方式称为轮询。 2. 消息确认 完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始了一项很长的任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦RabbitMQ向客户传递消息,它立即标记为删除。在这种情况下,如果你关闭一个工作进程,我们将失去它正在处理的信息。我们也将丢失所有发送给这个特定工作进程并且还未处理消息。 通常,我们希望一个工作进程挂掉后,将其任务交给其他工作进程完成。为了保证消息不丢失,RabbitMQ支持消息确认机制:当消费者处理完消息后,其反馈给RabbitMQ,表明消息已经被接收和处理,RabbitMQ可以自由删除该消息。 如果一个消费者挂掉(其通道关闭,连接关闭,或者TCP连接丢失),而没有发送ack,RabbitMQ将会知道一条消息没有被完全处理,并且将重新排队。如果同时有其他的消费者在线,那么它将很快把消息重新交给另一个消费者处理,这样就保证不会丢失任何信息。 消息不会超时:当消费者进程挂掉时,RabbitMQ将重新传递消息,即使处理消息需要很长时间。 RabbitMQ默认是开启手动消息确认的,我们可以通过autoAck=true标记明确地关闭,即采用系统自动确认消息。如果需要手动确认消息,那么将autoAck设置为false,一旦我们完成了任务,需要向工作进程发出确认消息,代码如下: // 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失 channel.basicAck(envelope.getDeliveryTag(), false); 示例的部分关键代码如下: NewTask.java /** * 发布5条消息,每条消息的一个“.”表示消息执行时间需要1秒。 * * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); String[] msgs = new String[]{ "First Message.", // 1s "Second Message..", // 2s "Third Message...", // 3s "Fourth Message....", // 4s "Fifth Message....." // 5s }; Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); for (String msg : msgs) { // 发布消息 channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("utf-8")); System.out.println("[x] Sent '" + msg + "'"); } channel.close(); connection.close(); } ...

2018-01-24 · 3 min · 431 words · Hank