前提
本文内容参考RabbitMQ官方文档Direct reply-to。
直接回复
直接回复(Direct reply-to)是一种可以避免声明回复队列并且实现类似于RPC功能的一种特性。RabbitMQ中允许使用客户端和RabbitMQ消息代理中间件实现RPC模式,典型的做法是:RPC客户端发送请求(消息)到一个持久化的已知服务端队列,RPC服务端消费该服务端队列的消息,然后使用消息属性中的reply-to
属性对应的值作为客户端回复队列发送回复消息到RPC客户端。
客户端回复队列需要考虑创建问题。客户端可以为每个请求-响应声明一个一次性的队列,但是这样的做法是十分低效的,因为即使是非持久状态下的非镜像队列,其删除的代价是昂贵的,特别是在集群模式之下。另一个可选的做法是:客户端为回复创建一个持久化的长期存在的队列,这种情况下队列的管理可能变得复杂,因为客户端本身可能不是长期存在的。
实际上,RabbitMQ提供了一个功能,允许RPC客户端直接从其RPC服务端接收回复,并且无需创建回复队列,依赖于RabbitMQ的消息中间件的功能,具体做法是:
对于RPC客户端:
- RPC客户端创建消费者的时候队列指定为伪队列
amq.rabbitmq.reply-to
,使用非手动ack模式(autoAck=true)进行消费,伪队列amq.rabbitmq.reply-to
不需要显式声明,当然如果需要的话也可以显式声明。
- 发布消息的时候,消息属性中的
reply-to
属性需要指定为amq.rabbitmq.reply-to
。
对于RPC服务端:
- RPC服务端接收消息后感知消息属性中的
reply-to
属性存在,它应该通过默认的交换器(名称为””)和reply-to
属性作为路由键发送回复消息,那么该回复消息就会直接投递到RPC客户端的消费者中。
- 如果RPC服务端需要进行一些长时间的计算逻辑,可能需要探测RPC服务端是否存活,可以使用一个一次性使用的信道对
reply-to
属性做一次队列声明,如果声明成功,队列amq.rabbitmq.reply-to
并不会创建,如果声明失败,那么说明客户端已经失去连接。
注意事项:
- RPC客户端在创建伪队列
amq.rabbitmq.reply-to
消费者的时候必须使用非手动ack模式(autoAck=true)。
- 使用此机制发送的回复消息通常不具有容错能力,如果发布原始请求的客户端随后断开连接,它们将被丢弃。
- 伪队列
amq.rabbitmq.reply-to
可以在basic.consume
、basic.publish
和消息属性reply-to
中使用,实际上,它并不是一个真实存在的队列,RabbitMQ的Web管理器或者rabbitmqctl list_queues
命令都无法展示该伪队列的相关属性或者信息。
说实话,个人认为这种方式有个比较多的局限性:
- 同一个应用里面,只能使用唯一一个伪队列
amq.rabbitmq.reply-to
消费回复消息,并且RabbitMQ的Web管理器或者rabbitmqctl list_queues
命令都无法展示该伪队列的相关属性或者信息,也就是无法对它进行监控或者管理。
- 对于多应用同时接进去同一个RabbitMQ消息中间件代理,这些应用之间无法同时使用
amq.rabbitmq.reply-to
这个特性,因为有可能A客户端发送的消息被远程服务回调到另一个不同的B客户端。
直接回复特性使用
使用伪队列amq.rabbitmq.reply-to
的一个例子:
public class ReplyToRawMain extends BaseChannelFactory {
private static final String FAKE_QUEUE = "amq.rabbitmq.reply-to"; private static final String RPC_QUEUE = "rpc.queue"; private static final String DEFAULT_EXCHANGE = "";
public static void main(String[] args) throws Exception { provideChannel(channel -> { channel.queueDeclare(RPC_QUEUE, true, false, false, null); client(channel); server(channel); Thread.sleep(5000); }); }
private static void client(Channel channel) throws Exception { channel.basicConsume(FAKE_QUEUE, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(String.format("[X-Client]\ndeliveryTag:%s\nexchange:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n", envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(), properties.getReplyTo(), new String(body, StandardCharsets.UTF_8))); } }); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() .correlationId("message-99999") .replyTo(FAKE_QUEUE) .build(); channel.basicPublish(DEFAULT_EXCHANGE, RPC_QUEUE, basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8)); }
private static void server(Channel channel) throws Exception { channel.basicConsume(RPC_QUEUE, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(String.format("[X-Server]\ndeliveryTag:%s\nexchange:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n", envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(), properties.getReplyTo(), new String(body, StandardCharsets.UTF_8))); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish(DEFAULT_EXCHANGE, properties.getReplyTo(), basicProperties, body);
} }); } }
|
当然,可以直接创建一个真实的独占队列(生命周期跟客户端的连接绑定)作为回复队列,举个例子:
public class ReplyToMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception { provideChannel(channel -> { channel.queueDeclare("rpc.queue", true, false, false, null);
AMQP.Queue.DeclareOk callback = channel.queueDeclare("", false, true, false, null);
System.out.println("建立排他应答队列:" + callback.getQueue());
channel.basicConsume(callback.getQueue(), false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(String.format("[X-Client]\ndeliveryTag:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n", envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(), properties.getReplyTo(), new String(body, StandardCharsets.UTF_8))); } });
channel.basicConsume("rpc.queue", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(String.format("[X-Server]\ndeliveryTag:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n", envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(), properties.getReplyTo(), new String(body, StandardCharsets.UTF_8))); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("", properties.getReplyTo(), basicProperties, body); } });
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() .correlationId("message-99999") .replyTo(callback.getQueue()) .build(); channel.basicPublish("", "rpc.queue", basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));
Thread.sleep(5000); }); } }
|
个人想法
在实际项目中,我们经常被RabbitMQ消息发送是否成功这个问题困扰,一般情况下,我们认为调用basic.publish
只要不抛出异常就是发送消息成功,例如一个代码模板如下:
public boolean sendMessage(){ boolean success = false; try { channel.basicPublish(); success = true; }catch (Exception e){ log.error(); } return success; }
|
这个代码模板在极大多数情况下是合适的,但是有些时候我们确实需要消息的接收方告知发送方已经收到消息,这个时候就需要用到消息的回复功能,个人认为可选的方案有:
- 消息发布方基于伪队列
amq.rabbitmq.reply
进行消费,消息接收方回复到伪队列amq.rabbitmq.reply
上。
- 消息发布方自定义独占队列进行消费,消息接收方回复到此独占队列。
- 消息发布方自定义持久化队列进行消费,消息接收方回复到此持久化队列。
其实,在AMQP.BasicProperties
的replyTo属性中指定需要回复的队列名只是RabbitMQ提出的一种规约或者建议,并不是强制实行的方案,实际上可以自行选择回复队列或者忽略replyTo属性。