|
@@ -0,0 +1,210 @@
|
|
|
+package flow
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "reflect"
|
|
|
+)
|
|
|
+
|
|
|
+// Data interface
|
|
|
+type Data = interface{}
|
|
|
+
|
|
|
+type opEntry struct {
|
|
|
+ name string
|
|
|
+ inputs []*operation // still figuring, might be operation
|
|
|
+ executor interface{}
|
|
|
+}
|
|
|
+
|
|
|
+// Flow structure
|
|
|
+// We could Create a single array of operations
|
|
|
+// refs would only mean id, types would be embed in operation
|
|
|
+type Flow struct {
|
|
|
+ registry *Registry
|
|
|
+
|
|
|
+ consts []Data
|
|
|
+ data map[string]Data // Should be named, to fetch later
|
|
|
+ operations []opEntry
|
|
|
+
|
|
|
+ err error
|
|
|
+ runID int
|
|
|
+}
|
|
|
+
|
|
|
+// New create a new flow
|
|
|
+func New() *Flow {
|
|
|
+ return &Flow{
|
|
|
+ registry: globalRegistry,
|
|
|
+ // Data
|
|
|
+ consts: []Data{},
|
|
|
+ data: map[string]Data{},
|
|
|
+ operations: []opEntry{},
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//SetRegistry use the registry specified
|
|
|
+func (f *Flow) SetRegistry(r *Registry) *Flow {
|
|
|
+ f.registry = r
|
|
|
+ // chain
|
|
|
+ return f
|
|
|
+}
|
|
|
+
|
|
|
+// Op return an function operator
|
|
|
+// name - a previous registered function
|
|
|
+// params - the function inputs
|
|
|
+func (f *Flow) Op(name string, params ...interface{}) Operation {
|
|
|
+ // Use this on Set?
|
|
|
+ inputs := make([]*operation, len(params))
|
|
|
+ for i, p := range params {
|
|
|
+ switch v := p.(type) {
|
|
|
+ case *operation:
|
|
|
+ inputs[i] = v
|
|
|
+ default:
|
|
|
+ inputs[i] = f.Const(v).(*operation)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Grab executor here
|
|
|
+ executor, err := f.registry.Get(name)
|
|
|
+ if err != nil {
|
|
|
+ f.err = err
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ f.operations = append(f.operations, opEntry{name, inputs, executor})
|
|
|
+ refID := len(f.operations) - 1
|
|
|
+ // Initialize opfunc maybe
|
|
|
+ return opFunc(f, refID)
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// Const returns a const operation
|
|
|
+func (f *Flow) Const(value Data) Operation {
|
|
|
+ f.consts = append(f.consts, value)
|
|
|
+ refID := len(f.consts) - 1
|
|
|
+ return opConst(f, refID)
|
|
|
+}
|
|
|
+
|
|
|
+// Var operation
|
|
|
+func (f *Flow) Var(name string, initial ...Data) Operation {
|
|
|
+ if _, ok := f.data[name]; !ok {
|
|
|
+ var v interface{}
|
|
|
+ if len(initial) > 0 {
|
|
|
+ v = initial[0]
|
|
|
+ }
|
|
|
+ f.data[name] = v
|
|
|
+ }
|
|
|
+ return opVar(f, name)
|
|
|
+}
|
|
|
+
|
|
|
+// In flow input operator
|
|
|
+// paramID - index of the parameter
|
|
|
+func (f *Flow) In(paramID int) Operation {
|
|
|
+ return opIn(f, paramID)
|
|
|
+}
|
|
|
+
|
|
|
+// Run a batch of operation?
|
|
|
+func (f *Flow) Run(op Operation, params ...Data) (Data, error) {
|
|
|
+ if f.err != nil {
|
|
|
+ return nil, f.err
|
|
|
+ }
|
|
|
+ cache := map[*operation]Data{}
|
|
|
+ return f.run(cache, op, params...)
|
|
|
+}
|
|
|
+func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Data, error) {
|
|
|
+ o := op.(*operation)
|
|
|
+ if v, ok := cache[o]; ok {
|
|
|
+ return v, nil
|
|
|
+ }
|
|
|
+ // Manually fetch func data because of caching
|
|
|
+ var r Data
|
|
|
+ // This is wrong since the only source of func should be on operation
|
|
|
+ if o.kind == "func" {
|
|
|
+ op := f.operations[o.id.(int)]
|
|
|
+ callParam := make([]reflect.Value, len(op.inputs))
|
|
|
+ for i, in := range op.inputs {
|
|
|
+ fr, _ := f.run(cache, in, params...) // ignore error
|
|
|
+ callParam[i] = reflect.ValueOf(fr)
|
|
|
+ }
|
|
|
+ r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
|
|
|
+ } else {
|
|
|
+ r = o.process(nil, params...)
|
|
|
+ }
|
|
|
+ cache[o] = r
|
|
|
+ return r, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Analyse every operations
|
|
|
+func (f *Flow) Analyse(params ...Data) string {
|
|
|
+ ret := bytes.NewBuffer(nil)
|
|
|
+ fmt.Fprintf(ret, "Ops analysis:\n")
|
|
|
+
|
|
|
+ for i, op := range f.operations {
|
|
|
+ fmt.Fprintf(ret, " [%d] %s(", i, op.name)
|
|
|
+ for j, in := range op.inputs {
|
|
|
+ //ref := in.(Op)
|
|
|
+ if j != 0 {
|
|
|
+ fmt.Fprintf(ret, ", ")
|
|
|
+ }
|
|
|
+ ires := in.Process(params...)
|
|
|
+ fmt.Fprintf(ret, "%s[%v](%v)", in.kind, in.id, ires)
|
|
|
+ }
|
|
|
+ fmt.Fprintf(ret, ") - ")
|
|
|
+ // Create OpProcessor and execute
|
|
|
+ //
|
|
|
+ opfn := opFunc(f, i)
|
|
|
+ res := opfn.Process(params...)
|
|
|
+ fmt.Fprintf(ret, "%v\n", res)
|
|
|
+ }
|
|
|
+ return ret.String()
|
|
|
+}
|
|
|
+
|
|
|
+func (f *Flow) String() string {
|
|
|
+ ret := bytes.NewBuffer(nil)
|
|
|
+ // consts
|
|
|
+ fmt.Fprintf(ret, "consts:\n")
|
|
|
+ for i, v := range f.consts {
|
|
|
+ fmt.Fprintf(ret, " [%d] %v\n", i, v)
|
|
|
+ }
|
|
|
+ fmt.Fprintf(ret, "data:\n") // Or variable
|
|
|
+ for k, v := range f.data {
|
|
|
+ fmt.Fprintf(ret, " [%v] %v\n", k, v)
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Fprintf(ret, "operations:\n")
|
|
|
+ for i, v := range f.operations {
|
|
|
+ fmt.Fprintf(ret, " [%d] %s(", i, v.name)
|
|
|
+ for j, in := range v.inputs {
|
|
|
+ if j != 0 {
|
|
|
+ fmt.Fprintf(ret, ", ")
|
|
|
+ }
|
|
|
+ fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)
|
|
|
+ }
|
|
|
+ fmt.Fprintf(ret, ")\n")
|
|
|
+ }
|
|
|
+
|
|
|
+ return ret.String()
|
|
|
+}
|
|
|
+
|
|
|
+// MarshalJSON implementation
|
|
|
+func (f *Flow) MarshalJSON() ([]byte, error) {
|
|
|
+ data := map[string]interface{}{}
|
|
|
+ type opMarshal struct {
|
|
|
+ Name string
|
|
|
+ Input []map[string]interface{}
|
|
|
+ }
|
|
|
+ operations := make([]opMarshal, len(f.operations))
|
|
|
+ for i, o := range f.operations {
|
|
|
+ refs := []map[string]interface{}{}
|
|
|
+ for _, in := range o.inputs { // Switch type?
|
|
|
+ refs = append(refs, map[string]interface{}{
|
|
|
+ "type": in.kind,
|
|
|
+ "id": in.id,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ operations[i] = opMarshal{o.name, refs}
|
|
|
+ }
|
|
|
+ data["operations"] = operations
|
|
|
+ data["data"] = f.data
|
|
|
+ data["consts"] = f.consts
|
|
|
+
|
|
|
+ return json.Marshal(data)
|
|
|
+}
|