123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package flow
- //
- // TODO: find a way to improve this mess; maybe it can be merged into one func.
- //
- //
- import (
- "errors"
- "fmt"
- "reflect"
- )
- // OpCtx operation Context
- type OpCtx map[Operation]Data
- // NewOpCtx creates a running context
- func newOpCtx() OpCtx {
- return OpCtx{}
- }
- // dumbSet
- func dumbSet(params ...Data) {}
- // Operation interface
- type Operation interface {
- Set(inputs ...Data) // Special var method
- Process(params ...Data) Data
- }
- // Run Context actually not OpCTX
- //local operation information
- type operation struct {
- flow *Flow
- id interface{} // Interface key
- kind string
- set func(params ...Data)
- process func(ctx OpCtx, params ...Data) Data
- }
- // Process operation process wrapper
- func (o *operation) Process(params ...Data) Data {
- return o.processWithCtx(newOpCtx(), params...)
- }
- // Every single one is run with this internally
- func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
- if o.flow.err != nil {
- return nil
- }
- if ctx == nil { // No cache/Context
- return o.process(ctx, params...)
- }
- if v, ok := ctx[o]; ok {
- return v
- }
- res := o.process(ctx, params...)
- ctx[o] = res
- return res
- }
- // Set setter for certain operations (Var)
- func (o *operation) Set(params ...Data) {
- o.set(params...)
- }
- func opIn(f *Flow, id int) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "in",
- set: dumbSet,
- process: func(ctx OpCtx, params ...Data) Data {
- if id >= len(params) || id < 0 {
- f.err = errors.New("invalid input")
- return nil
- }
- return params[id]
- },
- }
- }
- func opConst(f *Flow, id int) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "const",
- set: dumbSet,
- process: func(ctx OpCtx, params ...Data) Data {
- ret := f.consts[id]
- return ret
- },
- }
- }
- func opFunc(f *Flow, id string) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "func",
- set: dumbSet,
- process: func(ctx OpCtx, params ...Data) Data {
- op, ok := f.operations[id]
- if !ok {
- f.err = fmt.Errorf("invalid operation %s", id)
- return nil
- }
- callParam := make([]reflect.Value, len(op.inputs))
- for i, in := range op.inputs {
- fr := in.processWithCtx(ctx, params...)
- if fr == nil {
- f.err = errors.New("returning nil")
- return nil
- }
- callParam[i] = reflect.ValueOf(fr)
- }
- return reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
- },
- }
- }
- func opVar(f *Flow, id string) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "var",
- set: func(params ...Data) { f.data[id] = params[0] },
- process: func(ctx OpCtx, params ...Data) Data { return f.data[id] },
- }
- }
|