第二篇中我们学习了如何在多个worker中使用工作队列来分配耗时的任务。现在,假设我们需要运行一个远程计算机上的一个方法并等待其返回结果,那么我们怎么实现?通常,这个过程被称为RPC(远程过程调用Remote Procedure Call)。
百度百科对RPC的介绍:
RPC(Remote Procedure Call)--远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。 RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。 有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)。
这里我们不过多讨论RPC相关的内容,有兴趣的可以查阅相关资料。
现在,我们将要使用RabbitMQ来构建一个RPC系统,包括可扩展的RPC客户端和服务端。由于我们并没有耗时的任务,所以们将模拟一些RPC服务。
1. Message properties
Message properties即消息属性,AMQP-0-9-1预定义了14项消息属性,但是大部分都很少使用,常用的属性有如下几项:
deliveryMode:标记消息是否持久化,值为2则持久化,其他为瞬态消息;
contentType:描述编码的mime类型(mime-type),例如常用的JSON编码格式:application/json;
replyTo:命名回调队列
correlationId:用于关联RPC的请求和响应
接下来,我们看看如何通过消息属性来设置回调队列和关联ID(correlationId)。
2. 回调队列
通常,在RabbitMQ上实现RPC很简单,客户端发送消息,而服务端响应消息即可。为了接收响应信息,客户端需要在请求中发送回调队列地址给服务端,告诉服务端我使用这个队列来接收消息,你将返回消息发送到这个队列即可。我们可以使用默认的队列(在java客户端中是独占的)。
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
3. Correlation Id
上边的方法中,我们为每一个RPC客户端创建了回调队列,这是非常低效的。有没有更好的方式:为每一个客户端创建一个共享的回调队列?这样势必又带来新的问题:共享一个回调队列,请求和响应之间的对应关系并不明确。
CorrelationId属性解决了这个问题。
大致思路是这样:为每个请求设置一个 唯一的标识,然后通过回调队列接收响应消息时,获取这个唯一标识,如果这个标识和之前设定的相同,那么说明响应的确是当前请求的响应,可以获取请求的响应结果了。如果获取到的唯一标示并非我们所设定,说明它并不属于我们的任何请求,那么就可以丢弃这个消息。
为什么我们应该忽略回调队列中的未知消息,而不是进行失败处理?这是由于服务器端可能出现竞态条件。可能发生这样的情况,RPC服务器发送了响应消息后挂掉了,但是请求确认消息(ack)还没有发送,那么,重启后的RPC服务器将再次处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,保持RPC的幂等性。
消息定义correlationId代码:
// 设置消息属性,响应后发送到回调队列中
final String correlationId = UUID.randomUUID().toString(); // 随机生成唯一标识
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId) // 每次请求都设定唯一标识,该标识用于将请求和响应进行匹配
.replyTo(callbackQueueName) // 回调队列
.build();
4. 示例
接下来,我们用RabbitMQ来实现一个计算斐波那契数列中第几个数的值的程序,包括两个端:
服务端:获取客户端传递的参数(第几个值),来计算其对应的斐波那契数列值;
客户端:传递参数(第几个值),RPC请求服务端,并获取服务端计算的结果。
4.1. 斐波那契数列
有几个关键点需要理解:
首先,服务端和客户端即是生产者也是消费者,他们使用名为rpc_queue的队列来交换信息(客户端发送消息到这个队列,服务端从该队列获取消息);
其次,客户端要从服务端获取响应结果,需要创建回调队列并告诉服务端:我想要使用这个队列来获取你的响应消息,你准备好后就把响应结果发送到这个队列;
第三,客户端请求和与之对应的响应需要通过correlationId来进行关联,所以客户端每次生产一个唯一的correlationId,并且将其发送给服务端;服务端在响应时同样将这个correlationId原样发回给客户端,客户端通过比对请求前和响应的correlationId,相同的则说明正确获得了本次请求的响应,就可以处理响应结果;
第四,回调队列、correlationId分别通过AMQP.BasicProperties的replyTo和correlationId来设置。
整个过程如下图所示:
4.2. 客户端FibonacciRpcClient
当客户端启动时,它创建了名称随机的独占的队列作为回调队列:
// 创建默认的回调队列
String callbackQueueName = channel.queueDeclare().getQueue();
使用UUID作为correlationId:
// 设置消息属性,响应后发送到回调队列中
final String correlationId = UUID.randomUUID().toString(); // 随机生成唯一标识
客户端通过设置properties的replyTo(回调队列)和correlationId(关联唯一标识),然后将消息发送到rpc_queue队列中;
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId) // 每次请求都设定唯一标识,该标识用于将请求和响应进行匹配
.replyTo(callbackQueueName) // 回调队列
.build();
// 发送消息
String msg = String.valueOf(n); // 消息为斐波那契数列的第几个数
channel.basicPublish("", "rpc_queue", properties, msg.getBytes("utf-8"));
获取服务端的响应信息
// 容量设置为1,在成功获取响应信息之前,其take方法会一直阻塞以等待结果
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
// 获取响应
channel.basicConsume(callbackQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 响应回来的correlationId与生成的匹配,说明是本次请求的响应
if (correlationId.equals(respCorrelationId)) {
response.offer(new String(body, "utf-8"));
}
}
});
// 阻塞
System.out.println("计算结果中...");
return Long.parseLong(response.take());
同上边的分析一样,客户端在获取了服务端响应后,通过比对请求前设置的correlationId和服务端发回来的correlationId,相同则说明是本次请求的响应,否则客户端会丢弃该响应消息。
这里使用了ArrayBlockingQueue类,其容量设置为1,用于模拟等待服务器响应的阻塞过程,当服务器还没有响应消息时,那么调用其take()方法线程会阻塞,直到服务器响应了消息并放入到response中。
客户端的请求方法如下:
FibonacciRpcClient client = new FibonacciRpcClient();
long n = 10;
long result = client.call(n);
System.out.println("斐波那契数列的第[" + n + "]个数是:" + result);
client.close();
注意:不要试图计算较大的值,因为会很消耗资源,上了50计算就很慢了。
4.3. 服务端FibonacciRpcServer
创建于客户端相同的队列来接收请求:
``static String rpcQueueName = "rpc_queue";``
channel.queueDeclare(rpcQueueName, false, false, false, null);
channel.basicQos(1);
System.out.println("等待RPC请求");
服务端等待客户端发送请求消息到rpc_queue队列,作为消费者接收消息:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
……
}
};
channel.basicConsume(rpcQueueName, false, consumer);
在这个消费回调方法handleDelivery内部,服务端需要处理计算逻辑。当请求成功时,服务端获取请求参数并进行计算,完成后,将结果发送到客户端传来的回调队列中(从replyTo获取)。
获取客户端请求参数:
// 接收到的请求消息,即传递数列的第n个数
String msg = new String(body, "utf-8");
long n = Long.parseLong(msg);
System.out.println("请求的参数为:" + n);
计算斐波那契数列的值:
long result = fib(n);
System.out.println("斐波那契数列的第[" + n + "]个数为:" + result);
String response = String.valueOf(result);
这里的fib方法很简单:
/**
* 计算斐波那契数列中第n个数的值。
*
* @param n 第几个数,从1开始
* @return 整数值
*/
private static long fib(long n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 2);
}
ok,现在计算完成了,得到了返回值response,那么现在该想客户端发送这个返回值了。前边说过,返回值发送到哪个队列是由客户端告诉服务端的(通过replyTo设置回调队列)。那么我们就可以通过handleDelivery回传的AMQP.BasicProperties来获取客户端设置的消息属性值,同时,还需要将客户端设置的correlationId回传给客户端,用于其辨别请求和响应的关联关系:
// 参数properties持有客户端请求设定的属性参数,包括replyTo、correlationId等
// 将请求的correlationId作为响应的属性,传递给客户端,用于匹配请求和响应
AMQP.BasicProperties replyProps = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("utf-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
完整代码见 FibonacciRpcClient和 FibonacciRpcServer。
5. 总结
本章简单介绍了如何使用RabbitMQ来实现RPC,关键是使用AMQP.BasicProperties定义的消息属性。
1、AMQP0-9-1预定了14中消息属性,不过常用的只有几种,在java中通过AMQP.BasicProperties访问和设置;
2、客户端通过消息属性的replyTo属性来设置回调队列,服务端获取并将响应消息发送到回调队列;
3、客户端请求和响应的对应关系需要通过比对correlationId来实现。