5 Star 45 Fork 6

mqyqingkong / flowprocess

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
processfile_hadlerfunc_test.go 2.02 KB
一键复制 编辑 原始数据 按行查看 历史
mqyqingkong 提交于 2023-07-19 21:34 . add:HandlerFunc
package flowprocess_test
import (
"bufio"
"log"
"os"
"strings"
"sync"
"testing"
"time"
"gitee.com/mqyqingkong/flowprocess"
)
// Test_ProcessFileFlowWay_HandlerFunc builds a three-node flow with
// AddNodeHandlerFunc that reads a text file line by line, counts words in
// parallel, sorts the counts, and then checks the ten most frequent words
// against known values for testfile/2553.txt.
func Test_ProcessFileFlowWay_HandlerFunc(t *testing.T) {
	start := time.Now()
	fp := flowprocess.NewFlow()
	// Second argument of AddNodeHandlerFunc for the reader node; presumably the
	// size of the node's output queue — confirm against the flowprocess docs.
	queueCount := 4000
	inputPath := "testfile/2553.txt"

	// Node-0: read file lines. We define 1 processor to read the file.
	fp.AddNodeHandlerFunc(1, queueCount, func(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error {
		f, err := os.Open(inputPath)
		if err != nil {
			return err
		}
		defer f.Close()

		sc := bufio.NewScanner(f)
		for sc.Scan() {
			// Stop reading as soon as a line cannot be handed to the next node
			// instead of silently dropping it.
			if err := dispatch(sc.Text()); err != nil {
				return err
			}
		}
		// Scan returns false on both EOF and failure; Err distinguishes them
		// (it is nil on a clean EOF).
		return sc.Err()
	})

	wordCount := map[string]int{}
	var lock sync.Mutex

	// Node-1: split and count. We define 4 parallel processors to split and count.
	node1 := fp.AddNodeHandlerFunc(4, 1, func(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error {
		line := inTask.(string)
		words := splitText(line)

		// wordCount is shared by all Node-1 processors; take the lock once per
		// line rather than once per word.
		lock.Lock()
		defer lock.Unlock()
		for _, w := range words {
			if st := strings.TrimSpace(w); len(st) > 0 {
				wordCount[st]++
			}
		}
		return nil
	})
	node1.OnCompleted(func(cancelUnExpected bool, dispatch func(outTask flowprocess.Task) error) {
		// Hand the finished map to Node-2. The callback cannot return an error,
		// so a failed dispatch is logged; the missing result then fails the test
		// below.
		if err := dispatch(wordCount); err != nil {
			log.Printf("dispatch word counts: %v", err)
		}
	})

	// Node-2: sort. It is triggered by the single task dispatched from
	// Node-1's OnCompleted callback and reads the shared wordCount map.
	fp.AddNodeHandlerFunc(1, 0, func(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error {
		return dispatch(sortWc(wordCount, true))
	})

	fp.Start()

	res, ok := fp.Result()
	if !ok {
		// t.Fatal fails only this test; log.Fatal would os.Exit the whole test binary.
		t.Fatal("wait flow result error")
	}
	log.Printf("duration(ms):%v\n", time.Since(start).Milliseconds())

	sortedWc, ok := res.([]wordAndCount)
	if !ok {
		t.Fatalf("unexpected flow result type %T", res)
	}

	expect := []wordAndCount{
		{"the", 7480},
		{"of", 4540},
		{"to", 3825},
		{"and", 3484},
		{"her", 2484},
		{"that", 2091},
		{"was", 2076},
		{"she", 1950},
		{"in", 1898},
		{"a", 1896},
	}
	// Guard the indexing below so a short result is a test failure, not a panic.
	if len(sortedWc) < len(expect) {
		t.Fatalf("got %d sorted entries, want at least %d", len(sortedWc), len(expect))
	}
	for i, want := range expect {
		if sortedWc[i] != want {
			t.Errorf("sortedWc[%d] = %+v, want %+v", i, sortedWc[i], want)
		}
	}
}
Go
1
https://gitee.com/mqyqingkong/flowprocess.git
git@gitee.com:mqyqingkong/flowprocess.git
mqyqingkong
flowprocess
flowprocess
master

搜索帮助