package flow

//
// Find a way to improve this mess, maybe it can be merged in one func
//

import (
	"errors"
	"fmt"
	"log"
	"reflect"
	"sync"
)

// executorFunc is the signature shared by every operation executor: it
// receives the per-run context and the params handed to Process, and yields
// the operation result.
type executorFunc func(OpCtx, ...Data) (Data, error)

// Operation is the public handle of a node registered in a Flow.
type Operation interface { // Id perhaps?
	// ID returns the identifier the operation was registered with.
	ID() string
	// Set stores a value in the operation. Special var method: operations
	// without a setter ignore the call.
	Set(input Data)
	// Process runs the operation with the given params and returns its result.
	Process(params ...Data) (Data, error)
}

// operation is the concrete Operation implementation.
// Will be named operation
type operation struct {
	sync.Mutex
	flow     *Flow
	id       string
	name     string
	kind     string
	inputs   []*operation // still figuring, might be Operation
	setter   func(Data)
	executor executorFunc
}

// OpCtx operation Context. makeExecutor uses it as a per-run result cache
// keyed by operation id.
type OpCtx = *sync.Map

// newOpCtx creates a running context.
func newOpCtx() OpCtx {
	return &sync.Map{}
}

// ID returns the operation identifier.
func (o *operation) ID() string {
	return o.id
}

// Process runs the operation executor with a fresh context, so results cached
// during one Process call are not visible to the next one.
func (o *operation) Process(params ...Data) (Data, error) {
	ctx := newOpCtx()
	return o.executor(ctx, params...)
}

// Set forwards data to the operation setter; it is a no-op when the operation
// has no setter.
func (o *operation) Set(data Data) {
	if o.setter != nil {
		o.setter(data)
	}
}

// makeTrigger wraps fn so the flow hooks are notified around it: start before
// the call, then either error or finish depending on the outcome.
func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
	return func(ctx OpCtx, params ...Data) (Data, error) {
		f.hooks.start(id)
		d, err := fn(ctx, params...)
		if err != nil {
			f.hooks.error(id, err)
			return d, err
		}
		f.hooks.finish(id, d)
		return d, err
	}
}

// makeExecutor builds the executor for a registered function, calling fn
// through reflection.
//
// safe makeExecutor with a bunch of type checks
// we will create a less safer functions to get performance
//
// The returned executor:
//   - recovers from panics and reports them as errors;
//   - holds the operation lock for the whole call;
//   - returns the value cached in ctx under id when there is one;
//   - resolves the inputs with processInputs, then calls fn;
//   - treats a non-nil last result (when fn has more than one result) as the
//     operation error, otherwise caches and returns the first result.
func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
	// Reflecting fn does not depend on the call, do it only once.
	fnval := reflect.ValueOf(fn)

	// Results are named so the deferred recover below can actually hand the
	// error to the caller; with unnamed results a recovered panic would be
	// returned as (nil, nil).
	return func(ctx OpCtx, params ...Data) (ret Data, err error) {
		// panic recoverer, since nodes are not our functions
		defer func() {
			if r := recover(); r != nil {
				ret = nil
				err = fmt.Errorf("%v", r)
				f.hooks.error(id, err)
			}
		}()

		op := f.GetOp(id)
		if op == nil {
			err = fmt.Errorf("invalid operation '%s'", id)
			f.hooks.error(id, err)
			return nil, err
		}
		op.Lock()
		defer op.Unlock()

		// Load from cache if any
		if ctx != nil {
			if v, ok := ctx.Load(id); ok {
				return v, nil
			}
		}

		// Wait for inputs
		f.hooks.wait(id)

		callParam, err := f.processInputs(ctx, op, fnval, params...)
		if err != nil {
			log.Println("ERR:", err)
			f.hooks.error(id, err)
			return nil, err
		}

		// The actual operation process
		f.hooks.start(id)
		fnret := fnval.Call(callParam)

		// A function without results has nothing to offer as operation
		// output; report it instead of panicking on fnret[0].
		if len(fnret) == 0 {
			err = fmt.Errorf("operation '%s' returned no values", id)
			f.hooks.error(id, err)
			return nil, err
		}

		// Output erroring: the last result is the error slot, whatever the
		// number of results is.
		if last := fnret[len(fnret)-1]; len(fnret) > 1 && last.Interface() != nil {
			callErr, ok := last.Interface().(error)
			if !ok {
				callErr = errors.New("unknown error")
			}
			f.hooks.error(id, callErr)
			return nil, callErr
		}

		// THE RESULT
		ret = fnret[0].Interface()
		// Store in the cache
		if ctx != nil {
			ctx.Store(id, ret)
		}
		f.hooks.finish(id, ret)
		return ret, nil
	}
}

/////////////////////////////
// NEW PARALLEL PROCESSING
///////////

