上一篇,我们编写了从一个指定的队列发送和接收消息的程序。在本文中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作人员。
Work Queues,称为工作队列(也称Task Queues,任务队列),其主旨在于避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在后来执行。对此,我们将任务封装为消息并将其发送到队列中。在后台运行的工作进程会获取任务并最终执行任务。当你运行许多工作进程时,任务将在他们之间共享。
1. 轮询调度
使用任务队列的优点之一是能够轻松地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务消费者,这样就可以很容易地扩大规模。默认情况下,RabbitMQ将依次将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息,这种分发消息的方式称为轮询。
2. 消息确认
完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始了一项很长的任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦RabbitMQ向客户传递消息,它立即标记为删除。在这种情况下,如果你关闭一个工作进程,我们将失去它正在处理的信息。我们也将丢失所有发送给这个特定工作进程并且还未处理消息。
通常,我们希望一个工作进程挂掉后,将其任务交给其他工作进程完成。为了保证消息不丢失,RabbitMQ支持消息确认机制:当消费者处理完消息后,其反馈给RabbitMQ,表明消息已经被接收和处理,RabbitMQ可以自由删除该消息。
如果一个消费者挂掉(其通道关闭,连接关闭,或者TCP连接丢失),而没有发送ack,RabbitMQ将会知道一条消息没有被完全处理,并且将重新排队。如果同时有其他的消费者在线,那么它将很快把消息重新交给另一个消费者处理,这样就保证不会丢失任何信息。
消息不会超时:当消费者进程挂掉时,RabbitMQ将重新传递消息,即使处理消息需要很长时间。
RabbitMQ默认是开启手动消息确认的,我们可以通过autoAck=true标记明确地关闭,即采用系统自动确认消息。如果需要手动确认消息,那么将autoAck设置为false,一旦我们完成了任务,需要向工作进程发出确认消息,代码如下:
// 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失
channel.basicAck(envelope.getDeliveryTag(), false);
示例的部分关键代码如下:
/**
* 发布5条消息,每条消息的一个“.”表示消息执行时间需要1秒。
*
* @param args
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
String[] msgs = new String[]{
"First Message.", // 1s
"Second Message..", // 2s
"Third Message...", // 3s
"Fourth Message....", // 4s
"Fifth Message....." // 5s
};
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
for (String msg : msgs) {
// 发布消息
channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("utf-8"));
System.out.println("[x] Sent '" + msg + "'");
}
channel.close();
connection.close();
}
/**
* 启动三个Worker进程。
*
* 任务发布者每发送一个"."号,线程睡眠1秒,模拟工作时间。
*
* 可以看到各个Worker轮询执行任务。
*
* @param args 参数
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(NewTask.TASK_QUEUE_NAME, false, false, false, null);
// 创建消费者,阻塞接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("Received "" + msg + "".");
try {
doWork(msg);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
// 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失
// 如果消费者工作进程挂掉,自动分给其他消费者
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动接收消息并处理,接收后,立即将消息标记为删除,如果工作进程挂掉,者其未处理和处理中的消息都将丢失
// boolean autoAck = true; // acknowledgment is covered below
// 手动接收消息,处理完成后调用channel.basicAck()反馈给RabbitMQ
boolean autoAck = false;
channel.basicConsume(NewTask.TASK_QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) throws InterruptedException {
System.out.println("Doing some works.");
for (char ch : task.toCharArray()) {
// 模拟工作时间消耗
if (ch == '.') {
Thread.sleep(1000);
}
}
}
NewTask为任务发布者,即生产者,Woker为工作者,即消费者。
启动两个Woker,然后运行NewTask,发送5条消息,结果可以看到消息进行了轮询分发:
worker1:First Message、Third Message、Fifth Message worker2:Second Message、Fourth Message
然后关闭各个进程,重新启动三个Woker,然后运行NewTask,发送5条消息,然后快速关闭woker1,结果可以看到5条消息都被成功处理,原本该work1处理的消息被转发到了其他两个woker上,说明消息确认机制保证了消息不丢失。
2.1. 忘记确认消息
开发者经常会忘记设置basicAck,这样一个简单的错误,带来的后果确非常严重。如果客户端退出,RabbitMQ仍然会重新分发消息(看起来像随机分发),因为他没有收到处理成功反馈。由于RabbitMQ不能释放这些未收到反馈信息的消息,它会消耗越来越多的内存,导致性能问题设置RabbitMQ挂掉。
该问题的解决办法是,使用rabbitmqctl工具打印messages_unacknowledged字段内容,来进行调试:
linux上命令为:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows上为:
$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
接着前一节的Woker和NewTask例子,注释掉channel.basicAck(envelope.getDeliveryTag(), false);代码后,看看程序运行情况:
开三个工作客户端Woker,然后多次执行NewTask分发5条消息,可以看到,由于处理成功后客户端没有进行反馈,客户端会收到重复的消息,通过rabbitmqctl查看未确认的消息数量,结果如下:
D:\soft\RabbitMQ Server\rabbitmq_server-3.7.2\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged Timeout: 60.0 seconds ... Listing queues for vhost / ... helloworld 2 0 hello 0 13
可以看到当前队列hello里边还有13条消息,随着NewTask重复执行,消息会越来越多。这表明,如果没有收到Woker的消息处理成功确认,RabbitMQ队列中会一直保留消息(因为不知道消息是否成功处理),造成内存资源浪费。
当我们开启消息反馈后(取消注释channel.basicAck()),然后在运行一个客户端,可以看到它将会接收所有剩余的消息,并且队列中的消息数量会递减为0,消息最终被该客户端一一成功处理。
3. 消息和队列持久化
我们已经知道,即使消费者挂掉,如何保证消息不丢失。但是,如果RabbitMQ挂掉了,我们的消息仍然会丢失。此时应该这么处理?
当RabbitMQ退出或者挂掉,所有的队列和消息数据将会丢失,除非我们进行持久化设置。我们需要做两件事情:
首先,在创建队列的时候将其设置为持久化:
boolean durable = true;
channel.queueDeclare("durable_queue", durable, false, false, null);
前边我们已经创建过hello队列了,对于已经存在的队列,RabbitMQ不允许重新设置,否则会抛出ShutdownSignalException异常,告诉你设置的参数与已有的参数不一致,可以给队列命名新的名称来解决。
boolean durable = true;
channel.queueDeclare("durable_queue", durable, false, false, null);
第二,在发布消息时,需要设置消息持久化参数:
// 设置消息持久化参数
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
// 发布消息
channel.basicPublish("", TASK_QUEUE_NAME, props, msg.getBytes("utf-8"));
System.out.println("[x] Sent '" + msg + "'");
模拟是否成功持久化的过程很简单,不启动消费者,直接启动生产者发布消息,然后重启RabbitMQ,然后再通过命令行工具rabbitmqctl查询队列和消息数量,就可以验证队列和未被成功处理消息是否已经持久化,这里不再赘述。
注意 将消息标记为持久化并不能完全保证消息不丢失,尽管RabbitMQ会保存消息到磁盘上,但是仍然有可能RabbitMQ接收了消息但是还没来得及保存。并且,RabbitMQ并不会对每一条消息进行fsync,它可能仅仅保存消息到缓存中而不是写磁盘。尽管持久化担保并不强大,但是对于简单队列已经足够了,如果还需要更强大的保障机制,可以使用 publisher confirms。 |
4. 消息公平分发
有时,消息分发可能并未按照我们所想,例如,有两个工作进程,当奇数任务耗时较长,而偶数任务耗时较短,那么一个工作进程将非常繁忙而另一个则会很闲。其实,RabbitMQ并不知道这些情况,仍然平衡的分发消息。这是因为RabbitMQ在消息进入队列时才会发送消息,它只是盲目地将第几条消息发送给第几个用户,而不会考虑消费者未确认的消息的数量,如图所示:
其实,我们可以调用basicQos方法来设置prefetchCount参数。这意味着RabbitMQ一次不会给一个消费者一条以上的信息。换言之,RabbitMQ不会向消费者发送新的消息,直到它处理并确认了之前的消息。相反,它会把消息发送给下一个不太忙的消费者。代码如下:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
同样用上边的Woker和NewTask的例子,启动两个Worker,启动NewTask发送5条消息。如果不设置prefetchCount,那么应该的消息执行顺序为:
worker1:First Message、Third Message、Fifth Message worker2:Second Message、Fourth Message
为了验证分发的公平性,这里我修改了NewTask的模拟的消息执行时间(通过修改“.”的数量,每一个“.”表示执行1秒)如下:
String[] msgs = new String[]{
"First Message.", // 1s
"Second Message.", // 1s
"Third Message...", // 3s
"Fourth Message.", // 1s
"Fifth Message." // 1s
};
再次执行后可以看到,执行结果并不是按照轮询的机制,而是RabbitMQ根据任务执行时间进行了公平的分发。
5. 总结
RabbitMQ默认采用轮询的方式进行消息分发
如果消费者设置autoAck=true,则消费者自动接收并处理消息,并将消息标记为删除,可能造成未处理和处理中的消息丢失
如果消费者设置autoAck=false(默认),必须在消息处理成功后手动反馈,即设置channel.basicAck
通过控制台工具rabbitmqctl可以查询队列和队列中的消息
默认情况消息未开启持久化,要开启持久化,必须在创建队列时设置持久化,并且生产者发布消息时设置持久化类型
消息标记为持久化并不能完全保证消息不丢失
轮询的消息分发机制并不公平,可以通过设置channel.basicQos来保证消息未处理完成前不在分发新的消息给当前消费者
示例代码见 github.