代码拉取完成,页面将自动刷新
package flowprocess_test
import (
"fmt"
"log"
"testing"
"time"
"gitee.com/mqyqingkong/flowprocess"
)
func TestSortHandler(t *testing.T) {
startTime := time.Now()
queneCount := 4000
fp := flowprocess.NewFlow()
taskSize := 1000000
//Node-0
fp.AddNodeTaskHandlers(queneCount, &genTaskHandler{
taskSize: taskSize,
})
//Node-1
ph := &processTaskHandler{}
fp.AddNodeTaskHandlers(queneCount, ph, ph, ph, ph)
//Node-2
fp.AddNodeTaskHandlers(queneCount, flowprocess.NewSortTaskHandler(0, taskSize))
//Node-3
ch := &collectTaskHandler{
taskBody: make([]string, 0, taskSize),
}
fp.AddNodeTaskHandlers(0, ch)
fp.Start()
res, ok := fp.Result()
if !ok {
log.Fatal("wait flow result error")
}
resultStrs := res.([]string)
log.Printf("time used: %d ms", time.Since(startTime).Milliseconds())
for i := 0; i < taskSize; i++ {
expected := fmt.Sprintf("processed:task[%d]", i)
if resultStrs[i] != expected {
log.Fatalf("task compare failed, expected: %s, actual: %s", expected, resultStrs[i])
}
}
}
type genTaskHandler struct {
flowprocess.TaskHandlerAdapter
taskSize int
}
func (h genTaskHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
var seq uint64
for i := 0; i < h.taskSize; i++ {
task := flowprocess.ToTraceableTask(seq, fmt.Sprintf("task[%d]", seq))
err := dispatch(task)
if err != nil {
log.Fatalln("genTaskHandler dispatch error:", err)
return err
}
seq++
}
return nil
}
type processTaskHandler struct {
flowprocess.TaskHandlerAdapter
}
func (h *processTaskHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
tt := inTask.(flowprocess.TraceableTask)
newttBody := "processed:" + tt.Inner().(string)
dispatch(flowprocess.ToTraceableTask(tt.TaskId(), newttBody))
return nil
}
type collectTaskHandler struct {
taskBody []string
}
func (h *collectTaskHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
tt := inTask.(flowprocess.TraceableTask)
h.taskBody = append(h.taskBody, tt.Inner().(string))
return nil
}
func (h *collectTaskHandler) OnCompleted(dispatch func(outTask flowprocess.Task) error) (err error) {
return dispatch(h.taskBody)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。