|
@@ -18,20 +18,21 @@ type executorFunc func(OpCtx, ...Data) (Data, error)
|
|
|
|
|
|
// Operation interface
|
|
// Operation interface
|
|
type Operation interface { // Id perhaps?
|
|
type Operation interface { // Id perhaps?
|
|
- ID() string
|
|
|
|
- Set(input Data) // Special var method
|
|
|
|
|
|
+ //ID() string
|
|
|
|
+ //Set(input Data) // Special var method
|
|
Process(params ...Data) (Data, error)
|
|
Process(params ...Data) (Data, error)
|
|
}
|
|
}
|
|
|
|
|
|
-// Will be named operation
|
|
|
|
type operation struct {
|
|
type operation struct {
|
|
sync.Mutex
|
|
sync.Mutex
|
|
- flow *Flow
|
|
|
|
- id string
|
|
|
|
- name string
|
|
|
|
- kind string
|
|
|
|
- inputs []*operation // still figuring, might be Operation
|
|
|
|
- setter func(Data)
|
|
|
|
|
|
+ flow *Flow
|
|
|
|
+ name string
|
|
|
|
+ kind string
|
|
|
|
+
|
|
|
|
+ // Could be a simple ID for operation, but it will depend on flow
|
|
|
|
+ inputs []*operation // still figuring, might be Operation
|
|
|
|
+
|
|
|
|
+ fn interface{} // Any func
|
|
executor executorFunc
|
|
executor executorFunc
|
|
}
|
|
}
|
|
|
|
|
|
@@ -43,25 +44,19 @@ func newOpCtx() OpCtx {
|
|
return &sync.Map{}
|
|
return &sync.Map{}
|
|
}
|
|
}
|
|
|
|
|
|
-func (o *operation) ID() string { return o.id }
|
|
|
|
|
|
+//func (o *operation) ID() string { return o.id }
|
|
|
|
|
|
func (o *operation) Process(params ...Data) (Data, error) {
|
|
func (o *operation) Process(params ...Data) (Data, error) {
|
|
// Create CTX
|
|
// Create CTX
|
|
ctx := newOpCtx()
|
|
ctx := newOpCtx()
|
|
return o.executor(ctx, params...)
|
|
return o.executor(ctx, params...)
|
|
|
|
|
|
- //return executeOP(o.flow, o.id, ctx, params...)
|
|
|
|
-}
|
|
|
|
-func (o *operation) Set(data Data) {
|
|
|
|
- if o.setter != nil {
|
|
|
|
- o.setter(data)
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
// make Executor for func
|
|
// make Executor for func
|
|
-func (f *Flow) asTrigger(id string, fn executorFunc) executorFunc {
|
|
|
|
|
|
+func (f *Flow) asTrigger(op *operation, fn executorFunc) executorFunc {
|
|
return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
- f.hooks.start(id)
|
|
|
|
|
|
+ f.hooks.start(op)
|
|
|
|
|
|
//panic recoverer, since nodes are not our functions
|
|
//panic recoverer, since nodes are not our functions
|
|
var err error
|
|
var err error
|
|
@@ -78,45 +73,45 @@ func (f *Flow) asTrigger(id string, fn executorFunc) executorFunc {
|
|
}()
|
|
}()
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
- f.hooks.error(id, err)
|
|
|
|
|
|
+ f.hooks.error(op, err)
|
|
} else {
|
|
} else {
|
|
- f.hooks.finish(id, res)
|
|
|
|
|
|
+ f.hooks.finish(op, res)
|
|
}
|
|
}
|
|
return res, err
|
|
return res, err
|
|
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// save makeExecutor with a bunch of type checks
|
|
|
|
|
|
+// safe makeExecutor with a bunch of type checks
|
|
// we will create a less safer functions to get performance
|
|
// we will create a less safer functions to get performance
|
|
-func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
|
|
|
|
+func (f *Flow) makeExecutor(op *operation, fn interface{}) executorFunc {
|
|
|
|
+ // ExecutorFunc
|
|
return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
return func(ctx OpCtx, params ...Data) (Data, error) {
|
|
|
|
|
|
- op := f.GetOp(id)
|
|
|
|
|
|
+ /*op := f.GetOp(id)
|
|
if op == nil {
|
|
if op == nil {
|
|
return nil, fmt.Errorf("invalid operation '%s'", id)
|
|
return nil, fmt.Errorf("invalid operation '%s'", id)
|
|
- }
|
|
|
|
|
|
+ }*/
|
|
op.Lock()
|
|
op.Lock()
|
|
defer op.Unlock()
|
|
defer op.Unlock()
|
|
|
|
|
|
// Load from cache if any
|
|
// Load from cache if any
|
|
if ctx != nil {
|
|
if ctx != nil {
|
|
- if v, ok := ctx.Load(id); ok {
|
|
|
|
|
|
+ if v, ok := ctx.Load(op); ok {
|
|
return v, nil
|
|
return v, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // Change to wait
|
|
|
|
- f.hooks.wait(id)
|
|
|
|
|
|
+ // Change to wait to wait for the inputs
|
|
|
|
+ f.hooks.wait(op)
|
|
|
|
|
|
fnval := reflect.ValueOf(fn)
|
|
fnval := reflect.ValueOf(fn)
|
|
callParam, err := f.processInputs(ctx, op, fnval, params...)
|
|
callParam, err := f.processInputs(ctx, op, fnval, params...)
|
|
if err != nil {
|
|
if err != nil {
|
|
- //log.Println("ERR", err)
|
|
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
- // Start again
|
|
|
|
- f.hooks.start(id)
|
|
|
|
|
|
+ // Start again and execute function
|
|
|
|
+ f.hooks.start(op)
|
|
fnret := fnval.Call(callParam)
|
|
fnret := fnval.Call(callParam)
|
|
if len(fnret) == 0 {
|
|
if len(fnret) == 0 {
|
|
return nil, nil
|
|
return nil, nil
|
|
@@ -134,15 +129,15 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
|
|
ret := fnret[0].Interface()
|
|
ret := fnret[0].Interface()
|
|
// Store in the cache
|
|
// Store in the cache
|
|
if ctx != nil {
|
|
if ctx != nil {
|
|
- ctx.Store(id, ret)
|
|
|
|
|
|
+ ctx.Store(op, ret)
|
|
}
|
|
}
|
|
return ret, nil
|
|
return ret, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-/////////////////////////////
|
|
|
|
|
|
+// processInputs will run a list of operations and return reflect values
|
|
|
|
+// to be processed next
|
|
// NEW PARALLEL PROCESSING
|
|
// NEW PARALLEL PROCESSING
|
|
-///////////
|
|
|
|
func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
|
|
func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
|
|
nInputs := fnval.Type().NumIn()
|
|
nInputs := fnval.Type().NumIn()
|
|
|
|
|
|
@@ -200,6 +195,8 @@ func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, para
|
|
}(i, in)
|
|
}(i, in)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
wg.Wait()
|
|
|
|
+
|
|
|
|
+ // Check for any error
|
|
if callErrors != "" {
|
|
if callErrors != "" {
|
|
log.Println("Call errors:", callErrors)
|
|
log.Println("Call errors:", callErrors)
|
|
return nil, errors.New(callErrors)
|
|
return nil, errors.New(callErrors)
|
|
@@ -208,8 +205,40 @@ func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, para
|
|
return callParam, nil
|
|
return callParam, nil
|
|
}
|
|
}
|
|
|
|
|
|
-// DefVar define var operation with optional initial
|
|
|
|
-func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
|
|
|
|
|
|
+// Var create a operation
|
|
|
|
+func (f *Flow) Var(name string, initial Data) Operation {
|
|
|
|
+ // Input from params
|
|
|
|
+ var input *operation
|
|
|
|
+ switch v := initial.(type) {
|
|
|
|
+ case *operation:
|
|
|
|
+ input = v
|
|
|
|
+ default:
|
|
|
|
+ c := f.Const(v)
|
|
|
|
+ input = c.(*operation)
|
|
|
|
+ }
|
|
|
|
+ op := &operation{
|
|
|
|
+ Mutex: sync.Mutex{},
|
|
|
|
+ flow: f,
|
|
|
|
+ name: fmt.Sprintf("(var)<%s>", name),
|
|
|
|
+ kind: "var",
|
|
|
|
+ inputs: []*operation{input},
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ op.executor = f.makeExecutor(op, func(initial Data) Data {
|
|
|
|
+ val, ok := f.Data[name]
|
|
|
|
+ if !ok {
|
|
|
|
+ val = initial
|
|
|
|
+ f.Data[name] = val
|
|
|
|
+ }
|
|
|
|
+ return val
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ return op
|
|
|
|
+
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Var define var operation with optional initial
|
|
|
|
+/*func (f *Flow) Var(name string, initial ...Data) Operation {
|
|
// Unique
|
|
// Unique
|
|
if _, ok := f.Data[name]; !ok {
|
|
if _, ok := f.Data[name]; !ok {
|
|
var v interface{}
|
|
var v interface{}
|
|
@@ -218,27 +247,26 @@ func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
|
|
}
|
|
}
|
|
f.Data[name] = v
|
|
f.Data[name] = v
|
|
}
|
|
}
|
|
- setter := func(v Data) { f.Data[name] = v }
|
|
|
|
|
|
|
|
- opEntry := &operation{
|
|
|
|
|
|
+ op := &operation{
|
|
Mutex: sync.Mutex{},
|
|
Mutex: sync.Mutex{},
|
|
- id: id,
|
|
|
|
flow: f,
|
|
flow: f,
|
|
name: fmt.Sprintf("(var)<%s>", name),
|
|
name: fmt.Sprintf("(var)<%s>", name),
|
|
kind: "var",
|
|
kind: "var",
|
|
inputs: nil,
|
|
inputs: nil,
|
|
- setter: setter,
|
|
|
|
- executor: f.asTrigger(id, func(OpCtx, ...Data) (Data, error) {
|
|
|
|
- // if f.data == nil we set from the init operation
|
|
|
|
- return f.Data[name], nil
|
|
|
|
- }),
|
|
|
|
}
|
|
}
|
|
- f.operations.Store(id, opEntry)
|
|
|
|
- return opEntry
|
|
|
|
-}
|
|
|
|
|
|
+ op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) {
|
|
|
|
+ // if f.data == nil we set from the init operation
|
|
|
|
+ return f.Data[name], nil
|
|
|
|
+ })
|
|
|
|
+ f.operations = append(f.operations, op)
|
|
|
|
+ //f.operations.Store(id, op)
|
|
|
|
+ return op
|
|
|
|
+}*/
|
|
|
|
|
|
-// DefOp Manual tag an Operation
|
|
|
|
-func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
|
|
|
|
|
|
+// Op Manual tag an Operation
|
|
|
|
+// Define operation for ID
|
|
|
|
+func (f *Flow) Op(name string, params ...interface{}) Operation {
|
|
inputs := make([]*operation, len(params))
|
|
inputs := make([]*operation, len(params))
|
|
for i, p := range params {
|
|
for i, p := range params {
|
|
switch v := p.(type) {
|
|
switch v := p.(type) {
|
|
@@ -254,78 +282,81 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation,
|
|
// Grab executor here
|
|
// Grab executor here
|
|
registryFn, err := f.registry.Get(name)
|
|
registryFn, err := f.registry.Get(name)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return nil, err
|
|
|
|
|
|
+ return f.ErrOp(err)
|
|
}
|
|
}
|
|
- executor := f.makeExecutor(id, registryFn)
|
|
|
|
op := &operation{
|
|
op := &operation{
|
|
- Mutex: sync.Mutex{},
|
|
|
|
- id: id,
|
|
|
|
- flow: f,
|
|
|
|
- name: name,
|
|
|
|
- kind: "func",
|
|
|
|
- inputs: inputs,
|
|
|
|
- setter: nil, // No set
|
|
|
|
- executor: f.asTrigger(id, executor),
|
|
|
|
|
|
+ Mutex: sync.Mutex{},
|
|
|
|
+ flow: f,
|
|
|
|
+ name: name,
|
|
|
|
+ kind: "func",
|
|
|
|
+ inputs: inputs,
|
|
}
|
|
}
|
|
- f.operations.Store(id, op)
|
|
|
|
|
|
+ executor := f.makeExecutor(op, registryFn)
|
|
|
|
+ op.executor = f.asTrigger(op, executor)
|
|
|
|
+ f.operations = append(f.operations, op)
|
|
|
|
|
|
- return op, nil
|
|
|
|
|
|
+ return op
|
|
}
|
|
}
|
|
|
|
|
|
-// DefErrOp define a nil operation that will return error
|
|
|
|
|
|
+// ErrOp define a nil operation that will return error
|
|
// Usefull for builders
|
|
// Usefull for builders
|
|
-func (f *Flow) DefErrOp(id string, err error) Operation {
|
|
|
|
|
|
+func (f *Flow) ErrOp(err error) Operation {
|
|
op := &operation{
|
|
op := &operation{
|
|
- Mutex: sync.Mutex{},
|
|
|
|
- id: id,
|
|
|
|
- flow: f,
|
|
|
|
- name: fmt.Sprintf("(error)<%v>", err),
|
|
|
|
- kind: "error",
|
|
|
|
- inputs: nil,
|
|
|
|
- setter: nil,
|
|
|
|
- executor: f.asTrigger(id, func(OpCtx, ...Data) (Data, error) { return nil, err }),
|
|
|
|
|
|
+ Mutex: sync.Mutex{},
|
|
|
|
+ //id: id,
|
|
|
|
+ flow: f,
|
|
|
|
+ name: fmt.Sprintf("(error)<%v>", err),
|
|
|
|
+ kind: "error",
|
|
|
|
+ inputs: nil,
|
|
}
|
|
}
|
|
- f.operations.Store(id, op)
|
|
|
|
|
|
+ op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return nil, err })
|
|
|
|
+ //f.operations = append(f.operations, op)
|
|
return op
|
|
return op
|
|
}
|
|
}
|
|
|
|
|
|
-// DefConst define a const by defined ID
|
|
|
|
-func (f *Flow) DefConst(id string, value Data) Operation {
|
|
|
|
|
|
+// Const define a const by defined ID
|
|
|
|
+func (f *Flow) Const(value Data) Operation {
|
|
// Optimize this definition
|
|
// Optimize this definition
|
|
- f.consts[id] = value
|
|
|
|
|
|
+ constID := -1
|
|
|
|
+ for k, v := range f.consts {
|
|
|
|
+ if v == value {
|
|
|
|
+ constID = k
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if constID == -1 {
|
|
|
|
+ constID = len(f.consts)
|
|
|
|
+ f.consts = append(f.consts, value)
|
|
|
|
+ }
|
|
|
|
|
|
op := &operation{
|
|
op := &operation{
|
|
- id: id,
|
|
|
|
- Mutex: sync.Mutex{},
|
|
|
|
- flow: f,
|
|
|
|
- name: fmt.Sprintf("(const)<%s>", id),
|
|
|
|
- kind: "const",
|
|
|
|
- inputs: nil,
|
|
|
|
- setter: nil,
|
|
|
|
- executor: f.asTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.consts[id], nil }),
|
|
|
|
|
|
+ Mutex: sync.Mutex{},
|
|
|
|
+ flow: f,
|
|
|
|
+ name: fmt.Sprintf("(const)<%v:%v>", constID, value),
|
|
|
|
+ kind: "const",
|
|
|
|
+ inputs: nil,
|
|
}
|
|
}
|
|
- f.operations.Store(id, op)
|
|
|
|
|
|
+ op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return f.consts[constID], nil })
|
|
|
|
+ //f.operations = append(f.operations, op)
|
|
|
|
|
|
return op
|
|
return op
|
|
}
|
|
}
|
|
|
|
|
|
-// DefIn define input operation
|
|
|
|
-func (f *Flow) DefIn(id string, paramID int) Operation {
|
|
|
|
|
|
+// In define input operation
|
|
|
|
+func (f *Flow) In(paramID int) Operation {
|
|
op := &operation{
|
|
op := &operation{
|
|
- id: id,
|
|
|
|
Mutex: sync.Mutex{},
|
|
Mutex: sync.Mutex{},
|
|
flow: f,
|
|
flow: f,
|
|
name: fmt.Sprintf("(in)<%d>", paramID),
|
|
name: fmt.Sprintf("(in)<%d>", paramID),
|
|
kind: "in",
|
|
kind: "in",
|
|
inputs: nil,
|
|
inputs: nil,
|
|
- setter: nil,
|
|
|
|
- executor: f.asTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) {
|
|
|
|
- if paramID < 0 || paramID >= len(params) {
|
|
|
|
- return nil, ErrInput
|
|
|
|
- }
|
|
|
|
- return params[paramID], nil
|
|
|
|
- }),
|
|
|
|
}
|
|
}
|
|
- f.operations.Store(id, op)
|
|
|
|
|
|
+ op.executor = f.asTrigger(op, func(ctx OpCtx, params ...Data) (Data, error) {
|
|
|
|
+ if paramID < 0 || paramID >= len(params) {
|
|
|
|
+ return nil, ErrInput
|
|
|
|
+ }
|
|
|
|
+ return params[paramID], nil
|
|
|
|
+ })
|
|
|
|
+ f.operations = append(f.operations, op)
|
|
return op
|
|
return op
|
|
}
|
|
}
|