任务组件(qilu-task)介绍
任务组件(qilu-task)介绍
qilu-task主要是为了满足项目中的各种定时任务的防并发机制,都已经2020年了,难道一个系统上线,因为有个跑P的任务,所以要针对跑P功能单独开发应用程序,并且和运维同学商量好"这个包只部署一台服务器"???
流程的实现有简单和复杂的区别,复杂的流程一般都会包含配置界面,各种图形界面,断点,重跑之类的,而简单的,甚至都可以不要流程模型,比如我们当前的QiluTask的实现.当我们去掉配置界面以后,我们同样发现,整个的流程模型也能去掉,所以QiluTask可以被用在不需要流程的交易系统的各种异步补单中.
和quartz/spring-schedule不一样的地方在于,没有触发控制,只负责执行.使用简单的DB存储,告别quartz的8张表,具备简单的去重功能,有断点执行功能,主要是通过DB的机制来防并发,通过修改状态来实现简单的任务重跑.
主要场景
项目中使用到qilu-task的场景主要有
- 同一个项目部署多台服务器,每台服务器上都有定时任务,只希望有其中1台机器执行任务
- 项目中的异步补单机制
- 最最轻量级的任务实现,没有之一,高度聚合,简单而强大,对于项目中的补单操作,简单的跑批操作,错错有余.
- 针对任务创建,需要具备唯一的标识(任务类型+任务号),这个标识也具备防重复功能, 控制任务的执行
- 有持久,有断点,能重试
代码集成
maven依赖
<dependency>
<groupId>com.9istock.base</groupId>
<artifactId>qilu-task</artifactId>
<version>1.0.0</version>
</dependency>执行脚本
执行数据库的初始化语句(sql/create-*.sql)
spring配置
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="org.gjt.mm.mysql.Driver" />
<property name="url" value="jdbc:mysql://senvon.vm:3306/new-user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull" />
<property name="username" value="root" />
<property name="password" value="123" />
</bean>
<bean name="qiluTaskDAO" class="com.istock.base.task.dao.QiluTaskDAO">
<property name="datasource" ref="dataSource"></property>
</bean>
<bean name="qiluTaskService" class="com.istock.base.task.service.QiluTaskService">
<!--配置异步执行的线程数-->
<property name="asynThreadCount" value="10"></property>
<!--配置未完成任务扫描线程数-->
<property name="compensateThreadCount" value="2"></property>
</bean>有一些细节需要注意
- qiluTask依赖于数据库,所以在注册dao的时候,需要数据源
- tableName,当一个项目,出现表冲突的时候,可以另外使用额外的表,默认值是AUTO_TASK
- qiluTaskService是qilutask的操作类,可以指定执行线程数
- asynThreadCount,配置异步执行的线程数,默认是20
- compenstaeThreadCount,是重跑线程数,默认是2
定时调度
使用spring-task,来触发定时任务
<task:scheduler id="taskScheduler" pool-size="10" />
<task:scheduled-tasks scheduler="taskScheduler">
<!-- 每隔1分钟检查在线用户里面是否有存在超时的信息,有就清除掉 -->
<task:scheduled ref="UserOnlineTask" method="addClearOnlineExpireTask" cron="23 0/1 * * * ?"/>
<!-- 每隔3分钟检查一下数据库还有没有未完成的任务,如果有,就执行 -->
<task:scheduled ref="qiluTaskService" method="compensateNotCompleteTask" cron="13 0/3 * * * ?"/>
</task:scheduled-tasks>compensateNotCompleteTask这个触发,是指每隔3分钟,轮询数据库里面执行完成的任务,重新执行.
常用方法说明
在QiluTaskService中,暴露如下几个方法
/**新增一个任务
* @param taskType 定义的一个任务类型
* @param taskNo 定义的一个任务编号
* @param invokeMethod 需要被执行的类和方法,请遵循格式,spring:beanId.methodName,如spring:UserOnlineTask.clearOnlineExpireHandler
* @param context 传递给执行器的上下文参数
* @param firstExecDelaySeconds 第一次执行的等待时间,适用于自动调用,这是一个神奇的字段
* @param execExpireMinute 执行的超时时间,单位分钟
* @param maxExecTimes 最大的执行次数,有默认有错误重试,超过该数值就不会再重试
* @param taskList 当前任务的依赖,没有依赖可以为null
* @throws DuplicateTaskException 如果存在同一个任务类型中,taskNo有重复,就会出现该异常
*/
public void addQiluTask(String taskType, String taskNo,
String invokeMethod, Map<String, String> context,
int firstExecDelaySeconds, int execExpireMinute, int maxExecTimes , Set<OneTask> taskList)新增一个任务,如果有重复,则会抛出异常.
/**同步执行一个任务
* @param taskType 任务类型
* @param taskNo 任务编号
* @return 执行结果
* @throws LockingTaskFailedException 锁失败异常
*/
public SyncTaskResult syncExecuteTask(String taskType, String taskNo) throws LockingTaskFailedException同步执行,会使用当前调用方法的线程,并等待执行结果,会阻塞当前线程的执行
/**异步执行一个任务
* @param taskType
* @param taskNo
*/
public void asynExecuteTask(final String taskType, final String taskNo)异步执行,没有返回值.
handler的实现
/**
* 在增加的任务里面,需要被回调的方法
* TaskExecResult,是当前任务的返回值,如果在新增任务的时候,有传入执行次数,允许当前任务返回失败
* 返回true,代表当前任务执行成功,后续也不会再执行
* 入参TaskExecContext,是在创建任务的时候,需要被传入的任务参数,这个参数以JSON的方式持久在数据库的CONTEXT_PARAM字段中
*/
public TaskExecResult clearOnlineExpireHandler(TaskExecContext context) {
logger.info("ready to execute user online expire clear task===============");
int count = userOnlineInfoService.clearExpireToken();
logger.info("end for execute user online expire clear task , delete {} record===============" , count);
return new TaskExecResult(true);
}handler的方法声明,必须和上述示例保持一致.
TaskExecResult是handler的返回值,构造函数中的boolean参数,如果返回true,代表当前任务执行成功,如果返回false,则代表执行失败.
执行成功的任务,在执行完以后,SYS_QILU_TASK表中的任务状态会变更为complete,如果执行失败,后续会等待重跑任务执行,进行重跑.
其他
任务的状态
| 状态 | 说明 |
|---|---|
| WAITING_EXECUTE | 刚刚插入,等待执行 |
| EXECUTING | 执行中,通过状态来做的乐观锁 |
| COMPLETE | 已完成 |
| NOT_RETRY | 失败,超过重试次数,不再重试 |
如果一个任务以QiluTask执行,后续需要重跑,只需要在数据库中把数据库中的状态设置为WAITING_EXECUTE,同时设置EXECUTED_TIME,不要让重跑超过次数
任务执行
任务的执行,主要是参考taskService的执行方法,只要执行方法被调用,任务就会执行.另外,qiluTask自己有独立的补单任务,补单任务会依赖于时间执行.
补单任务会检索SYS_QILU_TASK表中的NEXT_EXEC_TIME和TASK_STATE,满足以后才会被补单. 检查依赖失败,不会增加执行次数
使用场景
交易补单
假设我们的需求是,处理一个订单,这个订单需要入账,同时保存入账结果.但是由于各种原因,入账不一定是同步的,有的时候账务系统会返回"处理中"或者完全没响应.这个时候,就需要补单操作,我们的订单处理的逻辑如下:
public result 订单处理(){
try{
锁(订单)
{
@事务
{
订单状态更新
}
入账结果 = 调用入账();
if(入账结果 != null){
@事务
{
保存订单(入账结果)
}
}else{
@事务
{
Map<String , String> context = new HashMap<String , String>();
context.put("orderNo" , 订单号);
context.put("accountRequestNo" , 入账请求号);
try{
qiluTaskService.addTask("xxx订单-补单",订单号 ,"执行handler", ,5 , 5 , 5 , null);
}catch(Exception){}
}
}
}
}
}上面的示例代码是常见的一个交易入账的实现,有些事情需要注意
- 调用入账如果是一个远程操作,一定不能包含在订单状态更新的事务里面,有很多严重的问题,详细请看<分布式交易中的不一致>
- 一定要有明确的成功或者失败,才能更新订单结果
- 一定需要考虑入账结果为null的情况,比如网络被挖断了
- qiluTask在这边的玩法,就是在没有明确结果的前提下,订单的结果也不能更新,触发一个异步任务帮我去轮询,至于这个任务是集群中的哪台机器做,随便
跑P方案一
每日的定时跑P,解决方案一,在每天的随便什么时候(比如凌晨2点),触发一个springTask,调用qiluTaskService.addTask方法,向qiluTask注册今天的任务.这边要注意一下2点
- 今天的任务要不要依赖昨天的任务??什么意思,就是昨天的任务如果出现各种问题以后,今天的任务还要不要跑,是等着还是不管之前的记录.
- 今天的后续任务要不要依赖前置任务???
假设今天的跑P流程是这样的

