前提

本文来源于官方文档Consumer Prefetch

消费者消息预读取

消费者消息预读取是一个更加合理和高效的限制未确认消息数量的解决方式。

AMQP 0-9-1协议中定义了basic.qos方法用于限制信道或者连接上的未确认消息数量,这个消息数据量命名为prefetch_count。不幸的是,信道其实并不是限制未确认消息数量的理想范畴,因为单个信道有可能有多个消费者订阅多个不同的队列,所以信道和队列需要为发送的每个消息相互协调,以确保消息总数量不超过限制,造成了性能下降,单机性能出现瓶颈,在集群方案中耗时更加严重。

basic.qos定义了两个属性:

  • prefetch_count:预读取消息的数量。
  • global:是否全局的。

在许多情况下,指定每个消费者的预读取消息数量更加合理。因此,RabbitMQ在basic.qos方法中重新定义了global标志的含义:

global的值 prefetch_count在AMQP 0-9-1中的含义 prefetch_count在RabbitMQ中的含义
false 同一个信道上的消费者共享 单独应用于信道上的每个新消费者
true 所有消费者基于同一个连接共享 同一个信道上的消费者共享

basic.qos方法在RabbitMQ的Java驱动中对应三个方法:

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

// prefetchSize = 0
void basicQos(int prefetchCount, boolean global) throws IOException;

// prefetchSize = 0 , global = false
void basicQos(int prefetchCount) throws IOException;
  • prefetchSize:预读取的消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限。
  • prefetchCount:预读取的消息数量上限,0表示无上限。
  • global:false表示prefetchCount单独应用于信道上的每个新消费者,true表示prefetchCount在同一个信道上的消费者共享。

限制单个消费者

public class BasicQosSingle extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicQos(10); //基于消费者进行限制
channel.basicConsume("throwable.queue.direct",new DefaultConsumer(channel){});
});
}
}

此消费者最多只能有10条预读取的未确认的消息。

独立限制多个消费者

基于同一个信道对多个队列建立不同的消费者:

public class BasicQosMulti extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
DefaultConsumer consumer1 = new DefaultConsumer(channel) {};
DefaultConsumer consumer2 = new DefaultConsumer(channel) {};
channel.basicQos(10); //基于消费者进行限制
channel.basicConsume("throwable.queue.direct",consumer1);
channel.basicConsume("throwable.queue.fanout",consumer2);
});
}
}

每个费者最多只能有10条预读取的未确认的消息。

基于共享限制多个消费者

AMQP规范没有解释如果使用不同的global多次调用basic.qos会发生什么,RabbitMQ将此解释为意味着两个预取限制应该彼此独立地强制执行。

public class BasicQosShare extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
DefaultConsumer consumer1 = new DefaultConsumer(channel) {};
DefaultConsumer consumer2 = new DefaultConsumer(channel) {};
channel.basicQos(10, false); //基于消费者进行限制
channel.basicQos(15, true); //基于信道进行限制
channel.basicConsume("throwable.queue.direct",consumer1);
channel.basicConsume("throwable.queue.fanout",consumer2);
});
}
}

上面的代码表示:

  • 两个消费者consumer1和consumer2基于信道最多只能有15条未确认的预读取消息。
  • 消费者consumer1和consumer2自身最多只能有10条未确认的预读取消息。

也就是有双重限制,这种限制需要信道和队列之间协调,会耗费额外的性能。

消息预读取的意义

消息预读取可以理解为RabbitMQ Broker把未确认的消息批量推送到RabbitMQ的Java客户端中,由客户端先缓存这些消息,然后投递到消费者中。试想,如果在推模式下,没有消息预读取功能,RabbitMQ Broker每次投递一条消息到客户端消费者中,这样就会产生大量的IO操作,导致性能下降,此外,消费者处理速度有可能比较快,容易产生消费者饥饿的情况。可以根据消费者实际的消费速度和消息发布的速度,对消费者的预读取未确认消息的上限进行配置,这样在大多数场景下可以提高消费者的性能。