package flow // // Find a way to improve this mess, maybe it can be merged in one func // // import ( "errors" "fmt" "log" "reflect" "sync" "github.com/gohxs/prettylog" ) // OpCtx operation Context type OpCtx = *sync.Map // NewOpCtx creates a running context func newOpCtx() OpCtx { return &sync.Map{} } // dumbSet func dumbSet(params ...Data) {} // Operation interface type Operation interface { // Id perhaps? ID() string Set(inputs ...Data) // Special var method Process(params ...Data) Data } // Run Context actually not OpCTX //local operation information type operation struct { sync.Mutex flow *Flow id interface{} // Interface key kind string src string set func(params ...Data) process func(ctx OpCtx, params ...Data) Data } // Id returns string Id of the operaton func (o *operation) ID() string { return fmt.Sprint(o.id) } // Process operation process wrapper func (o *operation) Process(params ...Data) Data { return o.processWithCtx(newOpCtx(), params...) } // Every single one is run with this internally func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data { entry, _ := o.flow.getOp(fmt.Sprint(o.id)) log := prettylog.New(o.ID() + ":" + entry.name) log.Printf("Operation waiting") o.Lock() defer o.Unlock() log.Printf("Executing") log.Println("Context", ctx) if o.flow.err != nil { return nil } if ctx == nil { // No cache/Context log.Printf("Processing") return o.process(ctx, params...) } if v, ok := ctx.Load(o); ok { log.Printf("Cached") return v } log.Printf("Processing") res := o.process(ctx, params...) log.Println("Storing", res) ctx.Store(o, res) return res } // Set setter for certain operations (Var) func (o *operation) Set(params ...Data) { o.set(params...) } ////////////////////////////////////// // Operators definition /////////////// func opIn(f *Flow, id int) *operation { return &operation{ flow: f, id: id, kind: "in", set: dumbSet, process: func(ctx OpCtx, params ...Data) Data { if id >= len(params) || id < 0 { f.err = errors.New("invalid input") return nil } return params[id] }, } } func opConst(f *Flow, id string) *operation { return &operation{ flow: f, id: id, kind: "const", set: dumbSet, process: func(ctx OpCtx, params ...Data) Data { ret := f.consts[id] return ret }, } } func opFunc(f *Flow, id string) *operation { return &operation{ flow: f, id: id, kind: "func", set: dumbSet, process: func(ctx OpCtx, params ...Data) Data { op, ok := f.getOp(id) if !ok { f.err = fmt.Errorf("invalid operation '%s'", id) log.Println("Operation not ok", f.err) f.hooks.error(id, f.err) return nil } fnval := reflect.ValueOf(op.executor) if fnval.Type().NumIn() != len(op.inputs) { f.err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs)) f.hooks.error(id, f.err) log.Println("Operation not ok", f.err) return nil } ///////////////////////////// // NEW PARALLEL PROCESSING /////////// f.hooks.wait(id) callParam := make([]reflect.Value, len(op.inputs)) wg := sync.WaitGroup{} wg.Add(len(op.inputs)) for i, in := range op.inputs { go func(i int, in *operation) { defer wg.Done() fr := in.processWithCtx(ctx, params...) callParam[i] = reflect.ValueOf(fr) }(i, in) } wg.Wait() // Check params for _, p := range callParam { if !p.IsValid() { f.err = fmt.Errorf("Input failed", p) log.Println("Flow err:", f.err) f.hooks.error(id, f.err) return nil } } f.hooks.start(id) fnret := fnval.Call(callParam) if len(fnret) > 1 { if err := fnret[1].Interface().(error); err != nil { f.err = err log.Println("Flow err:", f.err) f.hooks.error(id, f.err) return nil } } // check fnret[1] for error ret := fnret[0].Interface() f.hooks.finish(id, ret) return ret }, } } func opVar(f *Flow, id string) *operation { return &operation{ flow: f, id: id, kind: "var", set: func(params ...Data) { f.data[id] = params[0] }, process: func(ctx OpCtx, params ...Data) Data { return f.data[id] }, } } func opNil(f *Flow) *operation { return &operation{ flow: f, kind: "nil", process: func(ctx OpCtx, params ...Data) Data { f.err = errors.New("Nil operation"); return nil }, } }