package flow

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Data is the generic value type stored in, passed to and produced by a Flow.
type Data = interface{}

// opEntry is one operation registered on a Flow: the name it was looked up
// under, the operations feeding it, and the executor returned by the registry.
type opEntry struct {
	name   string
	inputs []operation
	// Still being figured out: this might become an operation.
	executor interface{}
}

// Flow holds constants, variable data and registered operations, together
// with the registry used to resolve operation names.
//
// Design note: we could create a single array of operations; refs would then
// only carry an id and the types would be embedded in the operation.
type Flow struct {
	registry   *Registry
	consts     []Data
	data       []Data
	operations []opEntry
	err        error // set by Op when a registry lookup fails; reported by Run
}

// New creates an empty Flow that resolves operation names through the
// package-level globalRegistry.
//
// The slices are initialised non-nil so that MarshalJSON encodes an empty
// Flow with [] rather than null.
func New() *Flow {
	return &Flow{
		registry:   globalRegistry,
		consts:     []Data{},
		data:       []Data{},
		operations: []opEntry{},
	}
}

// Op registers the operation called name, fed by params, and returns a
// reference to it.
//
// Params that already are operations are used as-is; any other value is
// stored as a constant through Const. When name cannot be resolved by the
// registry, the error is recorded on the Flow (Run will return it), nothing is
// registered, and a zero OpFunc is returned.
func (f *Flow) Op(name string, params ...interface{}) operation {
	inputs := make([]operation, len(params))
	for i, p := range params {
		switch v := p.(type) {
		case operation:
			inputs[i] = v
		default:
			inputs[i] = f.Const(v)
		}
	}

	executor, err := f.registry.Get(name)
	if err != nil {
		f.err = err
		return OpFunc{}
	}

	f.operations = append(f.operations, opEntry{
		name:     name,
		inputs:   inputs,
		executor: executor,
	})
	// The reference id is the index of the entry just appended.
	return OpFunc{ref{f, len(f.operations) - 1}}
}

// Const stores value in the Flow's constant table and returns a reference to
// it. The reference id is the index of value in that table.
func (f *Flow) Const(value interface{}) OpConst {
	f.consts = append(f.consts, value)
	return OpConst{ref{f, len(f.consts) - 1}}
}

// Variable stores value in the Flow's data table and returns a reference to
// it. The reference id is the index of value in that table.
func (f *Flow) Variable(value interface{}) OpVar {
	// Append to f.data, not f.consts: appending to consts would replace the
	// data table with a copy of the constants on every call, dropping earlier
	// variables and producing ids that do not index the data table.
	f.data = append(f.data, value)
	return OpVar{ref{f, len(f.data) - 1}}
}

// In returns an OpIn reference whose id is paramID.
// NOTE(review): presumably paramID indexes the params passed to Process —
// confirm against OpIn.
func (f *Flow) In(paramID int) OpIn {
	return OpIn{ref{f, paramID}}
}

// Run processes every operation in ops, in order, and returns their results
// in the same order.
//
// If an earlier call to Op failed, nothing is processed and that error is
// returned with a nil result.
func (f *Flow) Run(ops ...operation) ([]Data, error) {
	if f.err != nil {
		return nil, f.err
	}
	res := make([]Data, 0, len(ops))
	for _, o := range ops {
		res = append(res, o.Process())
	}
	return res, nil
}

// Analyse processes every registered operation with params and returns a
// human-readable report: one line per operation listing each input (type
// name, reference id and processed value) followed by the operation's result.
func (f *Flow) Analyse(params ...Data) string {
	var ret bytes.Buffer
	fmt.Fprintf(&ret, "Flow analysis:\n")
	for i, op := range f.operations {
		fmt.Fprintf(&ret, " [%d] %s(", i, op.name)
		for j, in := range op.inputs {
			inRef := in.fref()
			if j != 0 {
				fmt.Fprintf(&ret, ", ")
			}
			ires := in.Process(params...)
			fmt.Fprintf(&ret, "%s@%d(%v)", typeName(in), inRef.ID, ires)
		}
		fmt.Fprintf(&ret, ") - ")

		// Build a reference to operation i and execute it.
		opfn := OpFunc{ref{f, i}}
		res := opfn.Process(params...)
		fmt.Fprintf(&ret, "%v\n", res)
	}
	return ret.String()
}

// String returns a textual dump of the Flow: its constants, its data and its
// operations with the type name and reference id of each input. Nothing is
// processed.
func (f *Flow) String() string {
	var ret bytes.Buffer

	fmt.Fprintf(&ret, "consts:\n")
	for i, v := range f.consts {
		fmt.Fprintf(&ret, " [%d] %v\n", i, v)
	}

	fmt.Fprintf(&ret, "data:\n") // Or variable
	for i, v := range f.data {
		fmt.Fprintf(&ret, " [%d] %v\n", i, v)
	}

	fmt.Fprintf(&ret, "operations:\n")
	for i, op := range f.operations {
		fmt.Fprintf(&ret, " [%d] %s(", i, op.name)
		for j, in := range op.inputs {
			inRef := in.fref()
			if j != 0 {
				fmt.Fprintf(&ret, ", ")
			}
			fmt.Fprintf(&ret, "%s@%d", typeName(in), inRef.ID)
		}
		fmt.Fprintf(&ret, ")\n")
	}
	return ret.String()
}

// MarshalJSON encodes the Flow as a JSON object with the keys "operations",
// "data" and "consts". Each operation is encoded with its Name and an Input
// list holding the "type" and "id" of every input reference.
func (f *Flow) MarshalJSON() ([]byte, error) {
	type opMarshal struct {
		Name  string
		Input []map[string]interface{}
	}

	operations := make([]opMarshal, len(f.operations))
	for i, o := range f.operations {
		// Non-nil even when empty so Input encodes as [] rather than null.
		refs := make([]map[string]interface{}, 0, len(o.inputs))
		for _, in := range o.inputs {
			refs = append(refs, map[string]interface{}{
				"type": typeName(in),
				"id":   in.fref().ID,
			})
		}
		operations[i] = opMarshal{Name: o.name, Input: refs}
	}

	data := map[string]interface{}{
		"operations": operations,
		"data":       f.data,
		"consts":     f.consts,
	}
	return json.Marshal(data)
}