|
@@ -2,8 +2,12 @@ package flow
|
|
|
|
|
|
import (
|
|
import (
|
|
"bytes"
|
|
"bytes"
|
|
|
|
+ "crypto/rand"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "io"
|
|
|
|
+ "log"
|
|
|
|
+ "os"
|
|
"reflect"
|
|
"reflect"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -24,7 +28,7 @@ type Flow struct {
|
|
|
|
|
|
consts []Data
|
|
consts []Data
|
|
data map[string]Data // Should be named, to fetch later
|
|
data map[string]Data // Should be named, to fetch later
|
|
- operations []opEntry
|
|
|
|
|
|
+ operations map[string]opEntry
|
|
|
|
|
|
err error
|
|
err error
|
|
runID int
|
|
runID int
|
|
@@ -37,10 +41,15 @@ func New() *Flow {
|
|
// Data
|
|
// Data
|
|
consts: []Data{},
|
|
consts: []Data{},
|
|
data: map[string]Data{},
|
|
data: map[string]Data{},
|
|
- operations: []opEntry{},
|
|
|
|
|
|
+ operations: map[string]opEntry{},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// Err returns internal error state
|
|
|
|
+func (f *Flow) Err() error {
|
|
|
|
+ return f.err
|
|
|
|
+}
|
|
|
|
+
|
|
//SetRegistry use the registry specified
|
|
//SetRegistry use the registry specified
|
|
func (f *Flow) SetRegistry(r *Registry) *Flow {
|
|
func (f *Flow) SetRegistry(r *Registry) *Flow {
|
|
f.registry = r
|
|
f.registry = r
|
|
@@ -48,6 +57,35 @@ func (f *Flow) SetRegistry(r *Registry) *Flow {
|
|
return f
|
|
return f
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// DefOp Manual tag an Operation
|
|
|
|
+func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
|
|
|
|
+ inputs := make([]*operation, len(params))
|
|
|
|
+ for i, p := range params {
|
|
|
|
+ switch v := p.(type) {
|
|
|
|
+ case *operation:
|
|
|
|
+ inputs[i] = v
|
|
|
|
+ default:
|
|
|
|
+ log.Println("WARNING defining const with value", v)
|
|
|
|
+ inputs[i] = f.Const(v).(*operation)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // Grab executor here
|
|
|
|
+ executor, err := f.registry.Get(name)
|
|
|
|
+ if err != nil {
|
|
|
|
+ f.err = err
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ f.operations[id] = opEntry{name, inputs, executor}
|
|
|
|
+ return opFunc(f, id)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Res returns a deferred operation result
|
|
|
|
+// passing the Id
|
|
|
|
+func (f *Flow) Res(id string) Operation {
|
|
|
|
+ // Defered operation
|
|
|
|
+ return opFunc(f, id)
|
|
|
|
+}
|
|
|
|
+
|
|
// Op return an function operator
|
|
// Op return an function operator
|
|
// name - a previous registered function
|
|
// name - a previous registered function
|
|
// params - the function inputs
|
|
// params - the function inputs
|
|
@@ -59,6 +97,8 @@ func (f *Flow) Op(name string, params ...interface{}) Operation {
|
|
case *operation:
|
|
case *operation:
|
|
inputs[i] = v
|
|
inputs[i] = v
|
|
default:
|
|
default:
|
|
|
|
+ // fail here
|
|
|
|
+ log.Println("WARNING defining const with value", v)
|
|
inputs[i] = f.Const(v).(*operation)
|
|
inputs[i] = f.Const(v).(*operation)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -69,11 +109,19 @@ func (f *Flow) Op(name string, params ...interface{}) Operation {
|
|
f.err = err
|
|
f.err = err
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
- f.operations = append(f.operations, opEntry{name, inputs, executor})
|
|
|
|
- refID := len(f.operations) - 1
|
|
|
|
- // Initialize opfunc maybe
|
|
|
|
- return opFunc(f, refID)
|
|
|
|
|
|
+ // generate ID
|
|
|
|
|
|
|
|
+ //f.operations = append(f.operations, opEntry{name, inputs, executor})
|
|
|
|
+ //refID := len(f.operations) - 1
|
|
|
|
+ for {
|
|
|
|
+ uuid := puuid()
|
|
|
|
+ if _, ok := f.operations[uuid]; ok {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ f.operations[uuid] = opEntry{name, inputs, executor}
|
|
|
|
+ return opFunc(f, uuid)
|
|
|
|
+ }
|
|
|
|
+ // Initialize opfunc maybe
|
|
}
|
|
}
|
|
|
|
|
|
// Const returns a const operation
|
|
// Const returns a const operation
|
|
@@ -118,7 +166,7 @@ func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Dat
|
|
var r Data
|
|
var r Data
|
|
// This is wrong since the only source of func should be on operation
|
|
// This is wrong since the only source of func should be on operation
|
|
if o.kind == "func" {
|
|
if o.kind == "func" {
|
|
- op := f.operations[o.id.(int)]
|
|
|
|
|
|
+ op := f.operations[o.id.(string)]
|
|
callParam := make([]reflect.Value, len(op.inputs))
|
|
callParam := make([]reflect.Value, len(op.inputs))
|
|
for i, in := range op.inputs {
|
|
for i, in := range op.inputs {
|
|
fr, _ := f.run(cache, in, params...) // ignore error
|
|
fr, _ := f.run(cache, in, params...) // ignore error
|
|
@@ -133,28 +181,38 @@ func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Dat
|
|
}
|
|
}
|
|
|
|
|
|
// Analyse every operations
|
|
// Analyse every operations
|
|
-func (f *Flow) Analyse(params ...Data) string {
|
|
|
|
- ret := bytes.NewBuffer(nil)
|
|
|
|
- fmt.Fprintf(ret, "Ops analysis:\n")
|
|
|
|
-
|
|
|
|
- for i, op := range f.operations {
|
|
|
|
- fmt.Fprintf(ret, " [%d] %s(", i, op.name)
|
|
|
|
|
|
+func (f *Flow) Analyse(w io.Writer, params ...Data) {
|
|
|
|
+ if w == nil {
|
|
|
|
+ w = os.Stdout
|
|
|
|
+ }
|
|
|
|
+ fmt.Fprintf(w, "Ops analysis:\n")
|
|
|
|
+ for k, op := range f.operations {
|
|
|
|
+ fw := bytes.NewBuffer(nil)
|
|
|
|
+ //fmt.Fprintf(w, " [%s] (%v)", k, op.name)
|
|
|
|
+ fmt.Fprintf(fw, " [%s] %s(", k, op.name)
|
|
for j, in := range op.inputs {
|
|
for j, in := range op.inputs {
|
|
//ref := in.(Op)
|
|
//ref := in.(Op)
|
|
if j != 0 {
|
|
if j != 0 {
|
|
- fmt.Fprintf(ret, ", ")
|
|
|
|
|
|
+ fmt.Fprintf(fw, ", ")
|
|
}
|
|
}
|
|
ires := in.Process(params...)
|
|
ires := in.Process(params...)
|
|
- fmt.Fprintf(ret, "%s[%v](%v)", in.kind, in.id, ires)
|
|
|
|
|
|
+ if f.err != nil {
|
|
|
|
+ fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error())
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
|
|
}
|
|
}
|
|
- fmt.Fprintf(ret, ") - ")
|
|
|
|
|
|
+ fmt.Fprintf(fw, ") - ")
|
|
// Create OpProcessor and execute
|
|
// Create OpProcessor and execute
|
|
//
|
|
//
|
|
- opfn := opFunc(f, i)
|
|
|
|
|
|
+ opfn := opFunc(f, k)
|
|
res := opfn.Process(params...)
|
|
res := opfn.Process(params...)
|
|
- fmt.Fprintf(ret, "%v\n", res)
|
|
|
|
|
|
+ fmt.Fprintf(fw, "%v\n", res)
|
|
|
|
+
|
|
|
|
+ fmt.Fprintf(w, "%s", fw.String())
|
|
|
|
+
|
|
}
|
|
}
|
|
- return ret.String()
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
func (f *Flow) String() string {
|
|
func (f *Flow) String() string {
|
|
@@ -170,8 +228,8 @@ func (f *Flow) String() string {
|
|
}
|
|
}
|
|
|
|
|
|
fmt.Fprintf(ret, "operations:\n")
|
|
fmt.Fprintf(ret, "operations:\n")
|
|
- for i, v := range f.operations {
|
|
|
|
- fmt.Fprintf(ret, " [%d] %s(", i, v.name)
|
|
|
|
|
|
+ for k, v := range f.operations {
|
|
|
|
+ fmt.Fprintf(ret, " [%s] %s(", k, v.name)
|
|
for j, in := range v.inputs {
|
|
for j, in := range v.inputs {
|
|
if j != 0 {
|
|
if j != 0 {
|
|
fmt.Fprintf(ret, ", ")
|
|
fmt.Fprintf(ret, ", ")
|
|
@@ -191,8 +249,8 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
|
|
Name string
|
|
Name string
|
|
Input []map[string]interface{}
|
|
Input []map[string]interface{}
|
|
}
|
|
}
|
|
- operations := make([]opMarshal, len(f.operations))
|
|
|
|
- for i, o := range f.operations {
|
|
|
|
|
|
+ operations := map[string]opMarshal{}
|
|
|
|
+ for k, o := range f.operations {
|
|
refs := []map[string]interface{}{}
|
|
refs := []map[string]interface{}{}
|
|
for _, in := range o.inputs { // Switch type?
|
|
for _, in := range o.inputs { // Switch type?
|
|
refs = append(refs, map[string]interface{}{
|
|
refs = append(refs, map[string]interface{}{
|
|
@@ -200,7 +258,7 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
|
|
"id": in.id,
|
|
"id": in.id,
|
|
})
|
|
})
|
|
}
|
|
}
|
|
- operations[i] = opMarshal{o.name, refs}
|
|
|
|
|
|
+ operations[k] = opMarshal{o.name, refs}
|
|
}
|
|
}
|
|
data["operations"] = operations
|
|
data["operations"] = operations
|
|
data["data"] = f.data
|
|
data["data"] = f.data
|
|
@@ -208,3 +266,17 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
|
|
|
|
|
|
return json.Marshal(data)
|
|
return json.Marshal(data)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+func puuid() (uuid string) {
|
|
|
|
+
|
|
|
|
+ b := make([]byte, 16)
|
|
|
|
+ _, err := rand.Read(b)
|
|
|
|
+ if err != nil {
|
|
|
|
+ fmt.Println("Error: ", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ uuid = fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
|
|
|
|
+
|
|
|
|
+ return
|
|
|
|
+}
|