// processInputs runs every input of op concurrently and converts each result
// to the matching parameter type of fnval, returning the argument list ready
// for fnval.Call.
//
// When the first parameter of fnval has the type of f, the flow itself is
// injected there and the inputs fill the remaining parameters. The number of
// remaining parameters must equal len(op.inputs).
//
// Per input: a nil result becomes the zero value of the parameter type, a
// convertible result is converted, and anything else is formatted with
// fmt.Sprint when the parameter is a string kind. Any other mismatch is an
// error. All input errors are reported together, in input order.
func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
	fnTyp := fnval.Type()
	nInputs := fnTyp.NumIn()

	// allParams is what gets returned, callParam is the window of it that
	// the inputs fill.
	allParams := make([]reflect.Value, nInputs)
	callParam := allParams
	offs := 0

	// Inject flow if the first param is of type Flow
	if nInputs > 0 && fnTyp.In(0) == reflect.TypeOf(f) {
		allParams[0] = reflect.ValueOf(f)
		offs = 1
		nInputs--
		// shift one to process inputs
		callParam = allParams[1:]
	}

	if nInputs != len(op.inputs) {
		return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
	}

	// Each goroutine only touches its own index of callParam and inputErrs,
	// and wg.Wait orders those writes before the reads below, so no mutex is
	// needed and the error text does not depend on scheduling.
	inputErrs := make([]string, len(op.inputs))

	// Parallel processing of inputs
	wg := sync.WaitGroup{}
	wg.Add(len(op.inputs))
	for i, in := range op.inputs {
		go func(i int, in *operation) {
			defer wg.Done()

			inTyp := fnTyp.In(i + offs)

			fr, err := in.executor(ctx, params...)
			if err != nil {
				inputErrs[i] = err.Error() + "\n"
				return
			}

			// A nil input is passed as the zero value of the parameter.
			// reflect.Zero is used directly: going through Interface()
			// would give an invalid Value for interface typed parameters.
			if fr == nil {
				callParam[i] = reflect.Zero(inTyp)
				return
			}

			// Conversion effort
			res := reflect.ValueOf(fr)
			switch {
			case res.Type().ConvertibleTo(inTyp):
				callParam[i] = res.Convert(inTyp)
			case inTyp.Kind() == reflect.String:
				// Last resort for string parameters: use the printed form.
				// Convert keeps named string types assignable.
				callParam[i] = reflect.ValueOf(fmt.Sprint(res.Interface())).Convert(inTyp)
			default:
				inputErrs[i] = fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
			}
		}(i, in)
	}
	wg.Wait()

	callErrors := ""
	for _, e := range inputErrs {
		callErrors += e
	}
	if callErrors != "" {
		return nil, errors.New(callErrors)
	}

	return allParams, nil
}

// DefVar define var operation with optional initial.
//
// The variable lives in f.Data[name]. It is only initialised (with initial[0]
// when given, nil otherwise) if name is not in f.Data yet. The returned
// operation reads f.Data[name] when processed and writes it through Set.
func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
	// Unique
	if _, ok := f.Data[name]; !ok {
		var v interface{}
		if len(initial) > 0 {
			v = initial[0]
		}
		f.Data[name] = v
	}
	setter := func(v Data) {
		f.Data[name] = v
	}

	opEntry := &operation{
		Mutex:  sync.Mutex{},
		id:     id,
		flow:   f,
		name:   fmt.Sprintf("(var)<%s>", name),
		kind:   "var",
		inputs: nil,
		setter: setter,
		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
			// if f.data == nil we set from the init operation
			return f.Data[name], nil
		}),
	}
	f.operations.Store(id, opEntry)
	return opEntry
}

// DefOp Manual tag an Operation.
//
// It registers under id an operation that runs the registry function called
// name. Each param that is already an operation is used as input as is; any
// other value is turned into a constant with f.Const. An error is returned
// when a param cannot be used as input or when name is not in the registry.
func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
	inputs := make([]*operation, len(params))
	for i, p := range params {
		switch v := p.(type) {
		case *operation:
			// A nil input would only blow up later, inside a goroutine
			// where it cannot be recovered.
			if v == nil {
				return nil, fmt.Errorf("param %d is a nil operation", i)
			}
			inputs[i] = v
		default:
			c, err := f.Const(v)
			if err != nil {
				return nil, err
			}
			cop, ok := c.(*operation)
			if !ok {
				return nil, fmt.Errorf("param %d: const is not a flow operation", i)
			}
			inputs[i] = cop
		}
	}

	// Grab executor here
	registryFn, err := f.registry.Get(name)
	if err != nil {
		return nil, err
	}
	executor := f.makeExecutor(id, registryFn)

	op := &operation{
		Mutex:    sync.Mutex{},
		id:       id,
		flow:     f,
		name:     name,
		kind:     "func",
		inputs:   inputs,
		setter:   nil, // No set
		executor: executor,
	}
	f.operations.Store(id, op)
	return op, nil
}

// DefErrOp define a nil operation that will return error.
// Useful for builders: processing the operation always yields (nil, err).
func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
	op := &operation{
		Mutex:  sync.Mutex{},
		id:     id,
		flow:   f,
		name:   fmt.Sprintf("(error)<%v>", err),
		kind:   "error",
		inputs: nil,
		setter: nil,
		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
			return nil, err
		}),
	}
	f.operations.Store(id, op)
	return op, nil
}

// DefConst define a const by defined ID.
//
// The value is stored in f.consts[id] and looked up again every time the
// operation is processed.
func (f *Flow) DefConst(id string, value Data) (Operation, error) {
	// Optimize this definition
	f.consts[id] = value

	op := &operation{
		id:     id,
		Mutex:  sync.Mutex{},
		flow:   f,
		name:   fmt.Sprintf("(const)<%s>", id),
		kind:   "const",
		inputs: nil,
		setter: nil,
		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
			return f.consts[id], nil
		}),
	}
	f.operations.Store(id, op)
	return op, nil
}

// DefIn define input operation.
//
// The operation yields params[paramID] of the running Process call, or an
// error when that param was not provided.
func (f *Flow) DefIn(id string, paramID int) Operation {
	op := &operation{
		id:     id,
		Mutex:  sync.Mutex{},
		flow:   f,
		name:   fmt.Sprintf("(in)<%d>", paramID),
		kind:   "in",
		inputs: nil,
		setter: nil,
		executor: f.makeTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) {
			// Inputs are resolved in goroutines without a recover, so an
			// out of range index must be an error and not a panic.
			if paramID < 0 || paramID >= len(params) {
				return nil, fmt.Errorf("input %d not provided, got %d params", paramID, len(params))
			}
			return params[paramID], nil
		}),
	}
	f.operations.Store(id, op)
	return op
}