package flow

// TODO: find a way to simplify the operation constructors in this file;
// they might be mergeable into a single func.

import (
	"errors"
	"fmt"
	"reflect"
)

// OpCtx is the operation context: it maps an Operation to the Data it has
// already produced, so an operation processed twice with the same context
// yields its stored result instead of being evaluated again.
//
// NOTE: despite its name this behaves as a per-run context rather than a
// per-operation one.
type OpCtx map[Operation]Data

// newOpCtx creates an empty running context.
func newOpCtx() OpCtx {
	return OpCtx{}
}

// dumbSet is the no-op setter installed on operations that have nothing to set.
func dumbSet(params ...Data) {}

// Operation is the interface exposed by every operation built in this file.
type Operation interface {
	// Set stores inputs on the operation. Only var operations act on it;
	// for the other kinds it is a no-op.
	Set(inputs ...Data)
	// Process evaluates the operation with params and returns its result.
	Process(params ...Data) Data
}

// operation holds the local information of a single operation.
type operation struct {
	flow    *Flow
	id      interface{} // int for "in"/"const", string for "func"/"var"
	kind    string      // one of "in", "const", "func", "var"
	set     func(params ...Data)
	process func(ctx OpCtx, params ...Data) Data
}

// Process evaluates the operation using a fresh, empty context.
func (o *operation) Process(params ...Data) Data {
	return o.processWithCtx(newOpCtx(), params...)
}

// processWithCtx is the internal entry point every evaluation goes through.
//
// It returns nil without evaluating anything when the flow already carries
// an error. With a nil ctx the operation is evaluated without caching.
// Otherwise a result already stored in ctx for this operation is returned,
// and a freshly computed result (including nil) is stored before returning.
func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
	if o.flow.err != nil {
		return nil
	}
	if ctx == nil { // no cache/context
		return o.process(ctx, params...)
	}
	if v, ok := ctx[o]; ok {
		return v
	}
	res := o.process(ctx, params...)
	ctx[o] = res
	return res
}

// Set forwards params to the operation's setter (meaningful for var only).
func (o *operation) Set(params ...Data) {
	o.set(params...)
}

// opIn builds an "in" operation that returns the id-th call parameter.
// When id is outside params it records "invalid input" on the flow and
// returns nil.
func opIn(f *Flow, id int) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "in",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) Data {
			if id >= len(params) || id < 0 {
				f.err = errors.New("invalid input")
				return nil
			}
			return params[id]
		},
	}
}

// opConst builds a "const" operation that returns f.consts[id].
func opConst(f *Flow, id int) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "const",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) Data {
			return f.consts[id]
		},
	}
}

// opFunc builds a "func" operation. Processing it looks up f.operations[id],
// evaluates each of that entry's inputs with the same ctx and params, and
// calls the entry's executor through reflection with those values, returning
// the executor's first result.
//
// An unknown id records "invalid operation <id>" on the flow. An input that
// yields nil aborts the call and returns nil.
func opFunc(f *Flow, id string) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "func",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) Data {
			op, ok := f.operations[id]
			if !ok {
				f.err = fmt.Errorf("invalid operation %s", id)
				return nil
			}
			callParam := make([]reflect.Value, len(op.inputs))
			for i, in := range op.inputs {
				fr := in.processWithCtx(ctx, params...)
				if fr == nil {
					// Keep the error recorded by the failing input (the
					// root cause); only fall back to the generic message
					// when nothing was recorded.
					if f.err == nil {
						f.err = errors.New("returning nil")
					}
					return nil
				}
				callParam[i] = reflect.ValueOf(fr)
			}
			// NOTE(review): Call panics if executor is not a function that
			// accepts these arguments, and [0] assumes it returns at least
			// one value; presumably validated where operations are
			// registered. Confirm.
			return reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
		},
	}
}

// opVar builds a "var" operation backed by f.data[id]: Set stores its first
// argument there and Process returns the stored value.
//
// Calling Set with no arguments stores nothing and records an error on the
// flow instead of panicking.
func opVar(f *Flow, id string) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "var",
		set: func(params ...Data) {
			if len(params) == 0 {
				f.err = fmt.Errorf("no value to set on var %s", id)
				return
			}
			f.data[id] = params[0]
		},
		process: func(ctx OpCtx, params ...Data) Data {
			return f.data[id]
		},
	}
}