package flow import ( "bytes" "encoding/json" "fmt" "io" "log" "os" "reflect" ) // Data interface type Data = interface{} type opEntry struct { name string inputs []*operation // still figuring, might be operation executor interface{} } // Flow structure // We could Create a single array of operations // refs would only mean id, types would be embed in operation type Flow struct { registry *Registry consts []Data data map[string]Data // Should be named, to fetch later operations map[string]opEntry err error runID int // Experimental run Event handlers []func(name string, payLoad map[string]Data) } // New create a new flow func New() *Flow { return &Flow{ registry: globalRegistry, // Data consts: []Data{}, data: map[string]Data{}, operations: map[string]opEntry{}, } } // Err returns internal error state func (f *Flow) Err() error { return f.err } //SetRegistry use the registry specified func (f *Flow) SetRegistry(r *Registry) *Flow { f.registry = r // chain return f } // DefOp Manual tag an Operation func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation { inputs := make([]*operation, len(params)) for i, p := range params { switch v := p.(type) { case *operation: inputs[i] = v default: log.Println("WARNING defining const with value", v) inputs[i] = f.Const(v).(*operation) } } // Grab executor here executor, err := f.registry.Get(name) if err != nil { f.err = err return nil } f.operations[id] = opEntry{name, inputs, executor} return opFunc(f, id) } // Res returns a deferred operation result // passing the Id func (f *Flow) Res(id string) Operation { // Defered operation return opFunc(f, id) } // Op return an function operator // name - a previous registered function // params - the function inputs func (f *Flow) Op(name string, params ...interface{}) Operation { // Use this on Set? inputs := make([]*operation, len(params)) for i, p := range params { switch v := p.(type) { case *operation: inputs[i] = v default: // fail here log.Println("WARNING defining const with value", v) inputs[i] = f.Const(v).(*operation) } } // Grab executor here executor, err := f.registry.Get(name) if err != nil { f.err = err return nil } // generate ID for { uuid := puuid() if _, ok := f.operations[uuid]; ok { continue } f.operations[uuid] = opEntry{name, inputs, executor} return opFunc(f, uuid) } // Initialize opfunc maybe } // Const returns a const operation func (f *Flow) Const(value Data) Operation { f.consts = append(f.consts, value) refID := len(f.consts) - 1 return opConst(f, refID) } // Var operation func (f *Flow) Var(name string, initial ...Data) Operation { if _, ok := f.data[name]; !ok { var v interface{} if len(initial) > 0 { v = initial[0] } f.data[name] = v } return opVar(f, name) } // In flow input operator // paramID - index of the parameter func (f *Flow) In(paramID int) Operation { return opIn(f, paramID) } // Run a batch of operation? func (f *Flow) Run(op Operation, params ...Data) (Data, error) { if f.err != nil { return nil, f.err } cache := map[*operation]Data{} return f.run(cache, op, params...) } func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Data, error) { o := op.(*operation) if v, ok := cache[o]; ok { return v, nil } // Manually fetch func data because of caching var r Data // This is wrong since the only source of func should be on operation if o.kind == "func" { op := f.operations[o.id.(string)] callParam := make([]reflect.Value, len(op.inputs)) for i, in := range op.inputs { fr, _ := f.run(cache, in, params...) // ignore error callParam[i] = reflect.ValueOf(fr) } r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface() } else { r = o.process(nil, params...) } cache[o] = r return r, nil } // Analyse every operations func (f *Flow) Analyse(w io.Writer, params ...Data) { if w == nil { w = os.Stdout } fmt.Fprintf(w, "Ops analysis:\n") for k, op := range f.operations { fw := bytes.NewBuffer(nil) //fmt.Fprintf(w, " [%s] (%v)", k, op.name) fmt.Fprintf(fw, " [%s] %s(", k, op.name) for j, in := range op.inputs { //ref := in.(Op) if j != 0 { fmt.Fprintf(fw, ", ") } ires := in.Process(params...) if f.err != nil { fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error()) return } fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires) } fmt.Fprintf(fw, ") - ") // Create OpProcessor and execute // opfn := opFunc(f, k) res := opfn.Process(params...) fmt.Fprintf(fw, "%v\n", res) fmt.Fprintf(w, "%s", fw.String()) } } ///////////////////////////// // Serializers inspectors ////////////////////// func (f *Flow) String() string { ret := bytes.NewBuffer(nil) // consts fmt.Fprintf(ret, "consts:\n") for i, v := range f.consts { fmt.Fprintf(ret, " [%d] %v\n", i, v) } fmt.Fprintf(ret, "data:\n") // Or variable for k, v := range f.data { fmt.Fprintf(ret, " [%v] %v\n", k, v) } fmt.Fprintf(ret, "operations:\n") for k, v := range f.operations { fmt.Fprintf(ret, " [%s] %s(", k, v.name) for j, in := range v.inputs { if j != 0 { fmt.Fprintf(ret, ", ") } fmt.Fprintf(ret, "%s[%v]", in.kind, in.id) } fmt.Fprintf(ret, ")\n") } return ret.String() } // MarshalJSON implementation func (f *Flow) MarshalJSON() ([]byte, error) { data := map[string]interface{}{} type opMarshal struct { Name string Input []map[string]interface{} } operations := map[string]opMarshal{} for k, o := range f.operations { refs := []map[string]interface{}{} for _, in := range o.inputs { // Switch type? refs = append(refs, map[string]interface{}{ "type": in.kind, "id": in.id, }) } operations[k] = opMarshal{o.name, refs} } data["operations"] = operations data["data"] = f.data data["consts"] = f.consts return json.Marshal(data) } ////////////////////////////////////////////// // Experimental event //////////////// // Handle attach a handler func (f *Flow) Handle(handler func(name string, payLoad map[string]Data)) { f.handlers = append(f.handlers, handler) } func (f *Flow) trigger(name string, payLoad map[string]Data) { for _, h := range f.handlers { h(name, payLoad) } }