前提

本文内容参考RabbitMQ官方文档Direct reply-to

直接回复

直接回复(Direct reply-to)是一种可以避免声明回复队列并且实现类似于RPC功能的一种特性。RabbitMQ中允许使用客户端和RabbitMQ消息代理中间件实现RPC模式,典型的做法是:RPC客户端发送请求(消息)到一个持久化的已知服务端队列,RPC服务端消费该服务端队列的消息,然后使用消息属性中的reply-to属性对应的值作为客户端回复队列发送回复消息到RPC客户端。

客户端回复队列需要考虑创建问题。客户端可以为每个请求-响应声明一个一次性的队列,但是这样的做法是十分低效的,因为即使是非持久状态下的非镜像队列,其删除的代价是昂贵的,特别是在集群模式之下。另一个可选的做法是:客户端为回复创建一个持久化的长期存在的队列,这种情况下队列的管理可能变得复杂,因为客户端本身可能不是长期存在的。

r-m-d-r-1.png

实际上,RabbitMQ提供了一个功能,允许RPC客户端直接从其RPC服务端接收回复,并且无需创建回复队列,依赖于RabbitMQ的消息中间件的功能,具体做法是:

对于RPC客户端:

  • RPC客户端创建消费者的时候队列指定为伪队列amq.rabbitmq.reply-to,使用非手动ack模式(autoAck=true)进行消费,伪队列amq.rabbitmq.reply-to不需要显式声明,当然如果需要的话也可以显式声明。
  • 发布消息的时候,消息属性中的reply-to属性需要指定为amq.rabbitmq.reply-to

对于RPC服务端:

  • RPC服务端接收消息后感知消息属性中的reply-to属性存在,它应该通过默认的交换器(名称为””)和reply-to属性作为路由键发送回复消息,那么该回复消息就会直接投递到RPC客户端的消费者中。
  • 如果RPC服务端需要进行一些长时间的计算逻辑,可能需要探测RPC服务端是否存活,可以使用一个一次性使用的信道对reply-to属性做一次队列声明,如果声明成功,队列amq.rabbitmq.reply-to并不会创建,如果声明失败,那么说明客户端已经失去连接。

注意事项:

  • RPC客户端在创建伪队列amq.rabbitmq.reply-to消费者的时候必须使用非手动ack模式(autoAck=true)。
  • 使用此机制发送的回复消息通常不具有容错能力,如果发布原始请求的客户端随后断开连接,它们将被丢弃。
  • 伪队列amq.rabbitmq.reply-to可以在basic.consumebasic.publish和消息属性reply-to中使用,实际上,它并不是一个真实存在的队列,RabbitMQ的Web管理器或者rabbitmqctl list_queues命令都无法展示该伪队列的相关属性或者信息。

说实话,个人认为这种方式有个比较多的局限性:

  • 同一个应用里面,只能使用唯一一个伪队列amq.rabbitmq.reply-to消费回复消息,并且RabbitMQ的Web管理器或者rabbitmqctl list_queues命令都无法展示该伪队列的相关属性或者信息,也就是无法对它进行监控或者管理。
  • 对于多应用同时接进去同一个RabbitMQ消息中间件代理,这些应用之间无法同时使用amq.rabbitmq.reply-to这个特性,因为有可能A客户端发送的消息被远程服务回调到另一个不同的B客户端。

直接回复特性使用

使用伪队列amq.rabbitmq.reply-to的一个例子:

public class ReplyToRawMain extends BaseChannelFactory {

private static final String FAKE_QUEUE = "amq.rabbitmq.reply-to";
private static final String RPC_QUEUE = "rpc.queue";
private static final String DEFAULT_EXCHANGE = "";

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 服务端队列
channel.queueDeclare(RPC_QUEUE, true, false, false, null);
client(channel);
server(channel);
Thread.sleep(5000);
});
}

private static void client(Channel channel) throws Exception {
// 客户端消费 - no-ack,也就是autoAck = true
channel.basicConsume(FAKE_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Client]\ndeliveryTag:%s\nexchange:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
}
});
// 客户端发送
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId("message-99999")
.replyTo(FAKE_QUEUE)
.build();
channel.basicPublish(DEFAULT_EXCHANGE, RPC_QUEUE, basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));
}

private static void server(Channel channel) throws Exception {
// 服务端消费
channel.basicConsume(RPC_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Server]\ndeliveryTag:%s\nexchange:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
// 服务端应答->客户端
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish(DEFAULT_EXCHANGE, properties.getReplyTo(), basicProperties, body);

}
});
}
}

当然,可以直接创建一个真实的独占队列(生命周期跟客户端的连接绑定)作为回复队列,举个例子:

public class ReplyToMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 服务端队列
channel.queueDeclare("rpc.queue", true, false, false, null);

// 客户端接收应答队列 - 排他队列,生命周期和连接绑定
AMQP.Queue.DeclareOk callback = channel.queueDeclare("", false, true, false, null);

System.out.println("建立排他应答队列:" + callback.getQueue());

// 客户端消费
channel.basicConsume(callback.getQueue(), false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Client]\ndeliveryTag:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
}
});

// 服务端消费
channel.basicConsume("rpc.queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Server]\ndeliveryTag:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
// 服务端应答
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), basicProperties, body);
}
});

// 客户端发送
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId("message-99999")
.replyTo(callback.getQueue())
.build();
channel.basicPublish("", "rpc.queue", basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));

Thread.sleep(5000);
});
}
}

个人想法

在实际项目中,我们经常被RabbitMQ消息发送是否成功这个问题困扰,一般情况下,我们认为调用basic.publish只要不抛出异常就是发送消息成功,例如一个代码模板如下:

public boolean sendMessage(){
boolean success = false;
try {
channel.basicPublish();
// 发送成功
success = true;
}catch (Exception e){
// 发送失败
log.error();
}
return success;
}

这个代码模板在极大多数情况下是合适的,但是有些时候我们确实需要消息的接收方告知发送方已经收到消息,这个时候就需要用到消息的回复功能,个人认为可选的方案有:

  • 消息发布方基于伪队列amq.rabbitmq.reply进行消费,消息接收方回复到伪队列amq.rabbitmq.reply上。
  • 消息发布方自定义独占队列进行消费,消息接收方回复到此独占队列。
  • 消息发布方自定义持久化队列进行消费,消息接收方回复到此持久化队列。

其实,AMQP.BasicProperties的replyTo属性中指定需要回复的队列名只是RabbitMQ提出的一种规约或者建议,并不是强制实行的方案,实际上可以自行选择回复队列或者忽略replyTo属性