5 Star 45 Fork 6

mqyqingkong / flowprocess

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
flownode.go 4.54 KB
一键复制 编辑 原始数据 按行查看 历史
mqyqingkong 提交于 2023-07-19 21:34 . add:HandlerFunc
package flowprocess
import (
"context"
"errors"
"fmt"
"sync"
)
type NodeKey string
const (
NODE_ID NodeKey = "nodeId"
)
// node 定义了流处理引擎节点
type node struct {
processors []Processor
outChanSize int
outChan TaskChan
inChan TaskChan
done chan struct{}
id int
ctx context.Context
cancelFunc context.CancelFunc
preNode Node
processorSelector ProcessorSelector
onCompletedFunc func(cancelUnExpected bool, dispatch func(outTask Task) error)
cancelUnExpected bool
}
// NewNode 创建流处理引擎节点。processors 表示处理逻辑,每个 Processor独占一个goroutine。processorChanBufferSize 为输出通道的大小。id 为节点标识。
// preNode 指向上一个节点,取消任务时,会调用preNode.Cancel()。
func NewNode(processors []Processor, outChanSize int, id int, preNode Node) Node {
ctx, cancelFunc := context.WithCancel(context.WithValue(context.Background(), NODE_ID, id))
pu := &node{
processors: processors,
outChanSize: outChanSize,
outChan: make(TaskChan, outChanSize),
id: id,
ctx: ctx,
cancelFunc: cancelFunc,
preNode: preNode,
done: make(chan struct{}),
}
if preNode != nil {
pu.inChan = preNode.outputChan()
}
return pu
}
func (pu *node) process(in TaskChan) (out TaskChan) {
inchains := pu.splitTaskChan(in)
var wg sync.WaitGroup
wg.Add(len(pu.processors))
for i, processor := range pu.processors {
go func(index int, processor Processor, inTaskChan <-chan Task, outTaskChan chan<- Task) {
cancelAllProcess := processor.Proccess(inTaskChan, outTaskChan, pu.ctx)
if cancelAllProcess {
pu.Cancel()
pu.cancelUnExpected = true
}
wg.Done()
}(i, processor, inchains[i], pu.outChan)
}
go func() {
wg.Wait()
pu.onCompleted()
pu.Cancel()
close(pu.outChan)
close(pu.done)
}()
return pu.outChan
}
func (pu *node) Cancel() {
if pu.preNode != nil {
pu.preNode.Cancel()
}
pu.cancelFunc()
}
func (pu *node) Done() <-chan struct{} {
return pu.done
}
func (pu *node) SetProcessorSelector(processorSelector ProcessorSelector) {
pu.processorSelector = processorSelector
}
func (pu *node) SubmitTask(task Task) error {
if pu.inChan == nil {
return fmt.Errorf("can not submit task, the node[%d] dose not have InTaskChan", pu.id)
}
select {
case <-pu.ctx.Done():
return pu.ctx.Err()
case pu.inChan <- task:
}
return nil
}
func (pu *node) onCompleted() {
if pu.onCompletedFunc == nil {
return
}
dispatchFunc := func(outTask Task) error {
//to prevent blocking when task is cancelled
select {
case <-pu.ctx.Done():
return errors.New("task execution is cancelled, dispatch fail")
case pu.outChan <- outTask:
return nil
}
}
pu.onCompletedFunc(pu.cancelUnExpected, dispatchFunc)
}
func (pu *node) OnCompleted(f func(cancelUnExpected bool, dispatch func(outTask Task) error)) {
pu.onCompletedFunc = f
}
func (pu *node) outputChan() (out TaskChan) {
return pu.outChan
}
func (pu *node) outputChanSize() int {
return pu.outChanSize
}
func (pu *node) splitTaskChan(in TaskChan) (outs []TaskChan) {
outs = make([]TaskChan, len(pu.processors))
if pu.processorSelector == nil {
for i := 0; i < len(pu.processors); i++ {
outs[i] = in
}
return
}
processorInTaskChanIndex := pu.processorSelector.DefineProcessorInTaskChan()
if len(processorInTaskChanIndex) != len(pu.processors) {
panic(fmt.Errorf("DefineProcessorInTaskChan error, processorInTaskChanIndex's length must be %d", len(pu.processors)))
}
totalChan := maxInt(processorInTaskChanIndex) + 1
uniqueOuts := make([]TaskChan, totalChan)
for i := 0; i < totalChan; i++ {
uniqueOuts[i] = make(TaskChan, pu.preNode.outputChanSize())
}
for processorId, chanId := range processorInTaskChanIndex {
if chanId >= totalChan {
panic(fmt.Errorf("processor-%d's InTaskChan-index must less than %d", processorId, totalChan))
}
outs[processorId] = uniqueOuts[chanId]
}
closeOuts := func() {
for i := 0; i < len(uniqueOuts); i++ {
close(uniqueOuts[i])
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-pu.ctx.Done():
return
case task, ok := <-in:
if !ok {
return
}
inTaskChanIndex := pu.processorSelector.SelectInTaskChanIndex(task)
select {
case <-pu.ctx.Done():
return
case uniqueOuts[inTaskChanIndex] <- task:
}
}
}
}()
go func() {
wg.Wait()
closeOuts()
}()
return
}
func maxInt(ints []int) int {
max := ints[0]
for i := 1; i < len(ints); i++ {
if max < ints[i] {
max = ints[i]
}
}
return max
}
Go
1
https://gitee.com/mqyqingkong/flowprocess.git
git@gitee.com:mqyqingkong/flowprocess.git
mqyqingkong
flowprocess
flowprocess
master

搜索帮助