我们的代码可以这么实现
qiluTaskService.addTask("前置任务1","日期" , handler , map , second(执行时间-now) , 5 , 5 , [{"前置任务1",昨天的日期}]);
qiluTaskService.addTask("前置任务2","日期" , handler2 , map , second(执行时间-now) , 5 , 5 , [{"前置任务1",今天的日期}]);
qiluTaskService.addTask("主任务","日期" , handler3 , map , second(执行时间-now) , 5 , 5 , [{"前置任务2",今天的日期} , {"主任务",昨天的日期}]);
qiluTaskService.addTask("收尾任务","日期" , handler4 , map , second(执行时间-now) , 5 , 5 , [{"主任务",今天的日期}]);That's IT!!!有些细节说明下
- 前置任务1依赖于自己的昨天的任务,其他的任务都依赖于这个前置任务1
- 如果昨天的前置任务1,因为各种原因,原本今天凌晨2点的任务,最终执行的时间是明天的凌晨4点,上述配置,会在明天的凌晨4点之后的qiluTask补单任务中,开始执行今天的前置任务1,后续任务也会跟上
- 至于主任务来说,不但依赖了今天的前置任务,还依赖了昨天的主任务,那个依赖只是一个双重保险
通过上述方案,我们可以做到
- 每天的跑P结果是能看到的,而且可以监控
- 如果主任务没有跑完,是因为前置任务2没跑完,我们可以单独修改前置任务2的状态,让前置任务2执行完成以后,会自动触发主任务.
超级好用的taskNo生成器
如果一个任务,需要每5分钟执行一次,如果使用qiluTask来执行,需要一个稳定的任务号,这个任务号,需要5分钟内是唯一的.
可以借助于spring中org.springframework.scheduling.support.CronSequenceGenerator的类,这个类可以直接new出来,在构造函数中,可以传入一个cron表达式,通过nextDate方法,得到一个date,再通过date保留到5分钟级别,所以我们可以每分钟都尝试生成一个任务,而在5分钟内可以得到同样一个任务号.
//针对周期性没有具体taskNo,可以通过时间生成类似批次号,作为taskNo
//下面的代码,就是每分钟的23秒,会生成一个taskNo,至于当前的代码啥时候被触发,可以忽略
CronSequenceGenerator generator = new CronSequenceGenerator("23 0/1 * * * ?");
String taskNo = format.format(generator.next(new Date()));qiluTask的限制
- 注意服务器的时间.采用上述的方式,会依赖于服务器的时间.如果服务器的时间不正确,会导致任务在不同的时间执行,这种行为算是某种黑客行为(当年的千年虫的一个临时解决方案),不要惊慌,就算任务已经执行,也不会导致重复执行,因为qiluTask使用数据库的记录来实现防重.
- 使用qiluTask以后,只需要刚改对应的任务状态和下次执行时间,就能让任务被随时调起.这个功能慎用,一定要确认对应的业务功能没有重复判断,或者没有执行.
