|
@@ -8,13 +8,12 @@ package flow
|
|
|
import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
- "log"
|
|
|
"reflect"
|
|
|
"runtime/debug"
|
|
|
"sync"
|
|
|
)
|
|
|
|
|
|
-type executorFunc func(OpCtx, ...Data) (Data, error)
|
|
|
+type executorFunc func(*Session, ...Data) (Data, error)
|
|
|
|
|
|
// Operation interface
|
|
|
type Operation interface { // Id perhaps?
|
|
@@ -32,44 +31,48 @@ type operation struct {
|
|
|
// Could be a simple ID for operation, but it will depend on flow
|
|
|
inputs []*operation // still figuring, might be Operation
|
|
|
|
|
|
- fn interface{} // Any func
|
|
|
- executor executorFunc
|
|
|
+ fn interface{} // the registry func
|
|
|
+ executor executorFunc // the executor?
|
|
|
+ //processor func(OpCtx, params ...Data) (Data, error)
|
|
|
}
|
|
|
|
|
|
-// OpCtx operation Context
|
|
|
-type OpCtx = *sync.Map
|
|
|
+// NewOperation creates an operation
|
|
|
+func (f *Flow) newOperation(kind string, inputs []*operation) *operation {
|
|
|
+ return &operation{
|
|
|
+ Mutex: sync.Mutex{},
|
|
|
+ flow: f,
|
|
|
+ kind: kind,
|
|
|
+ inputs: inputs,
|
|
|
+ //name: fmt.Sprintf("(var)<%s>", name),
|
|
|
+ }
|
|
|
|
|
|
-// NewOpCtx creates a running context
|
|
|
-func newOpCtx() OpCtx {
|
|
|
- return &sync.Map{}
|
|
|
}
|
|
|
|
|
|
-//func (o *operation) ID() string { return o.id }
|
|
|
-
|
|
|
-func (o *operation) Process(params ...Data) (Data, error) {
|
|
|
+// Process params are the global inputs
|
|
|
+func (o *operation) Process(ginputs ...Data) (Data, error) {
|
|
|
// Create CTX
|
|
|
- ctx := newOpCtx()
|
|
|
- return o.executor(ctx, params...)
|
|
|
-
|
|
|
+ s := o.flow.NewSession()
|
|
|
+ return s.Run(o, ginputs...)
|
|
|
}
|
|
|
|
|
|
// make Executor for func
|
|
|
-func (f *Flow) asTrigger(op *operation, fn executorFunc) executorFunc {
|
|
|
- return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
|
+// safe run a func
|
|
|
+func (f *Flow) asTrigger(op *operation) executorFunc {
|
|
|
+ return func(sess *Session, params ...Data) (Data, error) {
|
|
|
f.hooks.start(op)
|
|
|
|
|
|
//panic recoverer, since nodes are not our functions
|
|
|
var err error
|
|
|
var res Data
|
|
|
+ // Safe thing
|
|
|
func() {
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
- log.Println("Panic:", r)
|
|
|
debug.PrintStack()
|
|
|
err = fmt.Errorf("%v", r)
|
|
|
}
|
|
|
}()
|
|
|
- res, err = fn(ctx, params...)
|
|
|
+ res, err = op.executor(sess, params...)
|
|
|
}()
|
|
|
|
|
|
if err != nil {
|
|
@@ -82,235 +85,62 @@ func (f *Flow) asTrigger(op *operation, fn executorFunc) executorFunc {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// safe makeExecutor with a bunch of type checks
|
|
|
-// we will create a less safer functions to get performance
|
|
|
-func (f *Flow) makeExecutor(op *operation, fn interface{}) executorFunc {
|
|
|
- // ExecutorFunc
|
|
|
- return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
|
-
|
|
|
- /*op := f.GetOp(id)
|
|
|
- if op == nil {
|
|
|
- return nil, fmt.Errorf("invalid operation '%s'", id)
|
|
|
- }*/
|
|
|
- op.Lock()
|
|
|
- defer op.Unlock()
|
|
|
-
|
|
|
- // Load from cache if any
|
|
|
- if ctx != nil {
|
|
|
- if v, ok := ctx.Load(op); ok {
|
|
|
- return v, nil
|
|
|
- }
|
|
|
- }
|
|
|
- // Change to wait to wait for the inputs
|
|
|
- f.hooks.wait(op)
|
|
|
-
|
|
|
- fnval := reflect.ValueOf(fn)
|
|
|
- callParam, err := f.processInputs(ctx, op, fnval, params...)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- // Start again and execute function
|
|
|
- f.hooks.start(op)
|
|
|
- fnret := fnval.Call(callParam)
|
|
|
- if len(fnret) == 0 {
|
|
|
- return nil, nil
|
|
|
- }
|
|
|
- // Output erroring
|
|
|
- if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
|
|
|
- err, ok := fnret[len(fnret)-1].Interface().(error)
|
|
|
- if !ok {
|
|
|
- err = errors.New("unknown error")
|
|
|
- }
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- // THE RESULT
|
|
|
- ret := fnret[0].Interface()
|
|
|
- // Store in the cache
|
|
|
- if ctx != nil {
|
|
|
- ctx.Store(op, ret)
|
|
|
- }
|
|
|
- return ret, nil
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
// processInputs will run a list of operations and return reflect values
|
|
|
// to be processed next
|
|
|
// NEW PARALLEL PROCESSING
|
|
|
-func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
|
|
|
- nInputs := fnval.Type().NumIn()
|
|
|
-
|
|
|
- // Total inputs
|
|
|
- callParam := make([]reflect.Value, nInputs)
|
|
|
-
|
|
|
- if nInputs != len(op.inputs) {
|
|
|
- return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
|
|
|
- } //Wait
|
|
|
-
|
|
|
- callErrors := ""
|
|
|
- paramMutex := sync.Mutex{}
|
|
|
- // Parallel processing if inputs
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- wg.Add(len(op.inputs))
|
|
|
- for i, in := range op.inputs {
|
|
|
- go func(i int, in *operation) {
|
|
|
- defer wg.Done()
|
|
|
- inTyp := fnval.Type().In(i)
|
|
|
- /////////////////
|
|
|
- // Executor
|
|
|
- fr, err := in.executor(ctx, params...)
|
|
|
- //log.Println("Executing:", in.id, in.name, fr)
|
|
|
-
|
|
|
- paramMutex.Lock()
|
|
|
- defer paramMutex.Unlock()
|
|
|
- if err != nil {
|
|
|
- callErrors += err.Error() + "\n"
|
|
|
- return
|
|
|
- }
|
|
|
- if fr == nil {
|
|
|
- callParam[i] = reflect.Zero(inTyp)
|
|
|
- return
|
|
|
- }
|
|
|
- res := reflect.ValueOf(fr)
|
|
|
- var cres reflect.Value
|
|
|
-
|
|
|
- // Conversion effort
|
|
|
- switch {
|
|
|
- case !res.IsValid():
|
|
|
- callErrors += fmt.Sprintf("Input %d invalid\n", i)
|
|
|
- return
|
|
|
- case !res.Type().ConvertibleTo(inTyp):
|
|
|
- if inTyp.Kind() != reflect.String {
|
|
|
- callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
|
|
|
- log.Println(f)
|
|
|
- return
|
|
|
- }
|
|
|
- cres = reflect.ValueOf(fmt.Sprint(res.Interface()))
|
|
|
- default:
|
|
|
- cres = res.Convert(inTyp)
|
|
|
- }
|
|
|
- // CheckError and safelly append
|
|
|
- callParam[i] = cres
|
|
|
- }(i, in)
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
-
|
|
|
- // Check for any error
|
|
|
- if callErrors != "" {
|
|
|
- log.Println("Call errors:", callErrors)
|
|
|
- return nil, errors.New(callErrors)
|
|
|
- }
|
|
|
-
|
|
|
- return callParam, nil
|
|
|
-}
|
|
|
|
|
|
// Var create a operation
|
|
|
func (f *Flow) Var(name string, initial Data) Operation {
|
|
|
- // Input from params
|
|
|
- var input *operation
|
|
|
- switch v := initial.(type) {
|
|
|
- case *operation:
|
|
|
- input = v
|
|
|
- default:
|
|
|
- c := f.Const(v)
|
|
|
- input = c.(*operation)
|
|
|
- }
|
|
|
- op := &operation{
|
|
|
- Mutex: sync.Mutex{},
|
|
|
- flow: f,
|
|
|
- name: fmt.Sprintf("(var)<%s>", name),
|
|
|
- kind: "var",
|
|
|
- inputs: []*operation{input},
|
|
|
- }
|
|
|
+ inputs := f.makeInputs(initial)
|
|
|
|
|
|
- op.executor = f.makeExecutor(op, func(initial Data) Data {
|
|
|
+ op := f.newOperation("var", inputs)
|
|
|
+ op.name = fmt.Sprintf("(var)<%s>", name)
|
|
|
+
|
|
|
+ op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
|
val, ok := f.Data[name]
|
|
|
if !ok {
|
|
|
+ var initial Data
|
|
|
+ f.hooks.wait(op)
|
|
|
+ res, err := sess.RunList(op.inputs, ginputs...)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if len(res) > 0 {
|
|
|
+ initial = res[0]
|
|
|
+ }
|
|
|
+
|
|
|
val = initial
|
|
|
f.Data[name] = val
|
|
|
}
|
|
|
- return val
|
|
|
- })
|
|
|
+ return val, nil
|
|
|
+ }
|
|
|
|
|
|
return op
|
|
|
|
|
|
}
|
|
|
|
|
|
-// Var define var operation with optional initial
|
|
|
-/*func (f *Flow) Var(name string, initial ...Data) Operation {
|
|
|
- // Unique
|
|
|
- if _, ok := f.Data[name]; !ok {
|
|
|
- var v interface{}
|
|
|
- if len(initial) > 0 {
|
|
|
- v = initial[0]
|
|
|
- }
|
|
|
- f.Data[name] = v
|
|
|
- }
|
|
|
-
|
|
|
- op := &operation{
|
|
|
- Mutex: sync.Mutex{},
|
|
|
- flow: f,
|
|
|
- name: fmt.Sprintf("(var)<%s>", name),
|
|
|
- kind: "var",
|
|
|
- inputs: nil,
|
|
|
- }
|
|
|
- op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) {
|
|
|
- // if f.data == nil we set from the init operation
|
|
|
- return f.Data[name], nil
|
|
|
- })
|
|
|
- f.operations = append(f.operations, op)
|
|
|
- //f.operations.Store(id, op)
|
|
|
- return op
|
|
|
-}*/
|
|
|
-
|
|
|
-// Op Manual tag an Operation
|
|
|
-// Define operation for ID
|
|
|
+// Op operation from registry
|
|
|
func (f *Flow) Op(name string, params ...interface{}) Operation {
|
|
|
- inputs := make([]*operation, len(params))
|
|
|
- for i, p := range params {
|
|
|
- switch v := p.(type) {
|
|
|
- case *operation:
|
|
|
- inputs[i] = v
|
|
|
- default:
|
|
|
- c := f.Const(v)
|
|
|
- inputs[i], _ = c.(*operation)
|
|
|
- }
|
|
|
- }
|
|
|
- // If special executor we attach our func
|
|
|
+ inputs := f.makeInputs(params...)
|
|
|
|
|
|
// Grab executor here
|
|
|
registryFn, err := f.registry.Get(name)
|
|
|
if err != nil {
|
|
|
return f.ErrOp(err)
|
|
|
}
|
|
|
- op := &operation{
|
|
|
- Mutex: sync.Mutex{},
|
|
|
- flow: f,
|
|
|
- name: name,
|
|
|
- kind: "func",
|
|
|
- inputs: inputs,
|
|
|
- }
|
|
|
- executor := f.makeExecutor(op, registryFn)
|
|
|
- op.executor = f.asTrigger(op, executor)
|
|
|
+ op := f.newOperation("func", inputs)
|
|
|
+ op.name = name
|
|
|
+ // make executor from registry func
|
|
|
+ op.executor = makeExecutor(op, registryFn)
|
|
|
f.operations = append(f.operations, op)
|
|
|
-
|
|
|
return op
|
|
|
}
|
|
|
|
|
|
// ErrOp define a nil operation that will return error
|
|
|
// Usefull for builders
|
|
|
func (f *Flow) ErrOp(err error) Operation {
|
|
|
- op := &operation{
|
|
|
- Mutex: sync.Mutex{},
|
|
|
- //id: id,
|
|
|
- flow: f,
|
|
|
- name: fmt.Sprintf("(error)<%v>", err),
|
|
|
- kind: "error",
|
|
|
- inputs: nil,
|
|
|
- }
|
|
|
- op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return nil, err })
|
|
|
- //f.operations = append(f.operations, op)
|
|
|
+ op := f.newOperation("error", nil)
|
|
|
+ op.executor = func(*Session, ...Data) (Data, error) { return nil, err }
|
|
|
return op
|
|
|
}
|
|
|
|
|
@@ -319,7 +149,7 @@ func (f *Flow) Const(value Data) Operation {
|
|
|
// Optimize this definition
|
|
|
constID := -1
|
|
|
for k, v := range f.consts {
|
|
|
- if v == value {
|
|
|
+ if reflect.DeepEqual(v, value) {
|
|
|
constID = k
|
|
|
break
|
|
|
}
|
|
@@ -329,14 +159,9 @@ func (f *Flow) Const(value Data) Operation {
|
|
|
f.consts = append(f.consts, value)
|
|
|
}
|
|
|
|
|
|
- op := &operation{
|
|
|
- Mutex: sync.Mutex{},
|
|
|
- flow: f,
|
|
|
- name: fmt.Sprintf("(const)<%v:%v>", constID, value),
|
|
|
- kind: "const",
|
|
|
- inputs: nil,
|
|
|
- }
|
|
|
- op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return f.consts[constID], nil })
|
|
|
+ op := f.newOperation("const", nil)
|
|
|
+ op.name = fmt.Sprintf("(const)<%v:%v>", constID, value)
|
|
|
+ op.executor = func(*Session, ...Data) (Data, error) { return f.consts[constID], nil }
|
|
|
//f.operations = append(f.operations, op)
|
|
|
|
|
|
return op
|
|
@@ -344,19 +169,73 @@ func (f *Flow) Const(value Data) Operation {
|
|
|
|
|
|
// In define input operation
|
|
|
func (f *Flow) In(paramID int) Operation {
|
|
|
- op := &operation{
|
|
|
- Mutex: sync.Mutex{},
|
|
|
- flow: f,
|
|
|
- name: fmt.Sprintf("(in)<%d>", paramID),
|
|
|
- kind: "in",
|
|
|
- inputs: nil,
|
|
|
- }
|
|
|
- op.executor = f.asTrigger(op, func(ctx OpCtx, params ...Data) (Data, error) {
|
|
|
- if paramID < 0 || paramID >= len(params) {
|
|
|
+
|
|
|
+ op := f.newOperation("in", nil)
|
|
|
+ op.name = fmt.Sprintf("(in)<%d>", paramID)
|
|
|
+ op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
|
+ if paramID < 0 || paramID >= len(ginputs) {
|
|
|
return nil, ErrInput
|
|
|
}
|
|
|
- return params[paramID], nil
|
|
|
- })
|
|
|
- f.operations = append(f.operations, op)
|
|
|
+ return ginputs[paramID], nil
|
|
|
+ }
|
|
|
+ //f.operations = append(f.operations, op)
|
|
|
return op
|
|
|
}
|
|
|
+
|
|
|
+func (f *Flow) makeInputs(params ...Data) []*operation {
|
|
|
+ inputs := make([]*operation, len(params))
|
|
|
+ for i, p := range params {
|
|
|
+ switch v := p.(type) {
|
|
|
+ case *operation:
|
|
|
+ inputs[i] = v
|
|
|
+ default:
|
|
|
+ c := f.Const(v)
|
|
|
+ inputs[i], _ = c.(*operation)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return inputs
|
|
|
+}
|
|
|
+
|
|
|
+// make any go func as an executor
|
|
|
+// Trigger
|
|
|
+func makeExecutor(op *operation, fn interface{}) executorFunc {
|
|
|
+ // ExecutorFunc
|
|
|
+ return func(sess *Session, ginput ...Data) (Data, error) {
|
|
|
+ // Change to wait to wait for the inputs
|
|
|
+
|
|
|
+ op.flow.hooks.wait(op)
|
|
|
+ inRes, err := sess.RunList(op.inputs, ginput...)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ fnval := reflect.ValueOf(fn)
|
|
|
+ callParam := make([]reflect.Value, len(inRes))
|
|
|
+ for i, r := range inRes {
|
|
|
+ if r == nil {
|
|
|
+ callParam[i] = reflect.Zero(fnval.Type().In(i))
|
|
|
+ } else {
|
|
|
+ callParam[i] = reflect.ValueOf(r)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Start again and execute function
|
|
|
+ op.flow.hooks.start(op)
|
|
|
+ fnret := fnval.Call(callParam)
|
|
|
+ if len(fnret) == 0 {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ // Output erroring
|
|
|
+ if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
|
|
|
+ err, ok := fnret[len(fnret)-1].Interface().(error)
|
|
|
+ if !ok {
|
|
|
+ err = errors.New("unknown error")
|
|
|
+ }
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // THE RESULT
|
|
|
+ ret := fnret[0].Interface()
|
|
|
+ return ret, nil
|
|
|
+ }
|
|
|
+}
|