5 Star 45 Fork 6

mqyqingkong / flowprocess

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
processfile_test.go 5.96 KB
一键复制 编辑 原始数据 按行查看 历史
mqyqingkong 提交于 2023-02-26 22:02 . 优化ProcessorSelector
package flowprocess_test
import (
"bufio"
"context"
"log"
"os"
"sort"
"strings"
"testing"
"time"
"gitee.com/mqyqingkong/flowprocess"
)
// Test_ProcessFileGeneralWay counts word occurrences in the sample text with a
// plain sequential loop (no flow framework) and verifies the ten most frequent
// words together with their counts.
func Test_ProcessFileGeneralWay(t *testing.T) {
	wordCount := map[string]int{}
	reverse := true
	// You can replace the file with a larger file.
	file := "testfile/2553.txt"
	start := time.Now()

	f, err := os.Open(file)
	if err != nil {
		// t.Fatalf (rather than panic or log.Fatal) reports the failure through
		// the testing framework and still lets deferred calls run.
		t.Fatalf("open %q: %v", file, err)
	}
	defer f.Close()

	// Split the file into lines, then every line into words.
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		for _, sp := range splitText(sc.Text()) {
			if st := strings.TrimSpace(sp); len(st) > 0 {
				wordCount[st]++
			}
		}
	}
	// Scan also returns false on a read error; without this check a failed read
	// would only show up as a confusing count mismatch below.
	if err := sc.Err(); err != nil {
		t.Fatalf("scan %q: %v", file, err)
	}

	// Sort by word occurrence times, descending.
	sortedWc := sortWc(wordCount, reverse)
	duration := time.Since(start)
	// Print elapsed time.
	log.Printf("duration(ms):%v\n", duration.Milliseconds())

	expect := []wordAndCount{
		{"the", 7480},
		{"of", 4540},
		{"to", 3825},
		{"and", 3484},
		{"her", 2484},
		{"that", 2091},
		{"was", 2076},
		{"she", 1950},
		{"in", 1898},
		{"a", 1896},
	}
	// Guard the indexing below so a short result fails cleanly instead of
	// panicking with an index-out-of-range.
	if len(sortedWc) < len(expect) {
		t.Fatalf("got %d distinct words, want at least %d", len(sortedWc), len(expect))
	}
	for i, want := range expect {
		if got := sortedWc[i]; got != want {
			t.Fatalf("sortedWc[%d] = %+v, want %+v", i, got, want)
		}
	}
}
// Test_ProcessFileFlowWay runs the same word count as a three-node flow:
// one processor reads the file, four processors split and count in parallel,
// and one processor sums the partial counts. It then verifies the ten most
// frequent words together with their counts.
func Test_ProcessFileFlowWay(t *testing.T) {
	start := time.Now()
	fp := flowprocess.NewFlow()
	// NOTE(review): presumably the task-queue size handed to each node —
	// confirm against the flowprocess documentation.
	queueCount := 4000

	// Node-0: read file lines. We define 1 processor to read file.
	fp.AddNodeProcessors(queueCount,
		&ReadFileProcessor{
			// You can replace the file with a larger file.
			Filepath: "testfile/2553.txt",
		})

	// Node-1: split and count. We define 4 parallel processors to split and count.
	node1 := fp.AddNodeProcessors(queueCount,
		&SplitAndCountProcessor{},
		&SplitAndCountProcessor{},
		&SplitAndCountProcessor{},
		&SplitAndCountProcessor{},
	)
	node1.SetProcessorSelector(&SplitAndCountProcessorSelector{})

	result := &SumWordCountProcessor{
		reverse: true,
	}
	// Node-2: we define 1 processor to summarize.
	fp.AddNodeProcessors(1,
		result,
	)

	fp.Start()

	res, ok := fp.Result()
	if !ok {
		// t.Fatal (rather than log.Fatal) fails only this test instead of
		// exiting the whole test binary.
		t.Fatal("wait flow result error")
	}
	// Checked assertion: an unexpected result type fails the test with a
	// readable message instead of a panic.
	sortedWc, ok := res.([]wordAndCount)
	if !ok {
		t.Fatalf("flow result has type %T, want []wordAndCount", res)
	}
	duration := time.Since(start)
	log.Printf("duration(ms):%v\n", duration.Milliseconds())

	expect := []wordAndCount{
		{"the", 7480},
		{"of", 4540},
		{"to", 3825},
		{"and", 3484},
		{"her", 2484},
		{"that", 2091},
		{"was", 2076},
		{"she", 1950},
		{"in", 1898},
		{"a", 1896},
	}
	// Guard the indexing below so a short result fails cleanly instead of
	// panicking with an index-out-of-range.
	if len(sortedWc) < len(expect) {
		t.Fatalf("got %d distinct words, want at least %d", len(sortedWc), len(expect))
	}
	for i, want := range expect {
		if got := sortedWc[i]; got != want {
			t.Fatalf("sortedWc[%d] = %+v, want %+v", i, got, want)
		}
	}
}
// ReadFileProcessor reads a file line by line and puts every line into the
// OutTaskChan for the next flow node to process.
type ReadFileProcessor struct {
	// Filepath is the path of the file to read.
	Filepath string
}

