5 Star 45 Fork 6

mqyqingkong / flowprocess

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
sort_test.go 2.22 KB
一键复制 编辑 原始数据 按行查看 历史
mqyqingkong 提交于 2023-02-23 18:05 . 支持task sort
package flowprocess_test
import (
"fmt"
"log"
"testing"
"time"
"gitee.com/mqyqingkong/flowprocess"
)
func TestSortHandler(t *testing.T) {
startTime := time.Now()
queneCount := 4000
fp := flowprocess.NewFlow()
taskSize := 1000000
//Node-0
fp.AddNodeTaskHandlers(queneCount, &genTaskHandler{
taskSize: taskSize,
})
//Node-1
ph := &processTaskHandler{}
fp.AddNodeTaskHandlers(queneCount, ph, ph, ph, ph)
//Node-2
fp.AddNodeTaskHandlers(queneCount, flowprocess.NewSortTaskHandler(0, taskSize))
//Node-3
ch := &collectTaskHandler{
taskBody: make([]string, 0, taskSize),
}
fp.AddNodeTaskHandlers(0, ch)
fp.Start()
res, ok := fp.Result()
if !ok {
log.Fatal("wait flow result error")
}
resultStrs := res.([]string)
log.Printf("time used: %d ms", time.Since(startTime).Milliseconds())
for i := 0; i < taskSize; i++ {
expected := fmt.Sprintf("processed:task[%d]", i)
if resultStrs[i] != expected {
log.Fatalf("task compare failed, expected: %s, actual: %s", expected, resultStrs[i])
}
}
}
type genTaskHandler struct {
flowprocess.TaskHandlerAdapter
taskSize int
}
func (h genTaskHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
var seq uint64
for i := 0; i < h.taskSize; i++ {
task := flowprocess.ToTraceableTask(seq, fmt.Sprintf("task[%d]", seq))
err := dispatch(task)
if err != nil {
log.Fatalln("genTaskHandler dispatch error:", err)
return err
}
seq++
}
return nil
}
type processTaskHandler struct {
flowprocess.TaskHandlerAdapter
}
func (h *processTaskHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
tt := inTask.(flowprocess.TraceableTask)
newttBody := "processed:" + tt.Inner().(string)
dispatch(flowprocess.ToTraceableTask(tt.TaskId(), newttBody))
return nil
}
type collectTaskHandler struct {
taskBody []string
}
func (h *collectTaskHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
tt := inTask.(flowprocess.TraceableTask)
h.taskBody = append(h.taskBody, tt.Inner().(string))
return nil
}
func (h *collectTaskHandler) OnCompleted(dispatch func(outTask flowprocess.Task) error) (err error) {
return dispatch(h.taskBody)
}
Go
1
https://gitee.com/mqyqingkong/flowprocess.git
git@gitee.com:mqyqingkong/flowprocess.git
mqyqingkong
flowprocess
flowprocess
master

搜索帮助