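Prerequisites
The previous article used the Redis Sorted Set together with the Quartz scheduling framework to implement a simple first version of delayed tasks, but it left two relatively important problems unsolved:
Sharding.
Monitoring.
This article fills in those two areas. Prerequisite reading: 使用Redis实现延时任务(一) (Implementing Delayed Tasks with Redis, Part 1).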
Why sharding is needed
Here is the dequeue script dequeue.lua again:
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE returns a status reply, which Lua sees as a table such as {ok = 'zset'}
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        -- pick up to `limit` members whose score lies in [min_score, max_score], highest score first
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- remove the members from the sorted set, then read and delete their details from the hash
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
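To make the script's semantics easier to follow, here is a minimal in-memory sketch of what dequeue.lua does, written in plain Java with no Redis involved. The class name InMemoryDelayQueue and its method names are hypothetical, and the two HashMap fields merely stand in for the Sorted Set and the Hash; it illustrates the read-then-delete behavior and is not a replacement for the atomic Lua script.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory model of dequeue.lua; not thread-safe and not atomic like the real script.
public class InMemoryDelayQueue {

    // member -> score, standing in for the Sorted Set
    private final Map<String, Double> zset = new HashMap<>();
    // member -> payload, standing in for the Hash
    private final Map<String, String> hash = new HashMap<>();

    public void enqueue(String member, double score, String payload) {
        zset.put(member, score);
        hash.put(member, payload);
    }

    public List<String> dequeue(double min, double max, int offset, int limit) {
        // Highest score first; ties broken by member in reverse lexicographic order
        Comparator<Map.Entry<String, Double>> highestFirst = (a, b) -> {
            int byScore = Double.compare(b.getValue(), a.getValue());
            return byScore != 0 ? byScore : b.getKey().compareTo(a.getKey());
        };
        // ZREVRANGEBYSCORE key max min LIMIT offset limit
        List<String> members = zset.entrySet().stream()
                .filter(e -> e.getValue() >= min && e.getValue() <= max)
                .sorted(highestFirst)
                .skip(offset)
                .limit(limit)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        List<String> payloads = new ArrayList<>();
        for (String member : members) {
            zset.remove(member);               // ZREM
            payloads.add(hash.remove(member)); // HMGET followed by HDEL
        }
        return payloads;
    }

    public int size() {
        return zset.size();
    }

    public static void main(String[] args) {
        InMemoryDelayQueue queue = new InMemoryDelayQueue();
        queue.enqueue("ORDER_ID_1", 10, "payload-1");
        queue.enqueue("ORDER_ID_2", 20, "payload-2");
        queue.enqueue("ORDER_ID_3", 30, "payload-3");
        // Only members with a score of at most 25 are due
        System.out.println(queue.dequeue(0, 25, 0, 10));
        System.out.println(queue.size());
    }
}
```

Members that fall outside the score range stay in both collections, which is what lets the scheduled job poll repeatedly and only ever see messages that are already due.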
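The script uses four commands, ZREVRANGEBYSCORE, ZREM, HMGET and HDEL (the time complexity of the TYPE command is negligible):
| Command | Time complexity | Parameters |
| --- | --- | --- |
| ZREVRANGEBYSCORE | O(log(N)+M) | N is the total number of elements in the sorted set, M is the number of elements returned |
| ZREM | O(M*log(N)) | N is the total number of elements in the sorted set, M is the number of elements successfully removed |
| HMGET | O(L) | L is the number of fields successfully returned |
| HDEL | O(L) | L is the number of fields to delete |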
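Next, the analysis has to be tied to a scenario and concrete parameters. Suppose that in production about 10,000 elements are added to the sorted set per hour (that is, 10,000 orders are placed per hour). Because the script deletes data from the Sorted Set and the Hash as it reads them, and each message is held for 30 minutes, about 5,000 entries are resident in each of the two collections at any time, so N = 5000 in the table above. Suppose the LIMIT of each query is initially set to 100, so M = 100, and suppose each unit of work inside Redis simply costs a time T. The cost of processing those 5,000 entries then looks like this: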
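| No. | Set cardinality | ZREVRANGEBYSCORE | ZREM | HMGET | HDEL |
| --- | --- | --- | --- | --- | --- |
| 1 | 5000 | (log(5000) + 100) * T | 100 * log(5000) * T | 100 * T | 100 * T |
| 2 | 4900 | (log(4900) + 100) * T | 100 * log(4900) * T | 100 * T | 100 * T |
| 3 | 4800 | (log(4800) + 100) * T | 100 * log(4800) * T | 100 * T | 100 * T |
| … | … | … | … | … | … |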
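In theory, ZREM is the most expensive of the four commands used by the script. The cost of ZREVRANGEBYSCORE, O(log(N)+M), and the cost of ZREM, O(M*log(N)), both grow with N, so keeping the set cardinality N under control helps to some degree in reducing the running time of the Lua script.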
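The cost table above can be turned into a small calculation. The sketch below is only a rough model under stated assumptions: every unit of work costs exactly T = 1, the logarithm is taken base 2 (the complexity formulas do not fix a base), and the class and method names are hypothetical. It sums the four per-command costs for one script call and then for draining a whole set in batches.

```java
// Hypothetical cost model for dequeue.lua, in units of T; an estimate, not a benchmark.
public class DequeueCostModel {

    // Cost of one script call on a set with n elements that dequeues m of them.
    public static double batchCost(int n, int m) {
        double logN = Math.log(n) / Math.log(2); // assumption: base-2 logarithm
        double zrevrangebyscore = logN + m;      // O(log(N)+M)
        double zrem = m * logN;                  // O(M*log(N))
        double hmget = m;                        // O(L), with L = M here
        double hdel = m;                         // O(L), with L = M here
        return zrevrangebyscore + zrem + hmget + hdel;
    }

    // Cost of draining a set of n elements in batches of at most m.
    public static double drainCost(int n, int m) {
        double total = 0;
        for (int remaining = n; remaining > 0; remaining -= m) {
            total += batchCost(remaining, Math.min(m, remaining));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.printf("one call, N=5000: %.1f T%n", batchCost(5000, 100));
        System.out.printf("one call, N=2500: %.1f T%n", batchCost(2500, 100));
        System.out.printf("drain one set of 5000: %.1f T%n", drainCost(5000, 100));
        System.out.printf("drain two shards of 2500: %.1f T%n", 2 * drainCost(2500, 100));
    }
}
```

Under this model, draining two shards of 2,500 elements costs less than draining one set of 5,000, but the gap is modest because N only enters through log(N). That matches the cautious wording above: a smaller N helps to some degree, and real measurements are still needed.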
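Sharding
With the time complexity of dequeue.lua analyzed above, two sharding schemes are on the table:
Option 1: a single Redis instance, with the data of the Sorted Set and the Hash split into shards.
Option 2: multiple Redis instances (Sentinel or Cluster deployments both work), with the sharding of option 1 applied across them.
**To keep things simple, the examples below use a shard count (shardingCount) of 2**; in production the shard count should be chosen to fit the actual workload. Shards are selected by taking the long-typed user ID field userId modulo the shard count, and the userId values in the test data are assumed to be evenly distributed.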
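As a small illustration of the modulo routing just described, the sketch below maps a userId to a shard index and to the two Redis key names. The ShardRouter class is hypothetical; the ORDER_QUEUE_ and ORDER_DETAIL_QUEUE_ prefixes are the ones used by the queue implementation in this article. One deliberate difference from the article's code: it uses Math.floorMod instead of the % operator, so that a negative userId (if one ever appeared) would still map to a valid shard.

```java
// Hypothetical helper that routes a userId to a shard by modulo.
public class ShardRouter {

    private final int shardingCount;

    public ShardRouter(int shardingCount) {
        if (shardingCount <= 0) {
            throw new IllegalArgumentException("shardingCount must be positive");
        }
        this.shardingCount = shardingCount;
    }

    // floorMod always returns a value in [0, shardingCount), even for negative input
    public int shardIndex(long userId) {
        return (int) Math.floorMod(userId, (long) shardingCount);
    }

    // Key of the Sorted Set for the shard that owns this user
    public String queueKey(long userId) {
        return "ORDER_QUEUE_" + shardIndex(userId);
    }

    // Key of the Hash that holds the message details for the same shard
    public String detailKey(long userId) {
        return "ORDER_DETAIL_QUEUE_" + shardIndex(userId);
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter(2);
        for (long userId = 96; userId < 100; userId++) {
            System.out.println(userId + " -> " + router.queueKey(userId) + ", " + router.detailKey(userId));
        }
    }
}
```

Because both keys are derived from the same index, the Sorted Set entry and its Hash detail always land in the same shard, which dequeue.lua relies on when it reads from one and deletes from the other.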
Shared entity:
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}
Delay queue interface:
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index);

    List<OrderMessage> dequeue(int index);

    String enqueueSha();

    String dequeueSha();
}
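Sharding on a single Redis instance
Sharding on a single Redis instance is fairly simple. The diagram is as follows:
The queue implementation is as follows (some parameters are hard-coded; it is for reference only and must not be copied into production as-is):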
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE_PREFIX = "ORDER_QUEUE_";
    private static final String ORDER_DETAIL_QUEUE_PREFIX = "ORDER_DETAIL_QUEUE_";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        // pick the shard by userId modulo the shard count
        long index = message.getUserId() % SHARDING_COUNT;
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(int index) {
        // only messages enqueued at least 30 minutes ago are due
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, int index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE_PREFIX + index);
        keys.add(ORDER_DETAIL_QUEUE_PREFIX + index);
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // load both scripts once at startup
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }
}
The consumer's scheduled job is implemented as follows:
@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
                return thread;
            });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        int shardingIndex = context.getMergedJobDataMap().getInt("shardingIndex");
        // log: "order message consumer job started"
        LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            // hand the batch to the worker pool and wait until it has been processed
            final CountDownLatch latch = new CountDownLatch(1);
            BUSINESS_WORKER_POOL.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                // restore the interrupt flag so the scheduler can observe it
                Thread.currentThread().interrupt();
            }
        }
        // log: "order message consumer job finished"
        LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final int shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    // log: "processing order message, content"
                    LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
                    // simulate 50 ms of business processing
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}
The CommandLineRunner that starts the scheduled jobs and writes the test data is implemented as follows:
@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        int shardingCount = 2;
        prepareOrderMessageData(shardingCount);
        // one job and one trigger per shard
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareOrderMessageData(int shardingCount) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            for (OrderMessage message : messages) {
                // back-date the score by 30 minutes so that the test data is due immediately
                Double score = Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
                long index = message.getUserId() % shardingCount;
                jedis.hset("ORDER_DETAIL_QUEUE_" + index, message.getOrderId(), JSON.toJSONString(message));
                jedis.zadd("ORDER_QUEUE_" + index, score, message.getOrderId());
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(int shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (int i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
Start the application. The output is as follows:
2019-08-28 00:13:20.648 INFO 50248 --- [ main] c.t.s.s.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.35 seconds (JVM running for 5.109)
2019-08-28 00:13:20.780 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-08-28 00:13:20.781 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","timestamp":"2019-08-28 00:13:20.657","userId":99}
2019-08-28 00:13:20.788 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","timestamp":"2019-08-28 00:13:20.657","userId":98}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-1] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":97,"orderId":"ORDER_ID_97","timestamp":"2019-08-28 00:13:20.657","userId":97}
2019-08-28 00:13:20.840 INFO 50248 --- [onsumerWorker-0] c.t.s.sharding.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":96,"orderId":"ORDER_ID_96","timestamp":"2019-08-28 00:13:20.657","userId":96}
// ... (much output omitted)
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-1] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
2019-08-28 00:13:21.298 INFO 50248 --- [ryBean_Worker-2] c.t.s.sharding.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
// ... (much output omitted)
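Sharding across multiple Redis instances
Sharding on a single Redis instance has one problem: a Redis instance always processes client commands on a single thread, even when the client issues Redis commands from multiple threads. The diagram is as follows:
In this situation, sharding does reduce the complexity of the commands in the Lua script, but the Redis command-processing model (single-threaded) can still become another potential performance bottleneck. It is therefore worth considering sharding across multiple Redis instances.
To keep things simple, the coding example here uses two standalone Redis instances. The code is as follows: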
@Component
public class JedisProvider implements InitializingBean {

    private final Map<Long, JedisPool> pools = Maps.newConcurrentMap();
    private JedisPool defaultPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        // shard 0 -> local instance, shard 1 -> second instance (addresses are hard-coded for the demo)
        JedisPool pool = new JedisPool("localhost");
        defaultPool = pool;
        pools.put(0L, pool);
        pool = new JedisPool("192.168.56.200");
        pools.put(1L, pool);
    }

    public Jedis provide(Long index) {
        return pools.getOrDefault(index, defaultPool).getResource();
    }
}

@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
}

public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index);

    List<OrderMessage> dequeue(long index);

    String enqueueSha(long index);

    String dequeueSha(long index);
}

@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final long SHARDING_COUNT = 2L;
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    // a script SHA is kept per shard because each instance has its own script cache
    private static final ConcurrentMap<Long, String> ENQUEUE_LUA_SHA = Maps.newConcurrentMap();
    private static final ConcurrentMap<Long, String> DEQUEUE_LUA_SHA = Maps.newConcurrentMap();

    private final JedisProvider jedisProvider;

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // note: unlike the single-instance version, the score is back-dated by 30 minutes here,
        // so the message is already due when it is enqueued
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        List<String> keys = Lists.newArrayList();
        long index = message.getUserId() % SHARDING_COUNT;
        // key names are the same on every instance; the shard is chosen by picking the instance
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(index), keys, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue(long index) {
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT, index);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha(long index) {
        return ENQUEUE_LUA_SHA.get(index);
    }

    @Override
    public String dequeueSha(long index) {
        return DEQUEUE_LUA_SHA.get(index);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        // load both scripts into every Redis instance
        for (long i = 0; i < SHARDING_COUNT; i++) {
            try (Jedis jedis = jedisProvider.provide(i)) {
                ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
                String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                String sha = jedis.scriptLoad(luaContent);
                ENQUEUE_LUA_SHA.put(i, sha);
                resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
                luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
                sha = jedis.scriptLoad(luaContent);
                DEQUEUE_LUA_SHA.put(i, sha);
            }
        }
    }
}

@DisallowConcurrentExecution
public class OrderMessageConsumer implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // one single-thread pool per job instance
    private final ExecutorService businessWorkerPool = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long shardingIndex = context.getMergedJobDataMap().getLong("shardingIndex");
        // log: "order message consumer job started"
        LOGGER.info("订单消息消费者定时任务开始执行,shardingIndex:[{}]...", shardingIndex);
        List<OrderMessage> dequeue = orderDelayQueue.dequeue(shardingIndex);
        if (null != dequeue) {
            final CountDownLatch latch = new CountDownLatch(1);
            businessWorkerPool.execute(new ConsumeTask(latch, dequeue, shardingIndex));
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                // restore the interrupt flag so the scheduler can observe it
                Thread.currentThread().interrupt();
            }
        }
        // the job factory below creates a new job instance per firing, so release this instance's pool
        businessWorkerPool.shutdown();
        // log: "order message consumer job finished"
        LOGGER.info("订单消息消费者定时任务执行完毕,shardingIndex:[{}]...", shardingIndex);
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final CountDownLatch latch;
        private final List<OrderMessage> messages;
        private final long shardingIndex;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    // log: "processing order message, content"
                    LOGGER.info("shardingIndex:[{}],处理订单消息,内容:{}", shardingIndex, JSON.toJSONString(message));
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (Exception ignore) {
            } finally {
                latch.countDown();
            }
        }
    }
}

@Configuration
public class QuartzConfiguration {

    @Bean
    public AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory() {
        return new AutowiredSupportQuartzJobFactory();
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(AutowiredSupportQuartzJobFactory autowiredSupportQuartzJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setSchedulerName("RamScheduler");
        factory.setAutoStartup(true);
        factory.setJobFactory(autowiredSupportQuartzJobFactory);
        return factory;
    }

    // creates job instances and lets Spring inject their @Autowired fields
    public static class AutowiredSupportQuartzJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(@Nonnull TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

@Component
public class QuartzJobStartCommandLineRunner implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void run(String... args) throws Exception {
        long shardingCount = 2;
        prepareData(shardingCount);
        for (ConsumerTask task : prepareConsumerTasks(shardingCount)) {
            scheduler.scheduleJob(task.getJobDetail(), task.getTrigger());
        }
    }

    private void prepareData(long shardingCount) {
        // write the same 100 test messages into every instance
        for (long i = 0L; i < shardingCount; i++) {
            Map<String, Double> z = Maps.newHashMap();
            Map<String, String> h = Maps.newHashMap();
            for (int k = 0; k < 100; k++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(k));
                message.setUserId((long) k);
                message.setOrderId("ORDER_ID_" + k);
                z.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                h.put(message.getOrderId(), JSON.toJSONString(message));
            }
            // try-with-resources returns the connection to the pool
            try (Jedis jedis = jedisProvider.provide(i)) {
                jedis.hmset("ORDER_DETAIL_QUEUE", h);
                jedis.zadd("ORDER_QUEUE", z);
            }
        }
    }

    private List<ConsumerTask> prepareConsumerTasks(long shardingCount) {
        List<ConsumerTask> tasks = Lists.newArrayList();
        for (long i = 0; i < shardingCount; i++) {
            JobDetail jobDetail = JobBuilder.newJob(OrderMessageConsumer.class)
                    .withIdentity("OrderMessageConsumer-" + i, "DelayTask")
                    .usingJobData("shardingIndex", i)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("OrderMessageConsumerTrigger-" + i, "DelayTask")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                    .build();
            tasks.add(new ConsumerTask(jobDetail, trigger));
        }
        return tasks;
    }

    @Getter
    @RequiredArgsConstructor
    private static class ConsumerTask {

        private final JobDetail jobDetail;
        private final Trigger trigger;
    }
}
Add a new startup entry point and run it. The console output is as follows:
// ... (much output omitted)
2019-09-01 14:08:27.664 INFO 13056 --- [ main] c.t.multi.NoneJdbcSpringApplication : Started NoneJdbcSpringApplication in 1.333 seconds (JVM running for 5.352)
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[1]...
2019-09-01 14:08:27.724 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务开始执行,shardingIndex:[0]...
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.732 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":99,"orderId":"ORDER_ID_99","userId":99}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-0] c.throwable.multi.OrderMessageConsumer : shardingIndex:[0],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
2019-09-01 14:08:27.782 INFO 13056 --- [onsumerWorker-1] c.throwable.multi.OrderMessageConsumer : shardingIndex:[1],处理订单消息,内容:{"amount":98,"orderId":"ORDER_ID_98","userId":98}
// ... (much output omitted)
2019-09-01 14:08:28.239 INFO 13056 --- [eduler_Worker-2] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[1]...
2019-09-01 14:08:28.240 INFO 13056 --- [eduler_Worker-1] c.throwable.multi.OrderMessageConsumer : 订单消息消费者定时任务执行完毕,shardingIndex:[0]...
// ... (much output omitted)
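In production, a single point of failure in the Redis service should be avoided. A common deployment combines Sentinel with a tree-shaped master-replica topology (see 《Redis开发与运维》, "Redis Development and Operations"). The deployment diagram for two sets of Redis Sentinel is as follows: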
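What to monitor
We need to know, in close to real time, how much data is backed up in the delay queue collections in Redis, roughly how long each dequeue takes, and so on. Only then can we tell whether the delay queue module is running normally and whether it has a performance bottleneck. The specific metrics should be tailored to your needs. For the sake of the example, only two metrics are monitored here:
The number of elements backed up in the Sorted Set.
The time taken by each call to dequeue.lua.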
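The two metrics can be pictured as one "latest value" holder per shard. The sketch below is a dependency-free illustration in plain Java, with hypothetical names (ShardGauges, recordBacklog, timeDequeue); it only shows the idea of keeping the most recent backlog and the most recent call duration per shard, and is not the Micrometer-based monitor class used in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical per-shard holder for the two gauges; a metrics library would read these values.
public class ShardGauges {

    private final Map<Long, AtomicLong> backlogByShard = new ConcurrentHashMap<>();
    private final Map<Long, AtomicLong> dequeueMicrosByShard = new ConcurrentHashMap<>();

    // Store the latest backlog size observed for a shard
    public void recordBacklog(long shard, long count) {
        backlogByShard.computeIfAbsent(shard, k -> new AtomicLong()).set(count);
    }

    // Run a dequeue call, store how long it took in microseconds, and return its result
    public <T> T timeDequeue(long shard, Supplier<T> dequeueCall) {
        long start = System.nanoTime();
        try {
            return dequeueCall.get();
        } finally {
            long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
            dequeueMicrosByShard.computeIfAbsent(shard, k -> new AtomicLong()).set(micros);
        }
    }

    // Latest backlog for a shard, or 0 if nothing has been recorded yet
    public long currentBacklog(long shard) {
        AtomicLong value = backlogByShard.get(shard);
        return value == null ? 0L : value.get();
    }

    // Duration of the most recent dequeue call in microseconds, or -1 if none has been timed
    public long lastDequeueMicros(long shard) {
        AtomicLong value = dequeueMicrosByShard.get(shard);
        return value == null ? -1L : value.get();
    }

    public static void main(String[] args) {
        ShardGauges gauges = new ShardGauges();
        gauges.recordBacklog(0L, 5000L);
        String result = gauges.timeDequeue(0L, () -> "dequeued batch");
        System.out.println(result + ", backlog=" + gauges.currentBacklog(0L)
                + ", lastDequeueMicros=" + gauges.lastDequeueMicros(0L));
    }
}
```

A gauge only exposes the most recent value, so a slow call that happens between two scrapes can be missed; if the distribution of call times matters, a timer or histogram type of metric would be the more natural choice.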
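The data is reported by the application in real time, relying on a monitoring stack built from spring-boot-starter-actuator, Prometheus and Grafana. If you are not familiar with this stack, see the two prerequisite articles: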
Monitoring
Add the dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.2.0</version>
</dependency>
Gauge is chosen here as the Meter type for collecting the monitoring data. Add the monitoring class OrderDelayQueueMonitor:
@Component
public class OrderDelayQueueMonitor implements InitializingBean {

    private static final long SHARDING_COUNT = 2L;

    // latest backlog size per shard
    private final ConcurrentMap<Long, AtomicLong> remain = Maps.newConcurrentMap();
    // latest dequeue.lua call duration per shard
    private final ConcurrentMap<Long, AtomicLong> lua = Maps.newConcurrentMap();
    private ScheduledExecutorService executor;

    @Autowired
    private JedisProvider jedisProvider;

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "OrderDelayQueueMonitor");
            thread.setDaemon(true);
            return thread;
        });
        // register two gauges per shard, tagged with the shard index
        for (long i = 0L; i < SHARDING_COUNT; i++) {
            AtomicLong l = new AtomicLong();
            Metrics.gauge("order.delay.queue.lua.cost", Collections.singleton(Tag.of("index", String.valueOf(i))), l, AtomicLong::get);
            lua.put(i, l);
            AtomicLong r = new AtomicLong();
            Metrics.gauge("order.delay.queue.remain", Collections.singleton(Tag.of("index", String.valueOf(i))), r, AtomicLong::get);
            remain.put(i, r);
        }
        // sample the backlog of every shard every 5 seconds
        executor.scheduleWithFixedDelay(new MonitorTask(jedisProvider), 0, 5, TimeUnit.SECONDS);
    }

    public void recordRemain(Long index, long count) {
        remain.get(index).set(count);
    }

    public void recordLuaCost(Long index, long count) {
        lua.get(index).set(count);
    }

    @RequiredArgsConstructor
    private class MonitorTask implements Runnable {

        private final JedisProvider jedisProvider;

        @Override
        public void run() {
            for (long i = 0L; i < SHARDING_COUNT; i++) {
                try (Jedis jedis = jedisProvider.provide(i)) {
                    // counting over the whole score range gives the total number of queued elements
                    recordRemain(i, jedis.zcount("ORDER_QUEUE", "-inf", "+inf"));
                }
            }
        }
    }
}
The original RedisOrderDelayQueue#dequeue() is modified as follows:
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    // other fields and methods are unchanged and omitted here
    private final OrderDelayQueueMonitor orderDelayQueueMonitor;

    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit, long index) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        List<String> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(ORDER_DETAIL_QUEUE);
        try (Jedis jedis = jedisProvider.provide(index)) {
            // time the script call as seen by the client and report it in microseconds
            long start = System.nanoTime();
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(index), keys, args);
            long end = System.nanoTime();
            orderDelayQueueMonitor.recordLuaCost(index, TimeUnit.NANOSECONDS.toMicros(end - start));
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }
}
A brief note on the remaining configuration. application.yaml must expose the prometheus endpoint:
server:
  port: 9091
management:
  endpoints:
    web:
      exposure:
        include: 'prometheus'
Configure the Prometheus server with as short a scrape interval as practical, tentatively 5 seconds:
global:
  scrape_interval: 5s
  evaluation_interval: 15s
alerting:
  alertmanagers:
    - static_configs:
        - targets:
rule_files:
scrape_configs:
  - job_name: 'prometheus'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['localhost:9091']
The basic Grafana configuration items are as follows:
Dequeue time: order_delay_queue_lua_cost, Shard-{{index}}
Order delay queue backlog: order_delay_queue_remain, Shard-{{index}}
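The metric names queried in Grafana use underscores, while the gauges in OrderDelayQueueMonitor are registered with dotted names such as order.delay.queue.remain. That is because the Micrometer Prometheus registry renders meter names in snake case. The sketch below only illustrates that mapping for plain gauge names; the class and method names are hypothetical, and the real naming convention also handles cases this sketch ignores (for example type-specific suffixes and characters that are not valid in Prometheus names).

```java
// Hypothetical, simplified illustration of how a dotted gauge name appears in Prometheus.
public class PrometheusNames {

    // Simplified rule: dots become underscores
    public static String toPrometheusName(String meterName) {
        return meterName.replace('.', '_');
    }

    // Render one sample line in a simplified "name{tag="value"} number" form
    public static String sampleLine(String meterName, String tagKey, String tagValue, long value) {
        return toPrometheusName(meterName) + "{" + tagKey + "=\"" + tagValue + "\"} " + value;
    }

    public static void main(String[] args) {
        System.out.println(sampleLine("order.delay.queue.remain", "index", "0", 100));
        System.out.println(sampleLine("order.delay.queue.lua.cost", "index", "1", 350));
    }
}
```

The index tag registered with each gauge becomes a label on the series, which is what the {{index}} placeholder in the Grafana items above refers to.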
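Finally, Grafana can be configured to refresh every 5 seconds. The result looks like this:
Metrics should usually be tailored to your needs. Frankly, monitoring work is often the most complex and tedious part.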
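Summary
This article has described in some detail how to implement sharding and monitoring for Redis-based delayed tasks. The core code is for reference only, and some specifics, such as the use of Prometheus and Grafana, are not expanded on here for reasons of space. Frankly, choosing middleware and an architecture for a real scenario is not simple, and the initial implementation is often not the hardest part: the harder problems come later, in optimization and monitoring.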
Attachments
(End of article. c-3-d 20190901. I was unwell, so this was delayed a little.)