|
@@ -8,8 +8,9 @@ package flow
|
|
import (
|
|
import (
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "path"
|
|
"reflect"
|
|
"reflect"
|
|
- "runtime/debug"
|
|
|
|
|
|
+ "runtime"
|
|
"sync"
|
|
"sync"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -17,100 +18,72 @@ type executorFunc func(*Session, ...Data) (Data, error)
|
|
|
|
|
|
// Operation interface
|
|
// Operation interface
|
|
type Operation interface { // Id perhaps?
|
|
type Operation interface { // Id perhaps?
|
|
- //ID() string
|
|
|
|
- //Set(input Data) // Special var method
|
|
|
|
Process(params ...Data) (Data, error)
|
|
Process(params ...Data) (Data, error)
|
|
}
|
|
}
|
|
|
|
|
|
type operation struct {
|
|
type operation struct {
|
|
sync.Mutex
|
|
sync.Mutex
|
|
- flow *Flow
|
|
|
|
- name string
|
|
|
|
- kind string
|
|
|
|
-
|
|
|
|
- // Could be a simple ID for operation, but it will depend on flow
|
|
|
|
- inputs []*operation // still figuring, might be Operation
|
|
|
|
-
|
|
|
|
- fn interface{} // the registry func
|
|
|
|
|
|
+ flow *Flow
|
|
|
|
+ name string
|
|
|
|
+ kind string
|
|
|
|
+ inputs []*operation // still figuring, might be Operation
|
|
executor executorFunc // the executor?
|
|
executor executorFunc // the executor?
|
|
- //processor func(OpCtx, params ...Data) (Data, error)
|
|
|
|
|
|
+
|
|
|
|
+ // Debug information for each operation
|
|
|
|
+ file string
|
|
|
|
+ line int
|
|
}
|
|
}
|
|
|
|
|
|
// NewOperation creates an operation
|
|
// NewOperation creates an operation
|
|
func (f *Flow) newOperation(kind string, inputs []*operation) *operation {
|
|
func (f *Flow) newOperation(kind string, inputs []*operation) *operation {
|
|
|
|
+ _, file, line, _ := runtime.Caller(2) // outside of operation.go?
|
|
return &operation{
|
|
return &operation{
|
|
Mutex: sync.Mutex{},
|
|
Mutex: sync.Mutex{},
|
|
flow: f,
|
|
flow: f,
|
|
kind: kind,
|
|
kind: kind,
|
|
inputs: inputs,
|
|
inputs: inputs,
|
|
|
|
+
|
|
|
|
+ file: file,
|
|
|
|
+ line: line,
|
|
//name: fmt.Sprintf("(var)<%s>", name),
|
|
//name: fmt.Sprintf("(var)<%s>", name),
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
|
|
+func (o *operation) String() string {
|
|
|
|
+ _, file := path.Split(o.file)
|
|
|
|
+ return fmt.Sprintf("[%s:%d]:{%s,%s}", file, o.line, o.kind, o.name)
|
|
|
|
+}
|
|
|
|
|
|
-// Process params are the global inputs
|
|
|
|
|
|
+// Process the operation with a new session
|
|
|
|
+// ginputs are the global inputs
|
|
func (o *operation) Process(ginputs ...Data) (Data, error) {
|
|
func (o *operation) Process(ginputs ...Data) (Data, error) {
|
|
- // Create CTX
|
|
|
|
s := o.flow.NewSession()
|
|
s := o.flow.NewSession()
|
|
- return s.Run(o, ginputs...)
|
|
|
|
|
|
+ return s.run(o, ginputs...)
|
|
}
|
|
}
|
|
|
|
|
|
-// make Executor for func
|
|
|
|
-// safe run a func
|
|
|
|
-func (f *Flow) asTrigger(op *operation) executorFunc {
|
|
|
|
- return func(sess *Session, params ...Data) (Data, error) {
|
|
|
|
- f.hooks.start(op)
|
|
|
|
-
|
|
|
|
- //panic recoverer, since nodes are not our functions
|
|
|
|
- var err error
|
|
|
|
- var res Data
|
|
|
|
- // Safe thing
|
|
|
|
- func() {
|
|
|
|
- defer func() {
|
|
|
|
- if r := recover(); r != nil {
|
|
|
|
- debug.PrintStack()
|
|
|
|
- err = fmt.Errorf("%v", r)
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
- res, err = op.executor(sess, params...)
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- if err != nil {
|
|
|
|
- f.hooks.error(op, err)
|
|
|
|
- } else {
|
|
|
|
- f.hooks.finish(op, res)
|
|
|
|
- }
|
|
|
|
- return res, err
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// processInputs will run a list of operations and return reflect values
|
|
|
|
-// to be processed next
|
|
|
|
-// NEW PARALLEL PROCESSING
|
|
|
|
-
|
|
|
|
// Var create a operation
|
|
// Var create a operation
|
|
func (f *Flow) Var(name string, initial Data) Operation {
|
|
func (f *Flow) Var(name string, initial Data) Operation {
|
|
inputs := f.makeInputs(initial)
|
|
inputs := f.makeInputs(initial)
|
|
|
|
|
|
op := f.newOperation("var", inputs)
|
|
op := f.newOperation("var", inputs)
|
|
- op.name = fmt.Sprintf("(var)<%s>", name)
|
|
|
|
-
|
|
|
|
op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
- val, ok := f.Data[name]
|
|
|
|
|
|
+ if name == "" {
|
|
|
|
+ return nil, errors.New("Invalid name")
|
|
|
|
+ }
|
|
|
|
+ val, ok := f.Data.Load(name)
|
|
if !ok {
|
|
if !ok {
|
|
var initial Data
|
|
var initial Data
|
|
- f.hooks.wait(op)
|
|
|
|
- res, err := sess.RunList(op.inputs, ginputs...)
|
|
|
|
|
|
+ res, err := sess.processInputs(op, ginputs...)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
+
|
|
if len(res) > 0 {
|
|
if len(res) > 0 {
|
|
initial = res[0]
|
|
initial = res[0]
|
|
}
|
|
}
|
|
|
|
|
|
val = initial
|
|
val = initial
|
|
- f.Data[name] = val
|
|
|
|
|
|
+ f.Data.Store(name, val)
|
|
}
|
|
}
|
|
return val, nil
|
|
return val, nil
|
|
}
|
|
}
|
|
@@ -121,14 +94,15 @@ func (f *Flow) Var(name string, initial Data) Operation {
|
|
func (f *Flow) SetVar(name string, data Data) Operation {
|
|
func (f *Flow) SetVar(name string, data Data) Operation {
|
|
inputs := f.makeInputs(data)
|
|
inputs := f.makeInputs(data)
|
|
op := f.newOperation("setvar", inputs)
|
|
op := f.newOperation("setvar", inputs)
|
|
- op.name = fmt.Sprintf("(setvar)<%s>", name)
|
|
|
|
op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
- f.hooks.wait(op)
|
|
|
|
- res, err := sess.RunList(op.inputs, ginputs...)
|
|
|
|
|
|
+ if name == "" {
|
|
|
|
+ return nil, errors.New("Invalid name")
|
|
|
|
+ }
|
|
|
|
+ res, err := sess.processInputs(op, ginputs...)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
- f.Data[name] = res[0]
|
|
|
|
|
|
+ f.Data.Store(name, res[0])
|
|
return res[0], nil
|
|
return res[0], nil
|
|
}
|
|
}
|
|
|
|
|
|
@@ -176,25 +150,19 @@ func (f *Flow) Const(value Data) Operation {
|
|
}
|
|
}
|
|
|
|
|
|
op := f.newOperation("const", nil)
|
|
op := f.newOperation("const", nil)
|
|
- op.name = fmt.Sprintf("(const)<%v:%v>", constID, value)
|
|
|
|
op.executor = func(*Session, ...Data) (Data, error) { return f.consts[constID], nil }
|
|
op.executor = func(*Session, ...Data) (Data, error) { return f.consts[constID], nil }
|
|
- //f.operations = append(f.operations, op)
|
|
|
|
-
|
|
|
|
return op
|
|
return op
|
|
}
|
|
}
|
|
|
|
|
|
// In define input operation
|
|
// In define input operation
|
|
func (f *Flow) In(paramID int) Operation {
|
|
func (f *Flow) In(paramID int) Operation {
|
|
-
|
|
|
|
op := f.newOperation("in", nil)
|
|
op := f.newOperation("in", nil)
|
|
- op.name = fmt.Sprintf("(in)<%d>", paramID)
|
|
|
|
op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
op.executor = func(sess *Session, ginputs ...Data) (Data, error) {
|
|
if paramID < 0 || paramID >= len(ginputs) {
|
|
if paramID < 0 || paramID >= len(ginputs) {
|
|
return nil, ErrInput
|
|
return nil, ErrInput
|
|
}
|
|
}
|
|
return ginputs[paramID], nil
|
|
return ginputs[paramID], nil
|
|
}
|
|
}
|
|
- //f.operations = append(f.operations, op)
|
|
|
|
return op
|
|
return op
|
|
}
|
|
}
|
|
|
|
|
|
@@ -213,20 +181,22 @@ func (f *Flow) makeInputs(params ...Data) []*operation {
|
|
}
|
|
}
|
|
|
|
|
|
// make any go func as an executor
|
|
// make any go func as an executor
|
|
-// Trigger
|
|
|
|
func makeExecutor(op *operation, fn interface{}) executorFunc {
|
|
func makeExecutor(op *operation, fn interface{}) executorFunc {
|
|
|
|
+ fnval := reflect.ValueOf(fn)
|
|
|
|
+ callParam := make([]reflect.Value, fnval.Type().NumIn())
|
|
|
|
+
|
|
// ExecutorFunc
|
|
// ExecutorFunc
|
|
- return func(sess *Session, ginput ...Data) (Data, error) {
|
|
|
|
|
|
+ return func(sess *Session, ginputs ...Data) (Data, error) {
|
|
// Change to wait to wait for the inputs
|
|
// Change to wait to wait for the inputs
|
|
-
|
|
|
|
- op.flow.hooks.wait(op)
|
|
|
|
- inRes, err := sess.RunList(op.inputs, ginput...)
|
|
|
|
|
|
+ inRes, err := sess.processInputs(op, ginputs...)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
+ // If the fn is a special we execute directly
|
|
|
|
+ if gFn, ok := fn.(func(...Data) (Data, error)); ok {
|
|
|
|
+ return gFn(inRes...)
|
|
|
|
+ }
|
|
|
|
|
|
- fnval := reflect.ValueOf(fn)
|
|
|
|
- callParam := make([]reflect.Value, len(inRes))
|
|
|
|
for i, r := range inRes {
|
|
for i, r := range inRes {
|
|
if r == nil {
|
|
if r == nil {
|
|
callParam[i] = reflect.Zero(fnval.Type().In(i))
|
|
callParam[i] = reflect.Zero(fnval.Type().In(i))
|
|
@@ -236,7 +206,6 @@ func makeExecutor(op *operation, fn interface{}) executorFunc {
|
|
}
|
|
}
|
|
|
|
|
|
// Start again and execute function
|
|
// Start again and execute function
|
|
- op.flow.hooks.start(op)
|
|
|
|
fnret := fnval.Call(callParam)
|
|
fnret := fnval.Call(callParam)
|
|
if len(fnret) == 0 {
|
|
if len(fnret) == 0 {
|
|
return nil, nil
|
|
return nil, nil
|