package flow // // Find a way to improve this mess, maybe it can be merged in one func // // import ( "errors" "fmt" "log" "reflect" "runtime/debug" "sync" ) type executorFunc func(OpCtx, ...Data) (Data, error) // Operation interface type Operation interface { // Id perhaps? //ID() string //Set(input Data) // Special var method Process(params ...Data) (Data, error) } type operation struct { sync.Mutex flow *Flow name string kind string // Could be a simple ID for operation, but it will depend on flow inputs []*operation // still figuring, might be Operation fn interface{} // Any func executor executorFunc } // OpCtx operation Context type OpCtx = *sync.Map // NewOpCtx creates a running context func newOpCtx() OpCtx { return &sync.Map{} } //func (o *operation) ID() string { return o.id } func (o *operation) Process(params ...Data) (Data, error) { // Create CTX ctx := newOpCtx() return o.executor(ctx, params...) } // make Executor for func func (f *Flow) asTrigger(op *operation, fn executorFunc) executorFunc { return func(ctx OpCtx, params ...Data) (Data, error) { f.hooks.start(op) //panic recoverer, since nodes are not our functions var err error var res Data func() { defer func() { if r := recover(); r != nil { log.Println("Panic:", r) debug.PrintStack() err = fmt.Errorf("%v", r) } }() res, err = fn(ctx, params...) }() if err != nil { f.hooks.error(op, err) } else { f.hooks.finish(op, res) } return res, err } } // safe makeExecutor with a bunch of type checks // we will create a less safer functions to get performance func (f *Flow) makeExecutor(op *operation, fn interface{}) executorFunc { // ExecutorFunc return func(ctx OpCtx, params ...Data) (Data, error) { /*op := f.GetOp(id) if op == nil { return nil, fmt.Errorf("invalid operation '%s'", id) }*/ op.Lock() defer op.Unlock() // Load from cache if any if ctx != nil { if v, ok := ctx.Load(op); ok { return v, nil } } // Change to wait to wait for the inputs f.hooks.wait(op) fnval := reflect.ValueOf(fn) callParam, err := f.processInputs(ctx, op, fnval, params...) if err != nil { return nil, err } // Start again and execute function f.hooks.start(op) fnret := fnval.Call(callParam) if len(fnret) == 0 { return nil, nil } // Output erroring if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) { err, ok := fnret[len(fnret)-1].Interface().(error) if !ok { err = errors.New("unknown error") } return nil, err } // THE RESULT ret := fnret[0].Interface() // Store in the cache if ctx != nil { ctx.Store(op, ret) } return ret, nil } } // processInputs will run a list of operations and return reflect values // to be processed next // NEW PARALLEL PROCESSING func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) { nInputs := fnval.Type().NumIn() // Total inputs callParam := make([]reflect.Value, nInputs) if nInputs != len(op.inputs) { return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs)) } //Wait callErrors := "" paramMutex := sync.Mutex{} // Parallel processing if inputs wg := sync.WaitGroup{} wg.Add(len(op.inputs)) for i, in := range op.inputs { go func(i int, in *operation) { defer wg.Done() inTyp := fnval.Type().In(i) ///////////////// // Executor fr, err := in.executor(ctx, params...) //log.Println("Executing:", in.id, in.name, fr) paramMutex.Lock() defer paramMutex.Unlock() if err != nil { callErrors += err.Error() + "\n" return } if fr == nil { callParam[i] = reflect.Zero(inTyp) return } res := reflect.ValueOf(fr) var cres reflect.Value // Conversion effort switch { case !res.IsValid(): callErrors += fmt.Sprintf("Input %d invalid\n", i) return case !res.Type().ConvertibleTo(inTyp): if inTyp.Kind() != reflect.String { callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp) log.Println(f) return } cres = reflect.ValueOf(fmt.Sprint(res.Interface())) default: cres = res.Convert(inTyp) } // CheckError and safelly append callParam[i] = cres }(i, in) } wg.Wait() // Check for any error if callErrors != "" { log.Println("Call errors:", callErrors) return nil, errors.New(callErrors) } return callParam, nil } // Var create a operation func (f *Flow) Var(name string, initial Data) Operation { // Input from params var input *operation switch v := initial.(type) { case *operation: input = v default: c := f.Const(v) input = c.(*operation) } op := &operation{ Mutex: sync.Mutex{}, flow: f, name: fmt.Sprintf("(var)<%s>", name), kind: "var", inputs: []*operation{input}, } op.executor = f.makeExecutor(op, func(initial Data) Data { val, ok := f.Data[name] if !ok { val = initial f.Data[name] = val } return val }) return op } // Var define var operation with optional initial /*func (f *Flow) Var(name string, initial ...Data) Operation { // Unique if _, ok := f.Data[name]; !ok { var v interface{} if len(initial) > 0 { v = initial[0] } f.Data[name] = v } op := &operation{ Mutex: sync.Mutex{}, flow: f, name: fmt.Sprintf("(var)<%s>", name), kind: "var", inputs: nil, } op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { // if f.data == nil we set from the init operation return f.Data[name], nil }) f.operations = append(f.operations, op) //f.operations.Store(id, op) return op }*/ // Op Manual tag an Operation // Define operation for ID func (f *Flow) Op(name string, params ...interface{}) Operation { inputs := make([]*operation, len(params)) for i, p := range params { switch v := p.(type) { case *operation: inputs[i] = v default: c := f.Const(v) inputs[i], _ = c.(*operation) } } // If special executor we attach our func // Grab executor here registryFn, err := f.registry.Get(name) if err != nil { return f.ErrOp(err) } op := &operation{ Mutex: sync.Mutex{}, flow: f, name: name, kind: "func", inputs: inputs, } executor := f.makeExecutor(op, registryFn) op.executor = f.asTrigger(op, executor) f.operations = append(f.operations, op) return op } // ErrOp define a nil operation that will return error // Usefull for builders func (f *Flow) ErrOp(err error) Operation { op := &operation{ Mutex: sync.Mutex{}, //id: id, flow: f, name: fmt.Sprintf("(error)<%v>", err), kind: "error", inputs: nil, } op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return nil, err }) //f.operations = append(f.operations, op) return op } // Const define a const by defined ID func (f *Flow) Const(value Data) Operation { // Optimize this definition constID := -1 for k, v := range f.consts { if v == value { constID = k break } } if constID == -1 { constID = len(f.consts) f.consts = append(f.consts, value) } op := &operation{ Mutex: sync.Mutex{}, flow: f, name: fmt.Sprintf("(const)<%v:%v>", constID, value), kind: "const", inputs: nil, } op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return f.consts[constID], nil }) //f.operations = append(f.operations, op) return op } // In define input operation func (f *Flow) In(paramID int) Operation { op := &operation{ Mutex: sync.Mutex{}, flow: f, name: fmt.Sprintf("(in)<%d>", paramID), kind: "in", inputs: nil, } op.executor = f.asTrigger(op, func(ctx OpCtx, params ...Data) (Data, error) { if paramID < 0 || paramID >= len(params) { return nil, ErrInput } return params[paramID], nil }) f.operations = append(f.operations, op) return op }