5 Star 45 Fork 6

mqyqingkong / flowprocess

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
handler.go 4.07 KB
一键复制 编辑 原始数据 按行查看 历史
mqyqingkong 提交于 2023-07-19 21:34 . add:HandlerFunc
package flowprocess
import (
"container/list"
"log"
)
// TraceableTask 表示可追踪的task。
type TraceableTask interface {
// TaskId 用于获取TaskId,便于追踪。
TaskId() uint64
// Inner 用于获取底层的Task。
Inner() Task
}
type traceableTask struct {
taskId uint64
task Task
}
func (t *traceableTask) TaskId() uint64 {
return t.taskId
}
func (t *traceableTask) Inner() Task {
return t.task
}
// ToTraceableTask 将Task转为TraceableTask。
func ToTraceableTask(taskId uint64, task Task) TraceableTask {
t := &traceableTask{taskId: taskId, task: task}
return t
}
// TaskHandler 定义任务处理逻辑。
type TaskHandler interface {
// Handle 处理任务。如果返回的err不为nil,则会中断流处理。
// dispatch 用于转发任务至下一个节点。
Handle(inTask Task, dispatch func(outTask Task) error) (err error)
// OnCompleted 当前节点的任务处理完毕时,回调该方法。如果返回的err不为nil,则会中断流处理。
OnCompleted(dispatch func(outTask Task) error) (err error)
}
type TaskHandlerAdapter struct {
}
func (h TaskHandlerAdapter) OnCompleted(dispatch func(outTask Task) error) (err error) {
return nil
}
type TaskHandlerFuncAdapter struct {
TaskHandlerAdapter
handlerFunc func(inTask Task, dispatch func(outTask Task) error) (err error)
}
func (h TaskHandlerFuncAdapter) Handle(inTask Task, dispatch func(outTask Task) error) (err error) {
return h.handlerFunc(inTask, dispatch)
}
// NewSortTaskHandler 对任务按照taskId进行排序,taskId越小,越先被转发给下一个节点。
// taskStartId:第一个任务的id
// maxBlocking:队列的最大长度。如果超过最大长度,则会转发队列中taskId最小的任务。
func NewSortTaskHandler(taskStartId uint64, maxBlocking int) TaskHandler {
sh := &sortTaskHandler{
nextTaskId: taskStartId,
blocking: list.New(),
maxBlocking: maxBlocking,
}
return sh
}
type sortTaskHandler struct {
nextTaskId uint64
blocking *list.List
maxBlocking int
blockingStatistics int
}
func (h *sortTaskHandler) Handle(inTask Task, dispatch func(outTask Task) error) (err error) {
tt, ok := inTask.(TraceableTask)
if !ok {
return dispatch(inTask)
}
ttTaskId := tt.TaskId()
if ttTaskId < h.nextTaskId {
return dispatch(inTask)
}
if ttTaskId == h.nextTaskId {
err := dispatch(inTask)
if err != nil {
return err
}
h.nextTaskId++
} else if ttTaskId > h.nextTaskId { //按taskId顺序(前大,后小)放入h.blocking
h.pushToBlocking(tt)
}
return h.tryDispatchBlocking(dispatch)
}
func (h *sortTaskHandler) OnCompleted(dispatch func(outTask Task) error) (err error) {
log.Printf("runtime max blocking size: %d", h.blockingStatistics)
btask := h.blocking.Back()
for btask != nil {
btaskV := btask.Value.(TraceableTask)
err := dispatch(btaskV)
if err != nil {
return err
}
btaskPre := btask.Prev()
h.blocking.Remove(btask)
btask = btaskPre
}
return nil
}
func (h *sortTaskHandler) pushToBlocking(tt TraceableTask) {
defer func() {
if h.blocking.Len() > h.blockingStatistics {
h.blockingStatistics = h.blocking.Len()
}
}()
top := h.blocking.Front()
if top == nil {
h.blocking.PushFront(tt)
return
}
tmp := top
for {
tmpTask := tmp.Value.(*traceableTask)
if tt.TaskId() >= tmpTask.taskId {
h.blocking.InsertBefore(tt, tmp)
break
}
tmp = tmp.Next()
if tmp == nil {
h.blocking.PushBack(tt)
break
}
}
}
func (h *sortTaskHandler) tryDispatchBlocking(dispatch func(outTask Task) error) error {
btask := h.blocking.Back()
for btask != nil {
btaskV := btask.Value.(TraceableTask)
btTaskId := btaskV.TaskId()
if h.blocking.Len() > h.maxBlocking {
err := dispatch(btaskV)
if err != nil {
return err
}
h.nextTaskId = btTaskId + 1
} else if btTaskId == h.nextTaskId {
err := dispatch(btaskV)
if err != nil {
return err
}
h.nextTaskId++
} else if btTaskId < h.nextTaskId {
err := dispatch(btaskV)
if err != nil {
return err
}
} else {
break
}
btaskPre := btask.Prev()
h.blocking.Remove(btask)
btask = btaskPre
}
return nil
}
Go
1
https://gitee.com/mqyqingkong/flowprocess.git
git@gitee.com:mqyqingkong/flowprocess.git
mqyqingkong
flowprocess
flowprocess
master

搜索帮助