123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package flow
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "os"
- "reflect"
- )
// Data is the dynamic value type carried through a flow: constants,
// variables, run parameters and operation results are all Data.
type Data = interface{}
- type opEntry struct {
- name string
- inputs []*operation // still figuring, might be operation
- executor interface{}
- }
- // Flow structure
- // We could Create a single array of operations
- // refs would only mean id, types would be embed in operation
- type Flow struct {
- registry *Registry
- consts []Data
- data map[string]Data // Should be named, to fetch later
- operations map[string]opEntry
- err error
- runID int
- // Experimental run Event
- handlers []func(name string, payLoad map[string]Data)
- }
- // New create a new flow
- func New() *Flow {
- return &Flow{
- registry: globalRegistry,
- // Data
- consts: []Data{},
- data: map[string]Data{},
- operations: map[string]opEntry{},
- }
- }
- // Err returns internal error state
- func (f *Flow) Err() error {
- return f.err
- }
- //SetRegistry use the registry specified
- func (f *Flow) SetRegistry(r *Registry) *Flow {
- f.registry = r
- // chain
- return f
- }
- // DefOp Manual tag an Operation
- func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
- inputs := make([]*operation, len(params))
- for i, p := range params {
- switch v := p.(type) {
- case *operation:
- inputs[i] = v
- default:
- log.Println("WARNING defining const with value", v)
- inputs[i] = f.Const(v).(*operation)
- }
- }
- // Grab executor here
- executor, err := f.registry.Get(name)
- if err != nil {
- f.err = err
- return nil
- }
- f.operations[id] = opEntry{name, inputs, executor}
- return opFunc(f, id)
- }
- // Res returns a deferred operation result
- // passing the Id
- func (f *Flow) Res(id string) Operation {
- // Defered operation
- return opFunc(f, id)
- }
- // Op return an function operator
- // name - a previous registered function
- // params - the function inputs
- func (f *Flow) Op(name string, params ...interface{}) Operation {
- // Use this on Set?
- inputs := make([]*operation, len(params))
- for i, p := range params {
- switch v := p.(type) {
- case *operation:
- inputs[i] = v
- default:
- // fail here
- log.Println("WARNING defining const with value", v)
- inputs[i] = f.Const(v).(*operation)
- }
- }
- // Grab executor here
- executor, err := f.registry.Get(name)
- if err != nil {
- f.err = err
- return nil
- }
- // generate ID
- for {
- uuid := puuid()
- if _, ok := f.operations[uuid]; ok {
- continue
- }
- f.operations[uuid] = opEntry{name, inputs, executor}
- return opFunc(f, uuid)
- }
- // Initialize opfunc maybe
- }
- // Const returns a const operation
- func (f *Flow) Const(value Data) Operation {
- f.consts = append(f.consts, value)
- refID := len(f.consts) - 1
- return opConst(f, refID)
- }
- // Var operation
- func (f *Flow) Var(name string, initial ...Data) Operation {
- if _, ok := f.data[name]; !ok {
- var v interface{}
- if len(initial) > 0 {
- v = initial[0]
- }
- f.data[name] = v
- }
- return opVar(f, name)
- }
- // In flow input operator
- // paramID - index of the parameter
- func (f *Flow) In(paramID int) Operation {
- return opIn(f, paramID)
- }
- // Run a batch of operation?
- func (f *Flow) Run(op Operation, params ...Data) (Data, error) {
- if f.err != nil {
- return nil, f.err
- }
- cache := map[*operation]Data{}
- return f.run(cache, op, params...)
- }
- func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Data, error) {
- o := op.(*operation)
- if v, ok := cache[o]; ok {
- return v, nil
- }
- // Manually fetch func data because of caching
- var r Data
- // This is wrong since the only source of func should be on operation
- if o.kind == "func" {
- op := f.operations[o.id.(string)]
- callParam := make([]reflect.Value, len(op.inputs))
- for i, in := range op.inputs {
- fr, _ := f.run(cache, in, params...) // ignore error
- callParam[i] = reflect.ValueOf(fr)
- }
- r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
- } else {
- r = o.process(nil, params...)
- }
- cache[o] = r
- return r, nil
- }
- // Analyse every operations
- func (f *Flow) Analyse(w io.Writer, params ...Data) {
- if w == nil {
- w = os.Stdout
- }
- fmt.Fprintf(w, "Ops analysis:\n")
- for k, op := range f.operations {
- fw := bytes.NewBuffer(nil)
- //fmt.Fprintf(w, " [%s] (%v)", k, op.name)
- fmt.Fprintf(fw, " [%s] %s(", k, op.name)
- for j, in := range op.inputs {
- //ref := in.(Op)
- if j != 0 {
- fmt.Fprintf(fw, ", ")
- }
- ires := in.Process(params...)
- if f.err != nil {
- fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error())
- return
- }
- fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
- }
- fmt.Fprintf(fw, ") - ")
- // Create OpProcessor and execute
- //
- opfn := opFunc(f, k)
- res := opfn.Process(params...)
- fmt.Fprintf(fw, "%v\n", res)
- fmt.Fprintf(w, "%s", fw.String())
- }
- }
- /////////////////////////////
- // Serializers inspectors
- //////////////////////
- func (f *Flow) String() string {
- ret := bytes.NewBuffer(nil)
- // consts
- fmt.Fprintf(ret, "consts:\n")
- for i, v := range f.consts {
- fmt.Fprintf(ret, " [%d] %v\n", i, v)
- }
- fmt.Fprintf(ret, "data:\n") // Or variable
- for k, v := range f.data {
- fmt.Fprintf(ret, " [%v] %v\n", k, v)
- }
- fmt.Fprintf(ret, "operations:\n")
- for k, v := range f.operations {
- fmt.Fprintf(ret, " [%s] %s(", k, v.name)
- for j, in := range v.inputs {
- if j != 0 {
- fmt.Fprintf(ret, ", ")
- }
- fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)
- }
- fmt.Fprintf(ret, ")\n")
- }
- return ret.String()
- }
- // MarshalJSON implementation
- func (f *Flow) MarshalJSON() ([]byte, error) {
- data := map[string]interface{}{}
- type opMarshal struct {
- Name string
- Input []map[string]interface{}
- }
- operations := map[string]opMarshal{}
- for k, o := range f.operations {
- refs := []map[string]interface{}{}
- for _, in := range o.inputs { // Switch type?
- refs = append(refs, map[string]interface{}{
- "type": in.kind,
- "id": in.id,
- })
- }
- operations[k] = opMarshal{o.name, refs}
- }
- data["operations"] = operations
- data["data"] = f.data
- data["consts"] = f.consts
- return json.Marshal(data)
- }
- //////////////////////////////////////////////
- // Experimental event
- ////////////////
- // Handle attach a handler
- func (f *Flow) Handle(handler func(name string, payLoad map[string]Data)) {
- f.handlers = append(f.handlers, handler)
- }
- func (f *Flow) trigger(name string, payLoad map[string]Data) {
- for _, h := range f.handlers {
- h(name, payLoad)
- }
- }
|