代码拉取完成,页面将自动刷新
package flowprocess_test
import (
"bufio"
"log"
"os"
"strings"
"sync"
"testing"
"time"
"gitee.com/mqyqingkong/flowprocess"
)
// Test_ProcessFileFlowWay_HandlerFunc counts word frequencies in
// testfile/2553.txt with a three-node flow and checks the ten leading entries
// of the sorted result against known totals.
//
// Node-0 reads the file line by line, Node-1 splits each line and tallies the
// words into a shared map, and Node-2 sorts the tally with sortWc and
// dispatches the sorted slice, which the test reads back through fp.Result.
func Test_ProcessFileFlowWay_HandlerFunc(t *testing.T) {
	start := time.Now()
	fp := flowprocess.NewFlow()
	queueCount := 4000
	path := "testfile/2553.txt"

	//Node-0: read file lines. We define 1 processor to read file.
	fp.AddNodeHandlerFunc(1, queueCount, func(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()

		sc := bufio.NewScanner(f)
		for sc.Scan() {
			// Stop reading as soon as a line cannot be handed downstream
			// instead of silently dropping it.
			if err = dispatch(sc.Text()); err != nil {
				return err
			}
		}
		// Scan also returns false on a read error or an over-long line, so
		// surface that instead of reporting a truncated file as success.
		return sc.Err()
	})

	wordCount := map[string]int{}
	// lock guards wordCount, which the parallel Node-1 processors all write to.
	var lock sync.Mutex

	//Node-1: split and count. we define 4 parallel processors to split and count.
	node1 := fp.AddNodeHandlerFunc(4, 1, func(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
		line := inTask.(string)
		for _, sp := range splitText(line) {
			st := strings.TrimSpace(sp)
			if len(st) > 0 {
				lock.Lock()
				wordCount[st]++
				lock.Unlock()
			}
		}
		return nil
	})
	// Hand the tally to Node-2 from the node's completion callback.
	// NOTE(review): presumably this runs once every Node-1 processor has
	// finished, which is what makes the unlocked read in Node-2 safe -
	// confirm against the flowprocess documentation.
	node1.OnCompleted(func(cancelUnExpected bool, dispatch func(outTask flowprocess.Task) error) {
		if err := dispatch(wordCount); err != nil {
			log.Printf("dispatch word counts: %v", err)
		}
	})

	//Node-2: sort.
	fp.AddNodeHandlerFunc(1, 0, func(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error) {
		return dispatch(sortWc(wordCount, true))
	})

	fp.Start()

	res, ok := fp.Result()
	if !ok {
		t.Fatal("wait flow result error")
	}
	sortedWc, ok := res.([]wordAndCount)
	if !ok {
		t.Fatalf("flow result has type %T, want []wordAndCount", res)
	}
	log.Printf("duration(ms):%v\n", time.Since(start).Milliseconds())

	expect := []wordAndCount{
		{"the", 7480},
		{"of", 4540},
		{"to", 3825},
		{"and", 3484},
		{"her", 2484},
		{"that", 2091},
		{"was", 2076},
		{"she", 1950},
		{"in", 1898},
		{"a", 1896},
	}
	// Guard the indexing below so a short result fails the test with a clear
	// message rather than an index-out-of-range panic.
	if len(sortedWc) < len(expect) {
		t.Fatalf("got %d sorted entries, want at least %d", len(sortedWc), len(expect))
	}
	for i, want := range expect {
		if got := sortedWc[i]; got != want {
			t.Fatalf("sortedWc[%d] = %+v, want %+v", i, got, want)
		}
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。