package flow import ( "bytes" "encoding/json" "errors" "flow/registry" "fmt" "io" "os" "sync" ) // Data interface type Data = interface{} // Flow structure // We could Create a single array of operations // refs would only mean id, types would be embed in operation type Flow struct { sync.Mutex registry *registry.R idGen func() string consts map[string]Data Data map[string]Data // Should be named, to fetch later operations sync.Map runID int // Experimental run Event hooks Hooks } // New create a new flow func New() *Flow { return &Flow{ registry: registry.Global, idGen: func() string { return RandString(8) }, // Data consts: map[string]Data{}, Data: map[string]Data{}, operations: sync.Map{}, //map[string]opEntry{}, } } //SetRegistry use the registry specified func (f *Flow) SetRegistry(r *registry.R) *Flow { f.registry = r // chain return f } //SetIDGen set the id generator that will generate //ID for new nodes func (f *Flow) SetIDGen(idGen func() string) { f.idGen = idGen } // Must Helper to return from operations func (f *Flow) Must(op Operation, err error) Operation { if err != nil { panic(err) } return op } // Auto ID generation // Op return an function operator // name - a previous registered function // params - the function inputs func (f *Flow) Op(name string, params ...interface{}) (Operation, error) { var op Operation var err error allocErr := f.allocID(func(id string) error { op, err = f.DefOp(id, name, params...) return err }) if allocErr != nil { return nil, allocErr } return op, err } // ErrOp error operation with generated ID func (f *Flow) ErrOp(operr error) (Operation, error) { var op Operation var err error allocErr := f.allocID(func(id string) error { op, err = f.DefErrOp(id, operr) return err }) if allocErr != nil { return nil, err } return op, err } // Const returns a const operation with generated ID func (f *Flow) Const(value Data) (Operation, error) { var op Operation var err error allocErr := f.allocID(func(id string) error { op, err = f.DefConst(id, value) return err }) if allocErr != nil { return nil, allocErr } return op, err } // Var operation func (f *Flow) Var(name string, initial ...Data) Operation { var op Operation err := f.allocID(func(id string) error { op = f.DefVar(id, name, initial...) return nil }) if err != nil { return nil } return op } // In input operation func (f *Flow) In(paramID int) (Operation, error) { var op Operation err := f.allocID(func(id string) error { op = f.DefIn(id, paramID) return nil }) if err != nil { return nil, err } return op, nil } // Analyse every operations func (f *Flow) Analyse(w io.Writer, params ...Data) { if w == nil { w = os.Stdout } fmt.Fprintf(w, "Ops analysis:\n") f.operations.Range(func(pk, po interface{}) bool { k, op := pk.(string), po.(*operation) fw := bytes.NewBuffer(nil) //fmt.Fprintf(w, " [%s] (%v)", k, op.name) fmt.Fprintf(fw, " [%s] %s(", pk, op.name) for j, in := range op.inputs { //ref := in.(Op) if j != 0 { fmt.Fprintf(fw, ", ") } ires, err := in.Process(params...) if err != nil { fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, err) return false } fmt.Fprintf(fw, " %s[%v](%v)", op.kind, op.id, ires) } fmt.Fprintf(fw, ") - ") // Create OpProcessor and execute // opfn := f.GetOp(k) res, err := opfn.Process(params...) if err != nil { fmt.Fprintf(fw, "ERR\n") } fmt.Fprintf(fw, "%v\n", res) fmt.Fprintf(w, "%s", fw.String()) return true }) } ///////////////////////////// // Serializers inspectors ////////////////////// func (f *Flow) String() string { ret := bytes.NewBuffer(nil) fmt.Fprintf(ret, "Flow\n") // consts fmt.Fprintf(ret, "consts:\n") for i, v := range f.consts { fmt.Fprintf(ret, " [%s] %v\n", i, v) } fmt.Fprintf(ret, "data:\n") // Or variable for k, v := range f.Data { fmt.Fprintf(ret, " [%v] %v\n", k, v) } fmt.Fprintf(ret, "operations:\n") f.operations.Range(func(pk, pv interface{}) bool { k, v := pk.(string), pv.(*operation) fmt.Fprintf(ret, " [%s] %s(", k, v.name) for j, in := range v.inputs { if j != 0 { fmt.Fprintf(ret, ", ") } fmt.Fprintf(ret, "%s[%v]", "func", in.ID()) } fmt.Fprintf(ret, ")\n") return true }) return ret.String() } // MarshalJSON implementation func (f *Flow) MarshalJSON() ([]byte, error) { data := map[string]interface{}{} type opMarshal struct { Name string Input []map[string]interface{} } operations := map[string]opMarshal{} f.operations.Range(func(pk, po interface{}) bool { k, o := pk.(string), po.(*operation) refs := []map[string]interface{}{} for _, in := range o.inputs { // Switch type? refs = append(refs, map[string]interface{}{ "type": in.kind, "id": in.id, }) } operations[k] = opMarshal{o.name, refs} return true }) data["operations"] = operations data["data"] = f.Data data["consts"] = f.consts return json.Marshal(data) } func (f *Flow) allocID(fn func(id string) error) error { genID := func() (string, error) { f.Lock() defer f.Unlock() var id string // generate ID for i := 0; i < 10; i++ { id = f.idGen() if _, ok := f.operations.Load(id); !ok { f.operations.Store(id, nil) // tmp return id, nil } } return "", errors.New("ID Exausted") } // Safe generate an ID id, err := genID() if err != nil { return err } err = fn(id) if err != nil { f.operations.Delete(id) } return nil } /*func (f *Flow) addEntry(entry *operation) (string, error) { f.Lock() defer f.Unlock() // generate ID for i := 0; i < 10; i++ { id := f.idGen() if _, ok := f.operations.Load(id); ok { continue } f.operations.Store(id, entry) return id, nil } return "", errors.New("ID exausted") }*/ // GetOp Return an existing operation or return notfound error func (f *Flow) GetOp(id string) *operation { op, ok := f.operations.Load(id) if !ok { return nil } return op.(*operation) } ////////////////////////////////////////////// // Experimental event //////////////// // Hook attach the node event hooks func (f *Flow) Hook(hook Hook) { f.hooks.Attach(hook) }