123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- package flow
- //
- // Find a way to improve this mess, maybe it can be merged in one func
- //
- //
- import (
- "errors"
- "fmt"
- "log"
- "reflect"
- "sync"
- )
// executorFunc is the internal signature every operation is executed
// through: it receives the run context and the parameters given to Process
// and returns the operation's result or an error.
type executorFunc func(OpCtx, ...Data) (Data, error)
// Operation is the handle returned by the Flow Def* methods for a defined
// operation.
type Operation interface { // Id perhaps?
	// ID returns the operation identifier.
	ID() string
	// Set hands input to the operation. In this file only operations
	// created by DefVar install a setter; for the others Set does nothing.
	Set(input Data) // Special var method
	// Process executes the operation with params and returns its result.
	Process(params ...Data) (Data, error)
}
// operation is the concrete Operation implementation stored in a Flow.
type operation struct {
	// Mutex is held by executors built with makeExecutor for the whole
	// duration of a run of this operation.
	sync.Mutex
	flow *Flow  // flow the operation was defined on
	id   string // identifier the operation is stored under in the flow
	name string // registry function name, or a "(kind)<...>" label for built-in kinds
	kind string // one of "var", "func", "error", "const" or "in"
	// inputs are the operations whose results feed this one, in parameter
	// order; nil for every kind except "func".
	inputs   []*operation // still figuring, might be Operation
	setter   func(Data)   // called by Set when non-nil
	executor executorFunc // called by Process
}
// OpCtx is the context shared by the operations taking part in one run.
// Executors built with makeExecutor use it as a result cache keyed by
// operation id.
type OpCtx = *sync.Map
- // NewOpCtx creates a running context
- func newOpCtx() OpCtx {
- return &sync.Map{}
- }
- func (o *operation) ID() string { return o.id }
- func (o *operation) Process(params ...Data) (Data, error) {
- // Create CTX
- ctx := newOpCtx()
- return o.executor(ctx, params...)
- //return executeOP(o.flow, o.id, ctx, params...)
- }
- func (o *operation) Set(data Data) {
- if o.setter != nil {
- o.setter(data)
- }
- }
- // make Executor for func
- func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
- return func(ctx OpCtx, params ...Data) (Data, error) {
- f.hooks.start(id)
- d, err := fn(ctx, params...)
- if err != nil {
- f.hooks.error(id, err)
- return d, err
- }
- f.hooks.finish(id, d)
- return d, err
- }
- }
- // save makeExecutor with a bunch of type checks
- // we will create a less safer functions to get performance
- func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
- return func(ctx OpCtx, params ...Data) (Data, error) {
- var err error
- //panic recoverer, since nodes are not our functions
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("%v", r)
- f.hooks.error(id, err)
- }
- }()
- op := f.GetOp(id)
- if op == nil {
- err = fmt.Errorf("invalid operation '%s'", id)
- f.hooks.error(id, err)
- return nil, err
- }
- op.Lock()
- defer op.Unlock()
- // Load from cache if any
- if ctx != nil {
- if v, ok := ctx.Load(id); ok {
- return v, nil
- }
- }
- // Wait for inputs
- f.hooks.wait(id)
- fnval := reflect.ValueOf(fn)
- callParam, err := f.processInputs(ctx, op, fnval, params...)
- if err != nil {
- log.Println("ERR:", err)
- f.hooks.error(id, err)
- return nil, err
- }
- // The actual operation process
- // Func returned starting the process
- f.hooks.start(id)
- // if entry is special we pass the Flow?
- fnret := fnval.Call(callParam)
- // Output erroring
- if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
- err, ok := fnret[1].Interface().(error)
- if !ok {
- err = errors.New("unknown error")
- }
- f.hooks.error(id, err)
- return nil, err
- }
- // THE RESULT
- ret := fnret[0].Interface()
- // Store in the cache
- if ctx != nil {
- ctx.Store(id, ret)
- }
- f.hooks.finish(id, ret)
- return ret, nil
- }
- }
- /////////////////////////////
- // NEW PARALLEL PROCESSING
- ///////////
- func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
- // Flow injector
- nInputs := fnval.Type().NumIn()
- // Total inputs
- OcallParam := make([]reflect.Value, nInputs)
- callParam := OcallParam
- offs := 0
- // Inject flow if the first param is of type Flow
- if nInputs > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
- OcallParam[0] = reflect.ValueOf(f)
- offs = 1
- nInputs--
- //shift one to process inputs
- callParam = OcallParam[1:]
- }
- if nInputs != len(op.inputs) {
- return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
- } //Wait
- callErrors := ""
- paramMutex := sync.Mutex{}
- // Parallel processing if inputs
- wg := sync.WaitGroup{}
- wg.Add(len(op.inputs))
- for i, in := range op.inputs {
- go func(i int, in *operation) {
- defer wg.Done()
- inTyp := fnval.Type().In(i + offs)
- /////////////////
- // Executor
- fr, err := in.executor(ctx, params...)
- paramMutex.Lock()
- defer paramMutex.Unlock()
- if err != nil {
- callErrors += err.Error() + "\n"
- return
- }
- if fr == nil {
- callParam[i] = reflect.ValueOf(reflect.Zero(inTyp).Interface())
- }
- res := reflect.ValueOf(fr)
- var cres reflect.Value
- // Conversion effort
- switch {
- case !res.IsValid():
- callErrors += fmt.Sprintf("Input %d invalid\n", i)
- return
- case !res.Type().ConvertibleTo(inTyp):
- if inTyp.Kind() != reflect.String {
- callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
- return
- }
- cres = reflect.ValueOf(fmt.Sprint(res.Interface()))
- default:
- cres = res.Convert(inTyp)
- }
- // CheckError and safelly append
- callParam[i] = cres
- }(i, in)
- }
- wg.Wait()
- if callErrors != "" {
- return nil, errors.New(callErrors)
- }
- /*offs := 0
- if fnval.Type().NumIn() > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
- nInputs--
- offs = 1
- }*/
- return OcallParam, nil
- }
- // DefVar define var operation with optional initial
- func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
- // Unique
- if _, ok := f.Data[name]; !ok {
- var v interface{}
- if len(initial) > 0 {
- v = initial[0]
- }
- f.Data[name] = v
- }
- setter := func(v Data) { f.Data[name] = v }
- opEntry := &operation{
- Mutex: sync.Mutex{},
- id: id,
- flow: f,
- name: fmt.Sprintf("(var)<%s>", name),
- kind: "var",
- inputs: nil,
- setter: setter,
- executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
- // if f.data == nil we set from the init operation
- return f.Data[name], nil
- }),
- }
- f.operations.Store(id, opEntry)
- return opEntry
- }
- // DefOp Manual tag an Operation
- func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
- inputs := make([]*operation, len(params))
- for i, p := range params {
- switch v := p.(type) {
- case *operation:
- inputs[i] = v
- default:
- c, err := f.Const(v)
- if err != nil {
- return nil, err
- }
- inputs[i], _ = c.(*operation)
- }
- }
- // Grab executor here
- registryFn, err := f.registry.Get(name)
- if err != nil {
- return nil, err
- }
- executor := f.makeExecutor(id, registryFn)
- op := &operation{
- Mutex: sync.Mutex{},
- id: id,
- flow: f,
- name: name,
- kind: "func",
- inputs: inputs,
- setter: nil, // No set
- executor: executor,
- }
- f.operations.Store(id, op)
- return op, nil
- }
- // DefErrOp define a nil operation that will return error
- // Usefull for builders
- func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
- op := &operation{
- Mutex: sync.Mutex{},
- id: id,
- flow: f,
- name: fmt.Sprintf("(error)<%v>", err),
- kind: "error",
- inputs: nil,
- setter: nil,
- executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return nil, err }),
- }
- f.operations.Store(id, op)
- return op, nil
- }
- // DefConst define a const by defined ID
- func (f *Flow) DefConst(id string, value Data) (Operation, error) {
- // Optimize this definition
- f.consts[id] = value
- op := &operation{
- id: id,
- Mutex: sync.Mutex{},
- flow: f,
- name: fmt.Sprintf("(const)<%s>", id),
- kind: "const",
- inputs: nil,
- setter: nil,
- executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.consts[id], nil }),
- }
- f.operations.Store(id, op)
- return op, nil
- }
- // DefIn define input operation
- func (f *Flow) DefIn(id string, paramID int) Operation {
- op := &operation{
- id: id,
- Mutex: sync.Mutex{},
- flow: f,
- name: fmt.Sprintf("(in)<%d>", paramID),
- kind: "in",
- inputs: nil,
- setter: nil,
- executor: f.makeTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) { return params[paramID], nil }),
- }
- f.operations.Store(id, op)
- return op
- }
|