This action will force synchronization from m631521383/KMQueue, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
该框架是基于redis实现的分布式队列,简单灵活。
下面简单介绍下该队列的一些设计,如果还有其他不懂得地方可以参考源码和注释,代码中我加入了详尽的注释。
还有其他问题可以提issue。
更新历史:
2018年1月23日:新增健康检测 防止执行耗时久的任务被备份队列监听检测到并当作失败任务重试。 但需要用户自行实现健康检测的逻辑,后续考虑通过zookeeper实现健康上报。
KMQueue队列分为两种模式:
default
- 简单队列safe
- 安全队列其中默认为default
。
可以以queueName:queueMode
格式设置队列的模式。
queueName 队列名称
default 为默认队列,可以不指定,默认值。 特性:队列任务可能会丢失,队列任务没有超时限制。
queueMode 队列模式,可选值有:default、safe。
safe 为安全队列,任务有重试策略,达到重试次数依旧失败或者任务存活超时(这里说的超时是指AliveTimeout)(这两者都称为最终失败),Monitor会发出通知, 这样可以根据业务做一些处理,推荐将这些失败的任务持久化到数据库作为日志记录。当然或许你还有更好的处理方式。
注意:需要开启备份队列监听程序BackupQueueMonitor,否则安全队列中最终失败的任务只会存储在备份队列中,而没有消费者去消费处理,这是很危险的行为
new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
...
worker1_queue
为简单队列,worker2_queue
为安全队列。
注意:为了更好的支持业务(将已存在的某个队列的
DEFAULT
改为SAFE
,并重启服务的情况),做如下处理: 当new KMQueueManager.Builder
的队列名称参数中,只要有一个队列指定了SAFE
模式,就会创建备份队列(用于队列任务监控,设置任务超时、失败任务重试等), 并且该备份队列的名称基于传入的所有队列名称生成(无论其队列是否是SAFE
模式)。
上面的例子中,备份队列的生成策略为:
base64(md5("worker1_queue" + "worker2_queue"))
构造方法声明如下:
public Task(String queue,
String uid,
boolean isUnique,
String type,
String data,
Task.TaskStatus status)
uid:如果业务需要区分队列任务的唯一性,请自行生成uid参数, 否则队列默认使用uuid生成策略,这会导致即使data数据完全相同的任务也会被当作两个不同的任务处理。
是否是唯一任务,即队列中同一时刻只存在一个该任务。
type:用于业务逻辑的处理,你可以根据不同的type任务类型,调用不同的handler去处理,可以不传。
有三种方式获取Redis连接,详情查看KMQueueManager.Builder
构造方法的三种重载形式。
如果你使用spring,建议获取spring中配置的redis连接池对象,并通过如下构造方法创建队列管理器:
public Builder(Pool<Jedis> pool, String... queues)
aliveTimeout
);因为初始化备份队列时设置了循环标记;
所以Monitor这里采用定时Job策略,使用brpoplpush backupQueue backupQueue
循环遍历备份队列,遇到循环标记结束循环遍历。
对执行超时(对应的是大于protectedTimeout
)或者存活时间超时(对应的是大于aliveTimeout
)的任务做处理。
分为两种情况:
Pipeline
,决定这些任务的一些额外处理,比如持久化到数据库做日志记录。
// 任务彻底失败后的处理,需要实现Pipeline接口,自行实现处理逻辑
TaskPipeline taskPipeline = new TaskPipeline();
BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
...
.setPipeline(taskPipeline).build();
RetryTimes
的任务重新放回任务队列执行,同时更新任务状态:
使用方式:
// 健康检测
MyAliveDetectHandler detectHandler = new MyAliveDetectHandler();
...
// 构造Monitor监听器
BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
...
.registerAliveDetectHandler(detectHandler)
.build();
// 执行监听
backupQueueMonitor.monitor();
registerAliveDetectHandler()
方法可以设置Null,则不会启用健康检测。
检查正在执行的任务是否还在执行(存活),
为了防止耗时比较久的任务(任务的执行时间超出了通过队列管理器配置的任务执行超时时间 - 默认值:com.kingsoft.wps.mail.queue.config.Constant.PROTECTED_TIMEOUT) 会被备份队列监听器检测到并重新放入任务队列执行(因为备份队列监听器会把超出通过队列管理器配置的任务执行超时时间的任务当作是失败的任务(参考 什么是失败任务?)并进行重试)。
通过这种检测机制,可以保证check(Task)返回为true的任务(任务还在执行)不会被备份队列监听器重新放入任务队列重试。 这里只是提供一个接口,用户需要自己实现执行任务的健康检测。
目前健康检测机制还只是处于初步阶段,核心的检测逻辑还需要用户自行实现,这里只是提供一个接口。
一个比较简单的实现方式就是起一个定时job,每隔n毫秒检查线程中正在执行任务的状态,在redis中以 "任务的id + ALIVE_KEY_SUFFIX" 为key,ttl 为 n+m 毫秒(m < n, m用于保证两次job的空窗期),标记正在执行的任务。 然后AliveDetectHandler的实现类根据task去检查redis中是否存在该key,如果存在,返回true
@Test
public void pushTaskTest() {
KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
.setMaxWaitMillis(-1L)
.setMaxTotal(600)
.setMaxIdle(300)
.setAliveTimeout(Constant.ALIVE_TIMEOUT)
.build();
// 初始化队列
kmQueueManager.init();
// 1.获取队列
TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue");
// 2.创建任务
JSONObject ob = new JSONObject();
ob.put("data", "mail proxy task");
String data = JSON.toJSONString(ob);
// 参数 uid:如果业务需要区分队列任务的唯一性,请自行生成uid参数,
// 否则队列默认使用uuid生成策略,这会导致即使data数据完全相同的任务也会被当作两个不同的任务处理。
// 参数 isUnique:是否是唯一任务,即队列中同一时刻只存在一个该任务。
// 只有当该任务所属队列是安全队列时才生效;如果为true,则通过uid判断任务的唯一性。
// 参数 type:用于业务逻辑的处理,你可以根据不同的type任务类型,调用不同的handler去处理,可以不传。
Task task = new Task(taskQueue.getName(), "", true, "", data, new Task.TaskStatus());
// 3.将任务加入队列
taskQueue.pushTask(task);
}
@Test
public void popTaskTest() {
KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
.setMaxWaitMillis(-1L)
.setMaxTotal(600)
.setMaxIdle(300)
.setAliveTimeout(Constant.ALIVE_TIMEOUT)
.build();
// 初始化队列
kmQueueManager.init();
// 1.获取队列
TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker1_queue");
// 2.获取任务
Task task = taskQueue.popTask();
// 业务处理放到TaskConsumersHandler里
if (task != null) {
task.doTask(kmQueueManager, MyTaskHandler.class);
}
}
你可以自行实现TaskHandler
接口,创建适合你自己业务逻辑的任务处理类,并通过下面代码执行任务处理。
task.doTask(kmQueueManager, TaskHandler.class)
此外,doTask
方法还支持业务传参,通过第三个参数实现params
。
task.doTask(kmQueueManager, TaskHandler.class, params)
如果业务处理抛出异常,队列也将其当作任务执行完成处理,
并通过taskQueue.finishTask(this)
完成任务。
public void doTask(KMQueueManager kmQueueManager, Class clazz, Object... params) {
// 获取任务所属队列
TaskQueue taskQueue = kmQueueManager.getTaskQueue(this.getQueue());
String queueMode = taskQueue.getMode();
if (KMQueueManager.SAFE.equals(queueMode)) {// 安全队列
try {
handleTask(clazz, params);
} catch (Throwable e) {
e.printStackTrace();
}
// 任务执行完成,删除备份队列的相应任务
taskQueue.finishTask(this);
} else {// 普通队列
handleTask(clazz);
}
}
不会再进行任务重试操作。
这点可能不太容易理解,为什么任务抛出异常失败了,队列不会执行重试呢?
因为任务执行抛出异常是业务级的错误,队列不做干预。
队列的重试只是针对消费任务的线程被kill掉或者服务器宕机等情况,此时该任务还没执行完,任务的消费者还没告诉队列任务执行完成了。 此时备份队列监控会执行任务的重试。
如果你想在任务抛出异常失败时执行任务重试,可以不使用task.doTask
,当任务抛出异常时,不执行任务的taskQueue.finishTask(this)
操作。
这样备份队列监控会在下一个job对该任务进行检查处理。
taskQueue.finishTask(this)
是一个非常方便的工具方法。
@Test
public void monitorTaskTest() {
// 健康检测
MyAliveDetectHandler detectHandler = new MyAliveDetectHandler();
// 任务彻底失败后的处理,需要实现Pipeline接口,自行实现处理逻辑
MyPipeline pipeline = new MyPipeline();
// 根据任务队列的名称构造备份队列的名称,注意:这里的任务队列参数一定要和KMQueueManager构造时传入的一一对应。
String backUpQueueName = KMQUtils.genBackUpQueueName("worker1_queue", "worker2_queue:safe");
// 构造Monitor监听器
BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
.setMaxWaitMillis(-1L)
.setMaxTotal(600)
.setMaxIdle(300)
.setAliveTimeout(Constant.ALIVE_TIMEOUT)
.setProtectedTimeout(Constant.PROTECTED_TIMEOUT)
.setRetryTimes(Constant.RETRY_TIMES)
.registerAliveDetectHandler(detectHandler)
.setPipeline(pipeline).build();
// 执行监听
backupQueueMonitor.monitor();
}
重要的事情说三遍:
如果指定了队列的模式为安全队列,一定要开启备份队列监控!!!一定要开启备份队列监控!!!一定要开启备份队列监控!!!
任务执行抛出异常是业务级的错误,队列不做干预,队列依旧把它当作是成功的任务。
队列的重试只是针对消费任务的线程被kill掉或者服务器宕机等情况,此时该任务还没执行完,任务的消费者还没告诉队列任务执行完成了。 此时备份队列监控会执行任务的重试。在这种情况下,任务才能定义为失败任务。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。