123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package flow
- //
- // Find a way to improve this mess, maybe it can be merged in one func
- //
- //
- import (
- "errors"
- "fmt"
- "reflect"
- "sync"
- )
- // OpCtx operation Context
- type OpCtx = *sync.Map
- // NewOpCtx creates a running context
- func newOpCtx() OpCtx {
- return &sync.Map{}
- }
- // dumbSet
- func dumbSet(params ...Data) {}
- // Operation interface
- type Operation interface { // Id perhaps?
- ID() string
- Set(inputs ...Data) // Special var method
- Process(params ...Data) (Data, error)
- }
- // Run Context actually not OpCTX
- //local operation information
- type operation struct {
- flow *Flow
- id interface{} // Interface key
- kind string
- set func(params ...Data)
- process func(ctx OpCtx, params ...Data) (Data, error)
- }
- // Id returns string Id of the operaton
- func (o *operation) ID() string {
- return fmt.Sprint(o.id)
- }
- // Process operation process wrapper
- func (o *operation) Process(params ...Data) (Data, error) {
- return o.processWithCtx(newOpCtx(), params...)
- }
- // Every single one is run with this internally
- func (o *operation) processWithCtx(ctx OpCtx, params ...Data) (Data, error) {
- return o.process(ctx, params...)
- /*entry, _ := o.flow.getOp(fmt.Sprint(o.id))
- if entry == nil {
- log.Println("Entry is nil for id:", o.id, ", why??")
- return nil
- }
- entry.Lock()
- defer entry.Unlock()
- if o.flow.err != nil {
- return nil
- }
- if ctx == nil { // No cache/Context
- }
- if v, ok := ctx.Load(o.id); ok {
- return v
- }
- res := o.process(ctx, params...)
- ctx.Store(o.id, res)
- return res*/
- }
- // Set setter for certain operations (Var)
- func (o *operation) Set(params ...Data) {
- o.set(params...)
- }
- //////////////////////////////////////
- // Operators definition
- ///////////////
- func opIn(f *Flow, id int) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "in",
- set: dumbSet,
- process: func(ctx OpCtx, params ...Data) (Data, error) {
- if id >= len(params) || id < 0 {
- return nil, errors.New("invalid input")
- }
- return params[id], nil
- },
- }
- }
- func opConst(f *Flow, id string) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "const",
- set: dumbSet,
- process: func(ctx OpCtx, params ...Data) (Data, error) {
- ret := f.consts[id]
- return ret, nil
- },
- }
- }
- // Debug type
- func opFunc(f *Flow, id string) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "func",
- set: dumbSet,
- process: func(ctx OpCtx, params ...Data) (ret Data, err error) {
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("Panic: %v", r)
- f.hooks.error(id, err)
- }
- }()
- op, ok := f.getOp(id)
- if !ok {
- err = fmt.Errorf("invalid operation '%s'", id)
- f.hooks.error(id, err)
- return nil, err
- }
- /*if f.err != nil {
- return nil
- }*/
- op.Lock()
- defer op.Unlock()
- if ctx != nil {
- if v, ok := ctx.Load(id); ok { // Cache
- return v, nil
- }
- }
- // Check inputs
- fnval := reflect.ValueOf(op.executor)
- if fnval.Type().NumIn() != len(op.inputs) {
- err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
- f.hooks.error(id, err)
- return nil, err
- }
- /////////////////////////////
- // NEW PARALLEL PROCESSING
- ///////////
- f.hooks.wait(id)
- callParam := make([]reflect.Value, len(op.inputs))
- // Parallel processing if inputs
- wg := sync.WaitGroup{}
- wg.Add(len(op.inputs))
- for i, in := range op.inputs {
- go func(i int, in *operation) {
- defer wg.Done()
- fr, err := in.processWithCtx(ctx, params...)
- if err != nil {
- return
- }
- callParam[i] = reflect.ValueOf(fr)
- }(i, in)
- }
- wg.Wait()
- // Return type checking
- errMsg := ""
- for i, p := range callParam {
- // TypeChecking checking
- if !p.IsValid() {
- errMsg += fmt.Sprintf("Input %d invalid\n", i)
- } else if !p.Type().AssignableTo(fnval.Type().In(i)) {
- errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
- }
- }
- if len(errMsg) > 0 {
- err := errors.New(errMsg)
- f.hooks.error(id, err)
- return nil, err
- }
- f.hooks.start(id)
- fnret := fnval.Call(callParam)
- if len(fnret) > 1 && (fnret[1].Interface() != nil) {
- err, ok := fnret[1].Interface().(error)
- if !ok {
- err = errors.New("unknown error")
- }
- f.hooks.error(id, err)
- return nil, err
- }
- // THE RESULT
- ret = fnret[0].Interface()
- if ctx != nil {
- ctx.Store(id, ret)
- }
- f.hooks.finish(id, ret)
- return ret, nil
- },
- }
- }
- func opVar(f *Flow, id string) *operation {
- return &operation{
- flow: f,
- id: id,
- kind: "var",
- set: func(params ...Data) { f.data[id] = params[0] },
- process: func(ctx OpCtx, params ...Data) (Data, error) { return f.data[id], nil },
- }
- }
- func opNil(f *Flow) *operation {
- return &operation{
- flow: f,
- kind: "nil",
- process: func(ctx OpCtx, params ...Data) (Data, error) { return nil, errors.New("Nil operation") },
- }
- }
|