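Background
I recently ran into a delayed-task scenario in production, so I surveyed the mainstream approaches, weighed their pros and cons, and settled on a final design. This post records that survey and the implementation of the first-cut solution.
Comparison of candidate solutions
Below are the approaches to delayed tasks that came to mind, with a summary of their strengths and weaknesses.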
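| Approach | Pros | Cons | Suitable scenarios |
|---|---|---|---|
| JDK built-in delay queue DelayQueue | Simple to implement | Data lives only in memory, so it is unreliable | Scenarios with relaxed consistency requirements |
| Scheduling framework + MySQL with short-interval polling | Simple to implement, highly reliable | Obvious performance bottleneck | Small data volumes with relaxed latency requirements |
| RabbitMQ DLX + TTL (commonly called the dead-letter queue approach) | Asynchronous interaction, can smooth traffic peaks | Delay length is hard to control; performance drops if messages must be persisted | - |
| Scheduling framework + Redis with short-interval polling | Persistent data, high performance | Hard to implement | Commonly used for payment-result callbacks |
| Time wheel | Low latency | Hard to implement, high memory consumption | Scenarios with strict latency requirements |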
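If the data volume is small and latency requirements are relaxed, short-interval polling with a scheduling framework plus MySQL is the best choice. In my case, however, the data volume is fairly large while latency requirements are not strict, so scanning the table would put considerable pressure on the MySQL instance. A long time ago I read a slide deck titled 《盒子科技聚合支付系统演进》 (roughly, "The Evolution of the Hezi Technology Aggregated Payment System"), and one of its diagrams gave me an idea:
That diagram uses exactly the scheduling framework plus Redis short-interval polling approach to implement delayed tasks, and it additionally shards the queue to spread the load across application instances. Because my current business deadline is tight, the first phase leaves sharding out and implements only a simplified version.
Since the slides contain no code or framework details, some technical points had to be worked out independently. The rest of this post walks through the whole implementation in detail.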
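Scenario design
The real production scenario: a system I am responsible for integrates with an external funding provider, and for every funded order the corresponding attachments must be pushed 30 minutes after the order is placed. Here this is simplified to delayed processing of order data: each order placed records one order message (called OrderMessage for now), which must be processed asynchronously after a delay of 5 to 15 seconds.
Implementation ideas for the rejected candidates
This section introduces the four candidates that were not chosen, using some pseudocode and a walkthrough of how each would be implemented.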
JDK built-in delay queue
DelayQueue is a blocking queue implementation whose elements must implement the Delayed interface. A simple example:
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelayQueueMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);

    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // Default delay of 5 seconds, then explicit delays of 6 and 10 seconds.
        queue.add(new OrderMessage("ORDER_ID_10086"));
        queue.add(new OrderMessage("ORDER_ID_10087", 6));
        queue.add(new OrderMessage("ORDER_ID_10088", 10));
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("Starting the scheduling thread...");
        executorService.execute(() -> {
            while (true) {
                try {
                    // take() blocks until the delay of the head element has expired.
                    OrderMessage task = queue.take();
                    LOGGER.info("Processing delayed order message, {}", task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class OrderMessage implements Delayed {

        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // Default delay in seconds.
        private static final long DEFAULT_DELAY_SECONDS = 5L;

        private final String orderId;
        private final long timestamp;
        private final long expire;
        private final String description;

        public OrderMessage(String orderId) {
            this(orderId, DEFAULT_DELAY_SECONDS);
        }

        public OrderMessage(String orderId, long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("Order [%s] - created at: %s, expires at: %s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public long getExpire() {
            return expire;
        }

        public String getDescription() {
            return description;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element is ready to be taken.
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the overflow that casting a long difference to int can cause.
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
Note that OrderMessage implements the Delayed interface; the key is to implement Delayed#getDelay() and Delayed#compareTo(). Running the main() method produces:
10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - Starting the scheduling thread...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - Processing delayed order message, Order [ORDER_ID_10086] - created at: 2019-08-20 10:16:08, expires at: 2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - Processing delayed order message, Order [ORDER_ID_10087] - created at: 2019-08-20 10:16:08, expires at: 2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - Processing delayed order message, Order [ORDER_ID_10088] - created at: 2019-08-20 10:16:08, expires at: 2019-08-20 10:16:18
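One detail of Delayed#compareTo() deserves care: the delays are long values, and deriving the result by subtracting them and casting to int can overflow once the two delays are more than about 24.8 days apart. The following is a minimal sketch of that pitfall, not part of the original example; the class and method names are made up for illustration.

```java
import java.util.concurrent.TimeUnit;

final class DelayCompareSketch {

    // Naive comparison: the long difference is truncated to int and may flip its sign.
    static int naiveCompare(long delayA, long delayB) {
        return (int) (delayA - delayB);
    }

    // Safe comparison: Long.compare never overflows.
    static int safeCompare(long delayA, long delayB) {
        return Long.compare(delayA, delayB);
    }

    public static void main(String[] args) {
        long shortDelay = 1_000L;
        // 35 days in milliseconds, which is larger than Integer.MAX_VALUE.
        long longDelay = TimeUnit.DAYS.toMillis(35);
        // The naive result is negative, wrongly claiming the long delay sorts first.
        System.out.println(naiveCompare(longDelay, shortDelay));
        // The safe result is positive, as expected.
        System.out.println(safeCompare(longDelay, shortDelay));
    }
}
```

For the 5 to 15 second delays in this scenario the cast would never overflow, but Long.compare costs nothing and stays correct if the delay is later raised to days or weeks.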
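Scheduling framework + MySQL
Short-interval polling of a MySQL table with a scheduling framework is a relatively easy approach to implement. It should usually be the first choice when a service has just launched, the table holds little data, and latency requirements are relaxed. Keep the following points in mind:
- Do not make the polling interval too short, otherwise the MySQL instance will be affected.
- Limit the number of rows fetched per query; an oversized result set can block the scheduler and consume a lot of application memory, which hurts timeliness.
- Design a status column and a maximum retry count, so that large backlogs and repeated queries of the same rows are avoided as far as possible.
- Preferably index a time column and query only the rows within a given time range.
Add Quartz, the MySQL Java driver and spring-boot-starter-jdbc (a relatively lightweight stack chosen here for convenience; in production, pick whichever framework fits the scenario best):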
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>
Assume the following table design:
CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
    id           BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    order_id     VARCHAR(50) NOT NULL COMMENT 'Order ID',
    create_time  DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    edit_time    DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Last modification time',
    retry_times  TINYINT     NOT NULL DEFAULT 0 COMMENT 'Retry count',
    order_status TINYINT     NOT NULL DEFAULT 0 COMMENT 'Order status',
    INDEX idx_order_id (order_id),
    INDEX idx_create_time (create_time)
) COMMENT 'Order message table';

-- Insert two test rows
INSERT INTO t_order_message(order_id) VALUES ('10086'), ('10087');
Write the code (each top-level class lives in its own file; the imports are listed once for brevity):
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;

import com.google.common.collect.Lists;
import com.mysql.jdbc.Driver;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;

// Constants
public class OrderConstants {

    public static final int MAX_RETRY_TIMES = 5;
    public static final int PENDING = 0;
    public static final int SUCCESS = 1;
    public static final int FAIL = -1;
    public static final int LIMIT = 10;
}

// Entity
@Builder
@Data
public class OrderMessage {

    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

    private final JdbcTemplate jdbcTemplate;

    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };

    public List<OrderMessage> selectPendingRecords(LocalDateTime start, LocalDateTime end,
                                                   List<Integer> statusList, int maxRetryTimes, int limit) {
        // One "?" per status. Binding the joined string "0,-1" to a single "IN (?)" placeholder makes
        // MySQL compare order_status with that string and emit a "Truncated incorrect DOUBLE value"
        // warning, so rows in status FAIL would never be selected.
        String placeholders = String.join(",", Collections.nCopies(statusList.size(), "?"));
        String sql = "SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? "
                + "AND order_status IN (" + placeholders + ") AND retry_times < ? LIMIT ?";
        return jdbcTemplate.query(sql, p -> {
            int i = 1;
            p.setTimestamp(i++, Timestamp.valueOf(start));
            p.setTimestamp(i++, Timestamp.valueOf(end));
            for (Integer status : statusList) {
                p.setInt(i++, status);
            }
            p.setInt(i++, maxRetryTimes);
            p.setInt(i, limit);
        }, M);
    }

    public int updateOrderStatus(Long id, int status) {
        return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?, edit_time = ? WHERE id = ?",
                p -> {
                    p.setInt(1, status);
                    p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
                    p.setLong(3, id);
                });
    }
}

// Service
@RequiredArgsConstructor
public class OrderMessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);
    private static final List<Integer> STATUS = Lists.newArrayList();

    private final OrderMessageDao orderMessageDao;

    static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
    }

    public void executeDelayJob() {
        LOGGER.info("Order processing job started......");
        // Only scan rows created within the last day.
        LocalDateTime end = LocalDateTime.now();
        LocalDateTime start = end.minusDays(1);
        List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS,
                OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
        for (OrderMessage m : list) {
            LOGGER.info("Processing order [{}], status changed from {} to {}",
                    m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
            // The real business logic would go here; the demo simply marks the row as successful.
            orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
        }
        LOGGER.info("Order processing job finished......");
    }
}

// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap()
                .get("orderMessageService");
        service.executeDelayJob();
    }

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // In-memory scheduler; the service is handed to the job through the JobDataMap.
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService", service);
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
                .withIdentity("orderMessageDelayJob", "delayJob")
                .usingJobData(jobDataMap)
                .build();
        // Trigger every 10 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageDelayTrigger", "delayJob")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
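This example polls on create_time. In practice you can add a dedicated schedule_time column and poll on that instead, which makes it much easier to tune the scheduling strategy for idle and busy periods. The log below was captured with the whole status list bound to a single IN (?) placeholder, which is why it contains a "Truncated incorrect DOUBLE value: '0,-1'" warning. The example produced the following output: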
11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Order processing job started......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Processing order [10086], status changed from 0 to 1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Processing order [10087], status changed from 0 to 1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Order processing job finished......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
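The schedule_time idea mentioned above can be sketched roughly as follows. This is only an illustration under assumptions, not the author's implementation: the base delay of 10 seconds, the doubling backoff and the names ScheduleTimeSketch, nextScheduleTime and isDue are all hypothetical. After each failed attempt the row's schedule_time would be pushed further into the future, and the polling query would filter on schedule_time being no later than the current time instead of on create_time.

```java
import java.time.Duration;
import java.time.LocalDateTime;

final class ScheduleTimeSketch {

    // Hypothetical values; tune them for the real workload.
    static final int MAX_RETRY_TIMES = 5;
    static final Duration BASE_DELAY = Duration.ofSeconds(10);

    // Next time a row becomes eligible for polling: the base delay doubles with every retry.
    static LocalDateTime nextScheduleTime(LocalDateTime now, int retryTimes) {
        int shift = Math.min(Math.max(retryTimes, 0), MAX_RETRY_TIMES);
        return now.plus(BASE_DELAY.multipliedBy(1L << shift));
    }

    // A row is picked up only when it is due and still has retries left.
    static boolean isDue(LocalDateTime scheduleTime, LocalDateTime now, int retryTimes) {
        return !scheduleTime.isAfter(now) && retryTimes < MAX_RETRY_TIMES;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        for (int retry = 0; retry <= MAX_RETRY_TIMES; retry++) {
            System.out.println("retry " + retry + " -> next schedule time " + nextScheduleTime(now, retry));
        }
    }
}
```

With such a column, failed rows back off on their own, so a burst of failures no longer gets re-read on every polling round.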
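RabbitMQ dead-letter queue
The RabbitMQ dead-letter queue approach relies on two RabbitMQ features: TTL and DLX.
- TTL: Time To Live, how long a message survives. It has two dimensions: a TTL set on the queue for all of its messages, and a TTL set on an individual message.
- DLX: Dead Letter Exchange, the exchange to which dead (for example expired) messages are republished.
A diagram describing these two features:
For simplicity, the example below uses the queue-level TTL. Add the RabbitMQ Java client: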
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    <scope>test</scope>
</dependency>
The code:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DlxMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // Dead-letter exchange and the queue that receives expired messages.
        producerChannel.exchangeDeclare("dlx.exchange", "direct");
        producerChannel.queueDeclare("dlx.queue", false, false, false, null);
        producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
        // Business queue: messages expire after 5 seconds and are then routed to the DLX.
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-message-ttl", 5000);
        queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key", "dlx.key");
        producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DlxConsumer");
            return thread;
        });
        // Nobody consumes business.queue; the consumer listens on the dead-letter queue only.
        executorService.execute(() -> {
            try {
                consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        });
        for (String orderId : new String[]{"10086", "10087", "10088"}) {
            OrderMessage message = new OrderMessage(orderId);
            // Publish through the default exchange straight into business.queue.
            producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
                    message.getDescription().getBytes(StandardCharsets.UTF_8));
            LOGGER.info("Message sent, order ID: {}", message.getOrderId());
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DlxConsumer extends DefaultConsumer {

        DlxConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            LOGGER.info("Message handled: {}", new String(body, StandardCharsets.UTF_8));
        }
    }

    private static class OrderMessage {

        private final String orderId;
        private final long timestamp;
        private final String description;

        OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.description = String.format("Order [%s], created at: %s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public String getDescription() {
            return description;
        }
    }
}
Running the main() method gives:
16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - Message sent, order ID: 10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - Message sent, order ID: 10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - Message sent, order ID: 10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - Message handled: Order [10086], created at: 2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - Message handled: Order [10087], created at: 2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - Message handled: Order [10088], created at: 2019-08-20 16:35:58
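The example uses one fixed queue-level TTL. If per-message TTLs are mixed in a single queue instead, the effective delay becomes hard to control, because (as far as the RabbitMQ documentation describes for classic queues) an expired message is only dead-lettered once it reaches the head of the queue. The following is a simplified pure-Java model of that behaviour, not RabbitMQ client code; the class and method names are hypothetical, and all messages are assumed to be published at time 0.

```java
import java.util.Arrays;

final class PerMessageTtlSketch {

    // For messages published at time 0 in array order, returns the time (in ms) at which each
    // one is dead-lettered, given that only the head of a FIFO queue can expire.
    static long[] deadLetterTimes(long[] ttlMillis) {
        long[] result = new long[ttlMillis.length];
        long headFreeAt = 0L;
        for (int i = 0; i < ttlMillis.length; i++) {
            // A message cannot leave before everything in front of it has left.
            headFreeAt = Math.max(headFreeAt, ttlMillis[i]);
            result[i] = headFreeAt;
        }
        return result;
    }

    public static void main(String[] args) {
        // TTLs of 15 s, 5 s and 10 s: the two shorter ones are stuck behind the first message.
        System.out.println(Arrays.toString(deadLetterTimes(new long[]{15_000L, 5_000L, 10_000L})));
        // TTLs in ascending order are not affected.
        System.out.println(Arrays.toString(deadLetterTimes(new long[]{5_000L, 10_000L, 15_000L})));
    }
}
```

In this model a 5-second message published behind a 15-second one is only delivered after 15 seconds. A common workaround is one queue per distinct delay, which is practical only when the set of delays is small.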
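Time wheel
A time wheel (TimingWheel) is an efficient, low-latency scheduling data structure. Underneath, it is a circular queue backed by an array, and each slot of the array holds a list of tasks. A schematic:
The time wheel and its implementation are not analysed here; the following only shows briefly how to use one to implement delayed tasks, using the HashedWheelTimer provided by Netty. Add the dependency: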
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.39.Final</version>
</dependency>
The code:
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

public class HashedWheelTimerMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
            return thread;
        };
        // One tick per second, 60 slots per wheel.
        Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
        timer.newTimeout(new DefaultTimerTask("10086"), 5, TimeUnit.SECONDS);
        timer.newTimeout(new DefaultTimerTask("10087"), 10, TimeUnit.SECONDS);
        timer.newTimeout(new DefaultTimerTask("10088"), 15, TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DefaultTimerTask implements TimerTask {

        private final String orderId;
        private final long timestamp;

        public DefaultTimerTask(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println(String.format("Task executed at: %s, order created at: %s, order ID: %s",
                    LocalDateTime.now().format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    orderId));
        }
    }
}
Output:
Task executed at: 2019-08-20 17:19:49.310, order created at: 2019-08-20 17:19:43.294, order ID: 10086
Task executed at: 2019-08-20 17:19:54.297, order created at: 2019-08-20 17:19:43.301, order ID: 10087
Task executed at: 2019-08-20 17:19:59.297, order created at: 2019-08-20 17:19:43.301, order ID: 10088
In general, tasks should be executed on a separate business thread pool so that they do not block the ticking of the wheel itself.
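To make the "array-backed circular queue" description more concrete, here is a deliberately small time wheel. It is a sketch under simplifying assumptions and not how Netty implements HashedWheelTimer: it is single-threaded, the caller advances it by hand through advance(), and the names SimpleTimingWheel, schedule and advance are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SimpleTimingWheel {

    private static final class Entry {
        final Runnable task;
        long remainingRounds;

        Entry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> buckets;
    private final int wheelSize;
    private long currentTick = 0L;

    SimpleTimingWheel(int wheelSize) {
        this.wheelSize = wheelSize;
        this.buckets = new ArrayList<>(wheelSize);
        for (int i = 0; i < wheelSize; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    // Schedules a task to fire after delayTicks ticks (at least one tick).
    void schedule(Runnable task, long delayTicks) {
        long ticks = Math.max(1L, delayTicks);
        int slot = (int) ((currentTick + ticks) % wheelSize);
        // Number of times the pointer passes this slot before the task is actually due.
        long rounds = (ticks - 1) / wheelSize;
        buckets.get(slot).add(new Entry(task, rounds));
    }

    // Moves the pointer one slot forward and runs the tasks that are due in that slot.
    void advance() {
        currentTick++;
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = buckets.get((int) (currentTick % wheelSize)).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds <= 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        // Run after the iteration so a task may safely schedule further tasks.
        due.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        SimpleTimingWheel wheel = new SimpleTimingWheel(4);
        wheel.schedule(() -> System.out.println("order 10086 is due"), 2);
        wheel.schedule(() -> System.out.println("order 10087 is due"), 6);
        for (int tick = 1; tick <= 6; tick++) {
            System.out.println("tick " + tick);
            wheel.advance();
        }
    }
}
```

Scheduling and firing both touch a single bucket, which is where the efficiency comes from. In a real timer, advance() would be driven by a dedicated ticking thread, and the due tasks would be handed to a separate business pool, as recommended above.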
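Implementing the chosen solution
The final choice is an implementation based on the Redis Sorted Set plus short-interval polling with Quartz. The concrete design is:
1. When an order is created, the order ID and the current timestamp are added to the order queue Sorted Set as member and score respectively.
2. When an order is created, the order ID and the push content (a JSON string) are added to the order content Hash as field and value respectively.
3. Steps 1 and 2 are executed in a single Lua script to guarantee atomicity.
4. An asynchronous thread uses the Sorted Set command ZREVRANGEBYSCORE to fetch a given number of order IDs, reads the corresponding push content from the order content Hash, and processes it.
There are two ways to handle step 4:
- Option 1: delete the data at the same moment it is fetched, which means ZREVRANGEBYSCORE, ZREM and HDEL must run inside the same Lua script. The script is harder to write, and because the fetched data has already been removed from Redis, a failed processing attempt may have to be compensated by querying the database again.
- Option 2: fetch the order content first and actively delete the corresponding entries from the order queue Sorted Set and the order content Hash only after processing has finished. This requires concurrency control, and the same order may be processed more than once.
For now option 1 was chosen: once the order IDs have been fetched from the Sorted Set and the push data read from the Hash, the corresponding entries are deleted from both structures immediately. The flow looks roughly like this: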
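The semantics of option 1 can be modelled in plain Java before bringing Redis in. The sketch below is an in-memory stand-in, not the Redis implementation: a TreeMap plays the Sorted Set, a HashMap plays the Hash, and synchronized methods stand in for the atomicity that the Lua scripts provide. The class name InMemoryOrderDelayQueue is hypothetical, and re-enqueueing an existing order ID is not handled the way ZADD would handle it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class InMemoryOrderDelayQueue {

    // score -> order IDs, standing in for the order queue Sorted Set
    private final NavigableMap<Long, List<String>> sortedSet = new TreeMap<>();
    // order ID -> JSON payload, standing in for the order content Hash
    private final Map<String, String> hash = new HashMap<>();

    // Steps 1 to 3: both structures are updated as one atomic unit.
    synchronized void enqueue(String orderId, long score, String payload) {
        sortedSet.computeIfAbsent(score, k -> new ArrayList<>()).add(orderId);
        hash.put(orderId, payload);
    }

    // Step 4, option 1: fetch up to limit payloads with score <= maxScore, highest score first,
    // and delete them from both structures in the same atomic unit.
    synchronized List<String> dequeue(long maxScore, int limit) {
        List<String> result = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
                sortedSet.headMap(maxScore, true).descendingMap().entrySet().iterator();
        while (it.hasNext() && result.size() < limit) {
            List<String> ids = it.next().getValue();
            while (!ids.isEmpty() && result.size() < limit) {
                String id = ids.remove(ids.size() - 1);
                result.add(hash.remove(id));
            }
            if (ids.isEmpty()) {
                it.remove();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryOrderDelayQueue queue = new InMemoryOrderDelayQueue();
        long now = System.currentTimeMillis();
        queue.enqueue("ORDER_ID_1", now - 2_000L, "{\"orderId\":\"ORDER_ID_1\"}");
        queue.enqueue("ORDER_ID_2", now, "{\"orderId\":\"ORDER_ID_2\"}");
        // Only the order enqueued at least one second ago is returned.
        System.out.println(queue.dequeue(now - 1_000L, 10));
    }
}
```

Because the payload is gone from both structures as soon as dequeue() returns, a consumer that crashes afterwards loses the message. That is exactly the trade-off of option 1 described above, and the reason a database-backed compensation path may be needed.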
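The Redis commands involved are explained first.
Sorted Set commands
ZADD - adds one or more members, together with their scores, to a sorted set.
ZADD key score1 member1 [score2 member2 ...]
ZREVRANGEBYSCORE - returns all members of a sorted set whose scores fall within the given range, ordered by score from high to low.
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
- max: upper bound of the score range.
- min: lower bound of the score range.
- WITHSCORES: optional; when given, the scores are returned along with the members.
- LIMIT: optional; offset and count behave like LIMIT offset, size in MySQL. Without it, all members in the score range are returned.
ZREM - removes one or more members from a sorted set; members that do not exist are ignored.
ZREM key member [member ...]
Hash commands
HMSET - sets multiple field-value pairs in a hash at once.
HMSET key field1 value1 [field2 value2 ...]
HDEL - deletes one or more fields from the hash stored at key; fields that do not exist are ignored.
HDEL key field1 [field2 ...]
Lua
- Load a Lua script and return its SHA-1 digest: SCRIPT LOAD script.
- Execute a script that has already been loaded: EVALSHA sha1 numkeys key [key ...] arg [arg ...].
- The unpack function expands a table into a variable number of arguments. Note that the unpack call must be the last argument of a function call; in any other position only the first element is passed on. For details, see the Stack Overflow question "table.unpack() only returns the first element".
PS: If you are not familiar with Lua, it is worth studying systematically, because getting the most out of Redis sooner or later involves Lua.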
Add the dependencies:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.7.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.59</version>
    </dependency>
</dependencies>
Write the Lua scripts /lua/enqueue.lua and /lua/dequeue.lua:
-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE returns a status reply, which Lua sees as a table like {ok = 'zset'}
local status, key_type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if key_type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack(list) must be the last argument of each call
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
Write the core API code (each top-level type lives in its own file; the imports are listed once for brevity):
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private JedisPool jedisPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Default pool settings (localhost:6379).
        jedisPool = new JedisPool();
    }

    public Jedis provide() {
        return jedisPool.getResource();
    }
}

// Order message
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    // Used by the test data below; the original listing omitted this field.
    private String timestamp;
}

// Delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit);

    List<OrderMessage> dequeue();

    String enqueueSha();

    String dequeueSha();
}

// Delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private static final List<String> KEYS = Lists.newArrayList();

    private final JedisProvider jedisProvider;

    static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
    }

    @Override
    public void enqueue(OrderMessage message) {
        // ARGV: zset member, zset score, hash field, hash value
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue() {
        // Only orders enqueued at least 30 minutes ago are due.
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts on startup
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }

    public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // Write test data
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("ORDER_ID_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // For the test, the score is set 30 minutes into the past so that the order is due immediately.
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
        List<OrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
    }
}
Run the main() method once to verify that the delay queue works:
[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]
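With the delay queue code confirmed to work, the next step is a consumer, OrderMessageConsumer, implemented as a Quartz Job:
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(), r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
                return thread;
            });
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        LOGGER.info("Order message processing job started......");
        List<OrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
            // Simple partitioning: two messages per task
            List<List<OrderMessage>> partition = Lists.partition(messages, 2);
            final CountDownLatch latch = new CountDownLatch(partition.size());
            for (List<OrderMessage> p : partition) {
                BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
            }
            try {
                // Wait until every partition has been processed.
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it.
                Thread.currentThread().interrupt();
            }
        }
        stopWatch.stop();
        LOGGER.info("Order message processing job finished, elapsed: {} ms......", stopWatch.getTotalTimeMillis());
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<OrderMessage> messages;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                // In a real scenario, handle exceptions for each message individually.
                for (OrderMessage message : messages) {
                    LOGGER.info("Processing order message, content: {}", message);
                }
            } finally {
                latch.countDown();
            }
        }
    }
}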
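The consumer above was designed with the following considerations:
- The @DisallowConcurrentExecution annotation prevents the Job from executing concurrently. Running several Job instances in parallel would gain little anyway: polling happens at short intervals, and Redis executes commands on a single thread, so multithreading on the client side helps little.
- The thread count and queue capacity of BUSINESS_WORKER_POOL should be chosen by weighing the LIMIT value, the partition size used when splitting the order message list, and the actual execution time of ConsumeTask. A fixed-size thread pool is used here only for convenience.
- ConsumeTask should catch and swallow exceptions for each order message individually, or wrap the handling of a single order message in a method that never throws.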
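The last point can be sketched as follows. This is one possible shape under assumptions rather than the author's production code: the class SafeConsumeTask, its handler parameter and the failure counter are hypothetical. Each message is handled inside its own try/catch, so one bad message cannot abort the rest of the batch, and the latch is always counted down.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class SafeConsumeTask<T> implements Runnable {

    private final List<T> messages;
    private final CountDownLatch latch;
    private final Consumer<T> handler;
    private final AtomicInteger failures = new AtomicInteger();

    SafeConsumeTask(List<T> messages, CountDownLatch latch, Consumer<T> handler) {
        this.messages = messages;
        this.latch = latch;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            for (T message : messages) {
                try {
                    handler.accept(message);
                } catch (RuntimeException e) {
                    // Isolate the failure: record it and move on to the next message.
                    failures.incrementAndGet();
                    System.err.println("Failed to process " + message + ": " + e.getMessage());
                }
            }
        } finally {
            // Always release the waiting job thread, even if something unexpected happens.
            latch.countDown();
        }
    }

    int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        SafeConsumeTask<String> task = new SafeConsumeTask<>(
                Arrays.asList("ORDER_ID_1", "ORDER_ID_2", "ORDER_ID_3"), latch, id -> {
                    if (id.endsWith("2")) {
                        throw new IllegalStateException("simulated failure");
                    }
                    System.out.println("Processed " + id);
                });
        new Thread(task).start();
        latch.await();
        System.out.println("failures: " + task.failureCount());
    }
}
```

Since the chosen design has already deleted a dequeued message from Redis, the catch block is also the natural place to record the failed order so that it can be compensated later.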
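The remaining Quartz-related code:
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class QuartzAutoConfiguration {

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
    }

    @Bean
    public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
        return new QuartzAutowiredJobFactory();
    }

    // Lets Quartz-created Job instances receive Spring-managed dependencies via @Autowired.
    public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            // Inject dependencies into the freshly created Job instance.
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}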
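For now the in-memory RAMJobStore holds the job and trigger data. In production it is better to replace it with the JDBC-based JobStoreTX backed by MySQL, so that the scheduler can be clustered. Finally, here are the startup class and the CommandLineRunner implementation: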
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Seed Redis with test data before scheduling the consumer
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
                .withIdentity("OrderMessageConsumer", "DelayTask")
                .build();
        // Fire the consumer every 5 seconds, forever
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("OrderMessageConsumerTrigger", "DelayTask")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
    }

    private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            Map<String, Double> map = Maps.newHashMap();
            Map<String, String> hash = Maps.newHashMap();
            for (OrderMessage message : messages) {
                // Score: the current time minus 30 minutes, in milliseconds
                map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                hash.put(message.getOrderId(), JSON.toJSONString(message));
            }
            jedis.zadd("ORDER_QUEUE", map);
            jedis.hmset("ORDER_DETAIL_QUEUE", hash);
        }
    }
}
The output is as follows:
2019-08-21 22:45:59.518 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 订单消息处理定时任务执行完毕,耗时:1 ms......
......
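The first run is slower because some components are being initialized. Later runs finish quickly, since the job only prints the order messages. If the current architecture is kept unchanged, the following points need attention in production:
Switch the JobStore to JDBC mode. Quartz has a complete official tutorial, or see the Quartz documentation the author translated earlier.
Monitor or collect the execution status of the job, add alerting, and so on.
There is a latent performance risk here. The documented time complexity of the ZREVRANGEBYSCORE command is O(log(N)+M), where N is the number of elements in the sorted set and M is the number of elements returned. Because all order messages are placed in the same Sorted Set (ORDER_QUEUE), every dequeue call hits this one key, and while new data keeps arriving the cost of the dequeue script stays comparatively high. As order volume grows, this spot is likely to become a performance bottleneck; a solution will be given later.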
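The first production note above, switching the JobStore to JDBC mode, mostly comes down to a handful of Quartz configuration properties. The sketch below builds them as a `java.util.Properties` object so that it stays runnable without Quartz on the classpath. The property keys are the standard Quartz ones; the class name `QuartzJdbcStoreProps`, the method name, and the data source name `quartzDS` are assumptions made for illustration, and the data source definition itself (driver, URL, credentials) is deliberately left out.

```java
import java.util.Properties;

public class QuartzJdbcStoreProps {

    /**
     * Builds Quartz properties for a clustered JDBC job store.
     * dataSourceName is a hypothetical name; it must match a data source
     * defined elsewhere in the Quartz configuration.
     */
    public static Properties clusteredJobStoreProperties(String dataSourceName) {
        Properties p = new Properties();
        // Let each node generate its own instance id, as clustering requires unique ids
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        // JDBC-backed job store where Quartz manages its own transactions
        p.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        // Standard JDBC delegate
        p.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        // Prefix of the Quartz tables, which must be created beforehand
        p.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        p.setProperty("org.quartz.jobStore.isClustered", "true");
        p.setProperty("org.quartz.jobStore.dataSource", dataSourceName);
        return p;
    }

    public static void main(String[] args) {
        Properties p = clusteredJobStoreProperties("quartzDS");
        for (String key : p.stringPropertyNames()) {
            System.out.println(key + "=" + p.getProperty(key));
        }
    }
}
```

With the Spring setup shown earlier, such properties could presumably be handed to the scheduler through `SchedulerFactoryBean.setQuartzProperties`. Note that when a `DataSource` is instead injected via `SchedulerFactoryBean.setDataSource`, Spring supplies its own job store class, so the exact wiring should be checked against the Spring and Quartz versions in use.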
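Summary
Starting from a simulated version of a real production case, this article analyzed several current approaches to implementing delayed tasks and gave a complete example based on Redis and Quartz. The example is only in a runnable state, and some problems remain unsolved. The next article will focus on two of them:
Sharding.
Monitoring.
One more point: architecture evolves out of the shape of the business, and many things have to be designed and improved with the concrete scenario in mind. The ideas here are for reference only; do not copy the code as-is.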
(End of article. c-5-d e-a-20190821 r-a-20190904)