// Proccess opens g.Filepath and sends each line, as a string, to outTask.
// inTasks is not read. It stops early once ctx is done.
//
// It always returns false. A failure to open or to read the file panics.
func (g *ReadFileProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	f, err := os.Open(g.Filepath)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		// Race the send against cancellation so a full outTask cannot block
		// this processor forever after ctx has been cancelled.
		select {
		case <-ctx.Done():
			return false
		case outTask <- sc.Text():
		}
	}
	// Scan returns false on a read error as well as at EOF; surface the error
	// the same way as an open failure instead of silently emitting a
	// truncated file.
	if err := sc.Err(); err != nil {
		panic(err)
	}
	return false
}
// SplitAndCountProcessorSelector distributes the lines produced by node0 over
// the four processors of node1:
//
//	node0 --(line contains neither "and" nor "of")--> node1-processor0, node1-processor1
//	   |----(line contains "and" or "of")-----------> node1-processor2, node1-processor3
type SplitAndCountProcessorSelector struct{}

// DefineProcessorInTaskChan maps each of the 4 processors to an input channel
// index: the first 2 processors use chan0, the last 2 use chan1.
func (s *SplitAndCountProcessorSelector) DefineProcessorInTaskChan() (processorInTaskChanIndexes []int) {
	const (
		firstChan  = 0
		secondChan = 1
	)
	processorInTaskChanIndexes = []int{firstChan, firstChan, secondChan, secondChan}
	return processorInTaskChanIndexes
}
// SelectInTaskChanIndex picks the input channel for one task: a line that
// contains "and" or "of" is forwarded to chan1, every other line to chan0.
// The task must hold a string; any other type panics on the type assertion.
func (s *SplitAndCountProcessorSelector) SelectInTaskChanIndex(task flowprocess.Task) (inTaskChanIndex int) {
	text := task.(string)
	for _, keyword := range [...]string{"and", "of"} {
		if strings.Contains(text, keyword) {
			return 1
		}
	}
	return 0
}
// SplitAndCountProcessor splits incoming lines into words and counts how often
// each word occurs.
type SplitAndCountProcessor struct{}

// Proccess receives lines (strings) from inTasks and tallies their words in a
// local map. When inTasks is closed it sends that map (map[string]int) to
// outTask and returns false. If ctx is done first it returns true without
// sending anything.
func (s *SplitAndCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	counts := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return true
		case task, open := <-inTasks:
			if !open {
				// Input exhausted: hand this processor's tally to the next node.
				outTask <- counts
				return false
			}
			for _, piece := range splitText(task.(string)) {
				// Skip the empty pieces that repeated spaces produce.
				if word := strings.TrimSpace(piece); word != "" {
					counts[word]++
				}
			}
		}
	}
}
// punctuationStripper deletes the punctuation marks that splitText ignores in
// a single pass over the input, replacing eight chained strings.ReplaceAll
// calls. A *strings.Replacer is safe for concurrent use, so the parallel
// processors can share it.
var punctuationStripper = strings.NewReplacer(
	".", "",
	",", "",
	"?", "",
	"!", "",
	":", "",
	"\"", "",
	"(", "",
	")", "",
)

// splitText removes the characters . , ? ! : " ( ) from text and splits the
// remainder on single spaces. It returns nil when nothing is left after
// stripping. Consecutive spaces yield empty elements, which callers are
// expected to trim and skip.
func splitText(text string) []string {
	text = punctuationStripper.Replace(text)
	if len(text) == 0 {
		return nil
	}
	return strings.Split(text, " ")
}
// SumWordCountProcessor merges the partial word counts it receives into one
// total and emits the sorted result.
type SumWordCountProcessor struct {
	// reverse is forwarded to sortWc; true sorts by descending count.
	reverse bool
}

// Proccess receives partial counts (map[string]int) from inTasks and adds them
// into a running total. When inTasks is closed it sends the total, sorted by
// sortWc, to outTask and returns false. If ctx is done first it returns true
// without sending anything.
func (s *SumWordCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	totals := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return true
		case task, open := <-inTasks:
			if !open {
				// All partial counts have arrived: publish the sorted total.
				outTask <- sortWc(totals, s.reverse)
				return false
			}
			for word, n := range task.(map[string]int) {
				totals[word] += n
			}
		}
	}
}
// sortWc flattens wc into a slice of word/count pairs sorted by count:
// ascending when reverse is false, descending when reverse is true.
//
// Words with equal counts are ordered by the word itself (in the same
// direction as the counts), so the result is deterministic even though map
// iteration order is random and sort.Sort is not stable. An empty or nil map
// yields an empty, non-nil slice.
func sortWc(wc map[string]int, reverse bool) []wordAndCount {
	wcArr := make(wordAndCounts, 0, len(wc))
	for word, count := range wc {
		wcArr = append(wcArr, wordAndCount{word: word, count: count})
	}

	var order sort.Interface = wcArr
	if reverse {
		order = sort.Reverse(wcArr)
	}
	sort.Sort(order)
	return wcArr
}

// wordAndCount pairs a word with the number of times it occurred.
type wordAndCount struct {
	word  string
	count int
}

// wordAndCounts implements sort.Interface, ordering by ascending count and
// breaking ties by ascending word.
type wordAndCounts []wordAndCount

// Len returns the number of pairs.
func (p wordAndCounts) Len() int { return len(p) }

// Less reports whether pair i sorts before pair j: lower count first, and for
// equal counts the lexically smaller word first.
func (p wordAndCounts) Less(i, j int) bool {
	if p[i].count != p[j].count {
		return p[i].count < p[j].count
	}
	return p[i].word < p[j].word
}

// Swap exchanges pairs i and j.
func (p wordAndCounts) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
Go
1
https://gitee.com/mqyqingkong/flowprocess.git
git@gitee.com:mqyqingkong/flowprocess.git
mqyqingkong
flowprocess
flowprocess
master

搜索帮助