RabbitMQ基础(六)——实现RPC

第二篇中我们学习了如何在多个worker中使用工作队列来分配耗时的任务。现在,假设我们需要运行一个远程计算机上的一个方法并等待其返回结果,那么我们怎么实现?通常,这个过程被称为RPC(远程过程调用Remote Procedure Call)。 百度百科对RPC的介绍: RPC(Remote Procedure Call)--远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。 RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。 有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)。 这里我们不过多讨论RPC相关的内容,有兴趣的可以查阅相关资料。 现在,我们将要使用RabbitMQ来构建一个RPC系统,包括可扩展的RPC客户端和服务端。由于我们并没有耗时的任务,所以们将模拟一些RPC服务。 1. Message properties Message properties即消息属性,AMQP-0-9-1预定义了14项消息属性,但是大部分都很少使用,常用的属性有如下几项: deliveryMode:标记消息是否持久化,值为2则持久化,其他为瞬态消息; contentType:描述编码的mime类型(mime-type),例如常用的JSON编码格式:application/json; replyTo:命名回调队列 correlationId:用于关联RPC的请求和响应 接下来,我们看看如何通过消息属性来设置回调队列和关联ID(correlationId)。 2. 回调队列 通常,在RabbitMQ上实现RPC很简单,客户端发送消息,而服务端响应消息即可。为了接收响应信息,客户端需要在请求中发送回调队列地址给服务端,告诉服务端我使用这个队列来接收消息,你将返回消息发送到这个队列即可。我们可以使用默认的队列(在java客户端中是独占的)。 callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); 3. Correlation Id 上边的方法中,我们为每一个RPC客户端创建了回调队列,这是非常低效的。有没有更好的方式:为每一个客户端创建一个共享的回调队列?这样势必又带来新的问题:共享一个回调队列,请求和响应之间的对应关系并不明确。 CorrelationId属性解决了这个问题。 大致思路是这样:为每个请求设置一个 唯一的标识,然后通过回调队列接收响应消息时,获取这个唯一标识,如果这个标识和之前设定的相同,那么说明响应的确是当前请求的响应,可以获取请求的响应结果了。如果获取到的唯一标示并非我们所设定,说明它并不属于我们的任何请求,那么就可以丢弃这个消息。 为什么我们应该忽略回调队列中的未知消息,而不是进行失败处理?这是由于服务器端可能出现竞态条件。可能发生这样的情况,RPC服务器发送了响应消息后挂掉了,但是请求确认消息(ack)还没有发送,那么,重启后的RPC服务器将再次处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,保持RPC的幂等性。 消息定义correlationId代码: // 设置消息属性,响应后发送到回调队列中 final String correlationId = UUID.randomUUID().toString(); // 随机生成唯一标识 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .correlationId(correlationId) // 每次请求都设定唯一标识,该标识用于将请求和响应进行匹配 .replyTo(callbackQueueName) // 回调队列 .build(); ...

2018-03-28 · 2 min · 355 words · Hank