|
@@ -8,6 +8,7 @@ package flow
|
|
import (
|
|
import (
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "log"
|
|
"reflect"
|
|
"reflect"
|
|
"sync"
|
|
"sync"
|
|
)
|
|
)
|
|
@@ -70,9 +71,13 @@ func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// save makeExecutor with a bunch of type checks
|
|
|
|
+// we will create a less safer functions to get performance
|
|
func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
var err error
|
|
var err error
|
|
|
|
+
|
|
|
|
+ //panic recoverer, since nodes are not our functions
|
|
defer func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
if r := recover(); r != nil {
|
|
err = fmt.Errorf("%v", r)
|
|
err = fmt.Errorf("%v", r)
|
|
@@ -80,7 +85,6 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
}
|
|
}
|
|
}()
|
|
}()
|
|
|
|
|
|
- // Should not need this
|
|
|
|
op := f.GetOp(id)
|
|
op := f.GetOp(id)
|
|
if op == nil {
|
|
if op == nil {
|
|
err = fmt.Errorf("invalid operation '%s'", id)
|
|
err = fmt.Errorf("invalid operation '%s'", id)
|
|
@@ -89,80 +93,32 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
}
|
|
}
|
|
op.Lock()
|
|
op.Lock()
|
|
defer op.Unlock()
|
|
defer op.Unlock()
|
|
|
|
+
|
|
|
|
+ // Load from cache if any
|
|
if ctx != nil {
|
|
if ctx != nil {
|
|
- if v, ok := ctx.Load(id); ok { // Cache
|
|
|
|
|
|
+ if v, ok := ctx.Load(id); ok {
|
|
return v, nil
|
|
return v, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // Check inputs
|
|
|
|
- fnval := reflect.ValueOf(fn)
|
|
|
|
- if fnval.Type().NumIn() != len(op.inputs) {
|
|
|
|
- err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
|
|
|
|
- f.hooks.error(id, err)
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- /////////////////////////////
|
|
|
|
- // NEW PARALLEL PROCESSING
|
|
|
|
- ///////////
|
|
|
|
- //f.hooks.wait(id)
|
|
|
|
|
|
+ // Wait for inputs
|
|
f.hooks.wait(id)
|
|
f.hooks.wait(id)
|
|
|
|
|
|
- callParam := make([]reflect.Value, len(op.inputs))
|
|
|
|
- callErrors := ""
|
|
|
|
- paramMutex := sync.Mutex{}
|
|
|
|
- // Parallel processing if inputs
|
|
|
|
- wg := sync.WaitGroup{}
|
|
|
|
- wg.Add(len(op.inputs))
|
|
|
|
- for i, in := range op.inputs {
|
|
|
|
- go func(i int, in *operation) {
|
|
|
|
- defer wg.Done()
|
|
|
|
- // Sub function with type checking
|
|
|
|
- //Check the ctx
|
|
|
|
- fr, err := in.executor(ctx, params...)
|
|
|
|
- if err != nil {
|
|
|
|
- paramMutex.Lock()
|
|
|
|
- callErrors += err.Error() + "\n"
|
|
|
|
- paramMutex.Unlock()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if fr == nil {
|
|
|
|
- callParam[i] = reflect.Zero(fnval.Type().In(i))
|
|
|
|
- }
|
|
|
|
- res := reflect.ValueOf(fr)
|
|
|
|
- if !res.IsValid() {
|
|
|
|
- paramMutex.Lock()
|
|
|
|
- callErrors += fmt.Sprintf("Input %d invalid\n", i)
|
|
|
|
- paramMutex.Unlock()
|
|
|
|
- return
|
|
|
|
- } else if !res.Type().ConvertibleTo(fnval.Type().In(i)) {
|
|
|
|
- if fnval.Type().In(i).Kind() == reflect.String {
|
|
|
|
- callParam[i] = reflect.ValueOf(fmt.Sprint(res.Interface()))
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- paramMutex.Lock()
|
|
|
|
- callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), fnval.Type().In(i))
|
|
|
|
- paramMutex.Unlock()
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // can convert too?
|
|
|
|
-
|
|
|
|
- // CheckError and safelly append
|
|
|
|
- callParam[i] = res.Convert(fnval.Type().In(i))
|
|
|
|
- }(i, in)
|
|
|
|
- }
|
|
|
|
- wg.Wait()
|
|
|
|
- // Return type error checking
|
|
|
|
- if len(callErrors) > 0 {
|
|
|
|
- err := errors.New(callErrors)
|
|
|
|
|
|
+ fnval := reflect.ValueOf(fn)
|
|
|
|
+ callParam, err := f.processInputs(ctx, op, fnval, params...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("ERR:", err)
|
|
f.hooks.error(id, err)
|
|
f.hooks.error(id, err)
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
// The actual operation process
|
|
// The actual operation process
|
|
- f.hooks.start(id)
|
|
|
|
|
|
|
|
|
|
+ // Func returned starting the process
|
|
|
|
+ f.hooks.start(id)
|
|
|
|
+ // if entry is special we pass the Flow?
|
|
fnret := fnval.Call(callParam)
|
|
fnret := fnval.Call(callParam)
|
|
|
|
+ // Output erroring
|
|
if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
|
|
if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
|
|
err, ok := fnret[1].Interface().(error)
|
|
err, ok := fnret[1].Interface().(error)
|
|
if !ok {
|
|
if !ok {
|
|
@@ -174,6 +130,8 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
|
|
|
|
// THE RESULT
|
|
// THE RESULT
|
|
ret := fnret[0].Interface()
|
|
ret := fnret[0].Interface()
|
|
|
|
+
|
|
|
|
+ // Store in the cache
|
|
if ctx != nil {
|
|
if ctx != nil {
|
|
ctx.Store(id, ret)
|
|
ctx.Store(id, ret)
|
|
}
|
|
}
|
|
@@ -182,27 +140,112 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+/////////////////////////////
|
|
|
|
+// NEW PARALLEL PROCESSING
|
|
|
|
+///////////
|
|
|
|
+func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
|
|
|
|
+ // Flow injector
|
|
|
|
+ nInputs := fnval.Type().NumIn()
|
|
|
|
+
|
|
|
|
+ // Total inputs
|
|
|
|
+ OcallParam := make([]reflect.Value, nInputs)
|
|
|
|
+ callParam := OcallParam
|
|
|
|
+ offs := 0
|
|
|
|
+
|
|
|
|
+ // Inject flow if the first param is of type Flow
|
|
|
|
+ if nInputs > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
|
|
|
|
+ OcallParam[0] = reflect.ValueOf(f)
|
|
|
|
+ offs = 1
|
|
|
|
+ nInputs--
|
|
|
|
+ //shift one to process inputs
|
|
|
|
+ callParam = OcallParam[1:]
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if nInputs != len(op.inputs) {
|
|
|
|
+ return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
|
|
|
|
+ } //Wait
|
|
|
|
+
|
|
|
|
+ callErrors := ""
|
|
|
|
+ paramMutex := sync.Mutex{}
|
|
|
|
+ // Parallel processing if inputs
|
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
|
+ wg.Add(len(op.inputs))
|
|
|
|
+ for i, in := range op.inputs {
|
|
|
|
+ go func(i int, in *operation) {
|
|
|
|
+ defer wg.Done()
|
|
|
|
+ inTyp := fnval.Type().In(i + offs)
|
|
|
|
+ /////////////////
|
|
|
|
+ // Executor
|
|
|
|
+ fr, err := in.executor(ctx, params...)
|
|
|
|
+
|
|
|
|
+ paramMutex.Lock()
|
|
|
|
+ defer paramMutex.Unlock()
|
|
|
|
+ if err != nil {
|
|
|
|
+ callErrors += err.Error() + "\n"
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ if fr == nil {
|
|
|
|
+ callParam[i] = reflect.ValueOf(reflect.Zero(inTyp).Interface())
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ res := reflect.ValueOf(fr)
|
|
|
|
+ var cres reflect.Value
|
|
|
|
+
|
|
|
|
+ // Conversion effort
|
|
|
|
+ switch {
|
|
|
|
+ case !res.IsValid():
|
|
|
|
+ callErrors += fmt.Sprintf("Input %d invalid\n", i)
|
|
|
|
+ return
|
|
|
|
+ case !res.Type().ConvertibleTo(inTyp):
|
|
|
|
+ if inTyp.Kind() != reflect.String {
|
|
|
|
+ callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ cres = reflect.ValueOf(fmt.Sprint(res.Interface()))
|
|
|
|
+ default:
|
|
|
|
+ cres = res.Convert(inTyp)
|
|
|
|
+ }
|
|
|
|
+ // CheckError and safelly append
|
|
|
|
+ callParam[i] = cres
|
|
|
|
+ }(i, in)
|
|
|
|
+ }
|
|
|
|
+ wg.Wait()
|
|
|
|
+ if callErrors != "" {
|
|
|
|
+ return nil, errors.New(callErrors)
|
|
|
|
+ }
|
|
|
|
+ /*offs := 0
|
|
|
|
+ if fnval.Type().NumIn() > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
|
|
|
|
+ nInputs--
|
|
|
|
+ offs = 1
|
|
|
|
+ }*/
|
|
|
|
+
|
|
|
|
+ return OcallParam, nil
|
|
|
|
+}
|
|
|
|
+
|
|
// DefVar define var operation with optional initial
|
|
// DefVar define var operation with optional initial
|
|
func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
|
|
func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
|
|
// Unique
|
|
// Unique
|
|
- if _, ok := f.data[name]; !ok {
|
|
|
|
|
|
+ if _, ok := f.Data[name]; !ok {
|
|
var v interface{}
|
|
var v interface{}
|
|
if len(initial) > 0 {
|
|
if len(initial) > 0 {
|
|
v = initial[0]
|
|
v = initial[0]
|
|
}
|
|
}
|
|
- f.data[name] = v
|
|
|
|
|
|
+ f.Data[name] = v
|
|
}
|
|
}
|
|
- setter := func(v Data) { f.data[name] = v }
|
|
|
|
|
|
+ setter := func(v Data) { f.Data[name] = v }
|
|
|
|
|
|
opEntry := &operation{
|
|
opEntry := &operation{
|
|
- Mutex: sync.Mutex{},
|
|
|
|
- id: id,
|
|
|
|
- flow: f,
|
|
|
|
- name: fmt.Sprintf("(var)<%s>", name),
|
|
|
|
- kind: "var",
|
|
|
|
- inputs: nil,
|
|
|
|
- setter: setter,
|
|
|
|
- executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.data[name], nil }),
|
|
|
|
|
|
+ Mutex: sync.Mutex{},
|
|
|
|
+ id: id,
|
|
|
|
+ flow: f,
|
|
|
|
+ name: fmt.Sprintf("(var)<%s>", name),
|
|
|
|
+ kind: "var",
|
|
|
|
+ inputs: nil,
|
|
|
|
+ setter: setter,
|
|
|
|
+ executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
|
|
|
|
+ // if f.data == nil we set from the init operation
|
|
|
|
+ return f.Data[name], nil
|
|
|
|
+ }),
|
|
}
|
|
}
|
|
f.operations.Store(id, opEntry)
|
|
f.operations.Store(id, opEntry)
|
|
return opEntry
|
|
return opEntry
|