package flow

//
// Find a way to improve this mess, maybe it can be merged in one func
//

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
)

// OpCtx is the context shared by the operations of a single run. "func"
// operations use it as a result cache keyed by operation id.
type OpCtx = *sync.Map

// newOpCtx creates an empty running context.
func newOpCtx() OpCtx {
	return &sync.Map{}
}

// dumbSet is the no-op setter used by operations that cannot be assigned to.
func dumbSet(params ...Data) {}

// Operation is the interface exposed by every operation.
type Operation interface { // Id perhaps?
	// ID returns the operation id formatted as a string.
	ID() string
	// Set assigns a value to the operation (special "var" method); it is a
	// no-op for the other operation kinds.
	Set(inputs ...Data)
	// Process evaluates the operation with the given parameters.
	Process(params ...Data) (Data, error)
}

// Run Context actually not OpCTX

// operation holds the local operation information.
type operation struct {
	flow    *Flow
	id      interface{} // Interface key
	kind    string      // "in", "const", "func", "var" or "nil"
	set     func(params ...Data)
	process func(ctx OpCtx, params ...Data) (Data, error)
}

// ID returns the string id of the operation.
func (o *operation) ID() string {
	return fmt.Sprint(o.id)
}

// Process runs the operation with a fresh context.
func (o *operation) Process(params ...Data) (Data, error) {
	return o.processWithCtx(newOpCtx(), params...)
}

// processWithCtx runs the operation with the given context. Every single
// operation is run through this internally.
func (o *operation) processWithCtx(ctx OpCtx, params ...Data) (Data, error) {
	return o.process(ctx, params...)
	/*entry, _ := o.flow.getOp(fmt.Sprint(o.id))
	if entry == nil {
		log.Println("Entry is nil for id:", o.id, ", why??")
		return nil
	}
	entry.Lock()
	defer entry.Unlock()

	if o.flow.err != nil {
		return nil
	}
	if ctx == nil { // No cache/Context
	}
	if v, ok := ctx.Load(o.id); ok {
		return v
	}
	res := o.process(ctx, params...)
	ctx.Store(o.id, res)
	return res*/
}

// Set is the setter for certain operations (Var). Operations built without a
// setter ignore the call instead of panicking on a nil function value.
func (o *operation) Set(params ...Data) {
	if o.set == nil {
		return
	}
	o.set(params...)
}

//////////////////////////////////////
// Operators definition
///////////////

// opIn creates an operation that returns the id-th parameter passed to the
// run, or an "invalid input" error when id is out of range.
func opIn(f *Flow, id int) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "in",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) (Data, error) {
			if id >= len(params) || id < 0 {
				return nil, errors.New("invalid input")
			}
			return params[id], nil
		},
	}
}

// opConst creates an operation that returns the flow constant stored under id.
func opConst(f *Flow, id string) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "const",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) (Data, error) {
			ret := f.consts[id]
			return ret, nil
		},
	}
}

// opFunc creates an operation that evaluates the inputs of the flow entry
// registered under id in parallel, type-checks the results against the
// entry's executor and calls it through reflection.
//
// When ctx is non-nil the result is cached in it under id, so later calls
// with the same context return the cached value. Failures, including
// recovered panics, are reported through f.hooks.error and returned.
func opFunc(f *Flow, id string) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "func",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) (ret Data, err error) {
			defer func() {
				if r := recover(); r != nil {
					// Never hand back a partial result together with an error.
					ret = nil
					err = fmt.Errorf("Panic: %v", r)
					f.hooks.error(id, err)
				}
			}()

			op, ok := f.getOp(id)
			if !ok {
				err = fmt.Errorf("invalid operation '%s'", id)
				f.hooks.error(id, err)
				return nil, err
			}
			/*if f.err != nil {
				return nil
			}*/

			// The entry stays locked for the whole evaluation, inputs included.
			op.Lock()
			defer op.Unlock()

			if ctx != nil {
				if v, ok := ctx.Load(id); ok { // Cache
					return v, nil
				}
			}

			// Check inputs
			fnval := reflect.ValueOf(op.executor)
			if fnval.Type().NumIn() != len(op.inputs) {
				err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
				f.hooks.error(id, err)
				return nil, err
			}

			/////////////////////////////
			// NEW PARALLEL PROCESSING
			///////////
			f.hooks.wait(id)

			callParam := make([]reflect.Value, len(op.inputs))
			// One slot per input: each goroutine writes only its own index, so
			// no extra synchronization is needed beyond the WaitGroup.
			inErrs := make([]error, len(op.inputs))

			// Parallel processing of inputs
			var wg sync.WaitGroup
			wg.Add(len(op.inputs))
			for i, in := range op.inputs {
				go func(i int, in *operation) {
					defer wg.Done()
					fr, err := in.processWithCtx(ctx, params...)
					if err != nil {
						// Keep the cause so it can be reported below instead
						// of being silently dropped.
						inErrs[i] = err
						return
					}
					callParam[i] = reflect.ValueOf(fr)
				}(i, in)
			}
			wg.Wait()

			// Input error and type checking
			errMsg := ""
			for i, p := range callParam {
				switch {
				case inErrs[i] != nil:
					errMsg += fmt.Sprintf("Input %d invalid: %v\n", i, inErrs[i])
				case !p.IsValid():
					// The input produced a nil value without an error.
					errMsg += fmt.Sprintf("Input %d invalid\n", i)
				case !p.Type().AssignableTo(fnval.Type().In(i)):
					errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
				}
			}
			if len(errMsg) > 0 {
				err = errors.New(errMsg)
				f.hooks.error(id, err)
				return nil, err
			}

			f.hooks.start(id)

			fnret := fnval.Call(callParam)
			// A non-nil second return value is treated as the executor error.
			if len(fnret) > 1 && fnret[1].Interface() != nil {
				callErr, ok := fnret[1].Interface().(error)
				if !ok {
					callErr = errors.New("unknown error")
				}
				f.hooks.error(id, callErr)
				return nil, callErr
			}

			// THE RESULT
			ret = fnret[0].Interface()
			if ctx != nil {
				ctx.Store(id, ret)
			}
			f.hooks.finish(id, ret)
			return ret, nil
		},
	}
}

// opVar creates an operation backed by the flow variable stored under id:
// Set stores its first parameter there and Process returns the stored value.
func opVar(f *Flow, id string) *operation {
	return &operation{
		flow: f,
		id:   id,
		kind: "var",
		set: func(params ...Data) {
			// Nothing to store when called without a value.
			if len(params) == 0 {
				return
			}
			f.data[id] = params[0]
		},
		process: func(ctx OpCtx, params ...Data) (Data, error) {
			return f.data[id], nil
		},
	}
}

// opNil creates a placeholder operation whose Process always fails with a
// "Nil operation" error.
func opNil(f *Flow) *operation {
	return &operation{
		flow: f,
		kind: "nil",
		set:  dumbSet,
		process: func(ctx OpCtx, params ...Data) (Data, error) {
			return nil, errors.New("Nil operation")
		},
	}
}