package flow

import (
	"bytes"
	"encoding/json"
	"errors"
	"flow/registry"
	"fmt"
	"io"
	"os"
	"sync"
)

// Data is the generic value type carried by constants, variables and
// operation parameters.
type Data = interface{}

// opEntry is the record kept in Flow.operations for a single operation ID.
type opEntry struct {
	sync.Mutex
	name   string
	inputs []*operation // still figuring, might be operation
	// executor is whatever the registry returned for name, or a closure
	// built by DefConst / DefErrOp.
	executor interface{}
}

// Flow structure
// We could Create a single array of operations
// refs would only mean id, types would be embed in operation
type Flow struct {
	sync.Mutex
	registry *registry.R
	idGen    func() string

	consts map[string]Data
	data   map[string]Data // Should be named, to fetch later

	// operations maps an operation ID (string) to its *opEntry. While an ID
	// is being reserved by allocID the stored value is a nil placeholder.
	operations sync.Map
	runID      int

	// Experimental run Event
	hooks Hooks
}

// New creates a new flow that uses registry.Global and generates
// IDs with RandString(8).
func New() *Flow {
	return &Flow{
		registry: registry.Global,
		idGen:    func() string { return RandString(8) },
		// Data
		consts: map[string]Data{},
		data:   map[string]Data{},
		// operations is left at its zero value, which is ready to use.
	}
}

// SetRegistry makes the flow use the registry specified and returns the
// flow itself so calls can be chained.
func (f *Flow) SetRegistry(r *registry.R) *Flow {
	f.registry = r
	// chain
	return f
}

// SetIDGen sets the ID generator that will generate
// IDs for new nodes.
func (f *Flow) SetIDGen(idGen func() string) {
	f.idGen = idGen
}

// Must is a helper that wraps a call returning (Operation, error):
// it panics if err is non-nil and returns op otherwise.
func (f *Flow) Must(op Operation, err error) Operation {
	if err != nil {
		panic(err)
	}
	return op
}

// Res returns a deferred operation result for the given ID.
// The ID is not checked for existence here.
func (f *Flow) Res(id string) Operation {
	return opFunc(f, id)
}

// DefOp manually tags an Operation: it registers, under id, the function
// registered as name with the given params as inputs.
//
// Params that are already *operation values are used as inputs directly;
// any other value is wrapped in a constant operation through Const.
// An error is returned when a constant cannot be created or when name is
// not found in the registry; in both cases nothing is stored under id.
func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
	inputs := make([]*operation, len(params))
	for i, p := range params {
		if in, ok := p.(*operation); ok {
			inputs[i] = in
			continue
		}
		c, err := f.Const(p)
		if err != nil {
			return nil, err
		}
		in, ok := c.(*operation)
		if !ok {
			// A nil input would panic later when the entry is inspected.
			return nil, fmt.Errorf("flow: param %d of %q: const is %T, not an operation", i, name, c)
		}
		inputs[i] = in
	}

	// Grab executor here
	executor, err := f.registry.Get(name)
	if err != nil {
		return nil, err
	}
	f.operations.Store(id, &opEntry{name: name, inputs: inputs, executor: executor})
	return opFunc(f, id), nil
}

// Op returns a function operator with a generated ID.
// name - a previously registered function
// params - the function inputs
func (f *Flow) Op(name string, params ...interface{}) (Operation, error) {
	var op Operation
	err := f.allocID(func(id string) error {
		var defErr error
		op, defErr = f.DefOp(id, name, params...)
		return defErr
	})
	if err != nil {
		return nil, err
	}
	return op, nil
}

// DefErrOp defines, under id, a nil operation whose executor returns err.
// Useful for builders.
func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
	executor := func() error { return err }
	f.operations.Store(id, &opEntry{
		name:     fmt.Sprintf("(error)<%v>", err),
		executor: executor,
	})
	return opFunc(f, id), nil
}

// ErrOp defines an error operation with a generated ID.
// The returned error is non-nil only when no ID could be allocated.
func (f *Flow) ErrOp(operr error) (Operation, error) {
	var op Operation
	err := f.allocID(func(id string) error {
		var defErr error
		op, defErr = f.DefErrOp(id, operr)
		return defErr
	})
	if err != nil {
		return nil, err
	}
	return op, nil
}

// DefConst defines a const under the given ID. The value is kept in
// f.consts and the stored executor reads it back from there by ID.
func (f *Flow) DefConst(id string, value Data) (Operation, error) {
	f.consts[id] = value
	executor := func() Data { return f.consts[id] }
	f.operations.Store(id, &opEntry{
		name:     fmt.Sprintf("(const)<%s>", id),
		executor: executor,
	})
	return opFunc(f, id), nil
}

// Const returns a const operation with a generated ID.
func (f *Flow) Const(value Data) (Operation, error) {
	var op Operation
	err := f.allocID(func(id string) error {
		var defErr error
		op, defErr = f.DefConst(id, value)
		return defErr
	})
	if err != nil {
		return nil, err
	}
	return op, nil
}

// Var returns the variable operation for name. If the variable does not
// exist yet it is created holding initial[0], or nil when no initial value
// is given; an existing variable is left untouched.
func (f *Flow) Var(name string, initial ...Data) Operation {
	if _, exists := f.data[name]; !exists {
		var v interface{}
		if len(initial) > 0 {
			v = initial[0]
		}
		f.data[name] = v
	}
	return opVar(f, name)
}

// In returns the flow input operator.
// paramID - index of the parameter
func (f *Flow) In(paramID int) Operation {
	return opIn(f, paramID)
}

// Analyse processes every operation with params and writes one line per
// operation to w (os.Stdout when w is nil) showing its inputs and result.
// If processing an input fails, the error is written and the walk stops.
func (f *Flow) Analyse(w io.Writer, params ...Data) {
	if w == nil {
		w = os.Stdout
	}
	fmt.Fprintf(w, "Ops analysis:\n")
	f.operations.Range(func(pk, po interface{}) bool {
		k, isKey := pk.(string)
		op, isEntry := po.(*opEntry)
		if !isKey || !isEntry || op == nil {
			// Skip nil placeholders left by an in-progress ID reservation.
			return true
		}

		// The line is buffered so nothing partial is written when an
		// input fails.
		var fw bytes.Buffer
		fmt.Fprintf(&fw, " [%s] %s(", k, op.name)
		for j, in := range op.inputs {
			if j != 0 {
				fmt.Fprintf(&fw, ", ")
			}
			ires, err := in.Process(params...)
			if err != nil {
				fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, err)
				return false
			}
			fmt.Fprintf(&fw, " %s[%v](%v)", in.kind, in.id, ires)
		}
		fmt.Fprintf(&fw, ") - ")

		// Create OpProcessor and execute
		opfn := opFunc(f, k)
		res, err := opfn.Process(params...)
		if err != nil {
			// NOTE(review): the result is still printed after ERR below;
			// confirm whether that is intended.
			fmt.Fprintf(&fw, "ERR\n")
		}
		fmt.Fprintf(&fw, "%v\n", res)

		fmt.Fprintf(w, "%s", fw.String())
		return true
	})
}

/////////////////////////////
// Serializers inspectors
//////////////////////

// String returns a human readable dump of the flow: its consts, its data
// (variables) and its operations with the kind and ID of each input.
func (f *Flow) String() string {
	var ret bytes.Buffer
	fmt.Fprintf(&ret, "Flow\n")

	// consts
	fmt.Fprintf(&ret, "consts:\n")
	for id, v := range f.consts {
		fmt.Fprintf(&ret, " [%s] %v\n", id, v)
	}

	fmt.Fprintf(&ret, "data:\n") // Or variable
	for k, v := range f.data {
		fmt.Fprintf(&ret, " [%v] %v\n", k, v)
	}

	fmt.Fprintf(&ret, "funcs:\n")
	f.operations.Range(func(pk, pv interface{}) bool {
		k, isKey := pk.(string)
		entry, isEntry := pv.(*opEntry)
		if !isKey || !isEntry || entry == nil {
			// Skip nil placeholders left by an in-progress ID reservation.
			return true
		}
		fmt.Fprintf(&ret, " [%s] %s(", k, entry.name)
		for j, in := range entry.inputs {
			if j != 0 {
				fmt.Fprintf(&ret, ", ")
			}
			fmt.Fprintf(&ret, "%s[%v]", in.kind, in.id)
		}
		fmt.Fprintf(&ret, ")\n")
		return true
	})

	return ret.String()
}

// MarshalJSON implements json.Marshaler. The output is an object with the
// keys "operations" (ID -> {Name, Input}), "data" and "consts".
func (f *Flow) MarshalJSON() ([]byte, error) {
	type opMarshal struct {
		Name  string
		Input []map[string]interface{}
	}

	operations := map[string]opMarshal{}
	f.operations.Range(func(pk, po interface{}) bool {
		k, isKey := pk.(string)
		entry, isEntry := po.(*opEntry)
		if !isKey || !isEntry || entry == nil {
			// Skip nil placeholders left by an in-progress ID reservation.
			return true
		}
		// Kept non-nil so an operation without inputs encodes as [] and
		// not null.
		refs := make([]map[string]interface{}, 0, len(entry.inputs))
		for _, in := range entry.inputs {
			// Switch type?
			refs = append(refs, map[string]interface{}{
				"type": in.kind,
				"id":   in.id,
			})
		}
		operations[k] = opMarshal{Name: entry.name, Input: refs}
		return true
	})

	data := map[string]interface{}{
		"operations": operations,
		"data":       f.data,
		"consts":     f.consts,
	}
	return json.Marshal(data)
}

// allocID reserves an unused ID and calls fn with it.
//
// The reservation is made under the Flow lock by storing a nil placeholder
// in f.operations; fn itself runs without the lock and is expected to
// store the real entry under the ID. When fn fails the placeholder is
// removed and fn's error is returned. An error is also returned when no
// free ID is found after a fixed number of attempts.
func (f *Flow) allocID(fn func(id string) error) error {
	const maxAttempts = 10

	genID := func() (string, error) {
		f.Lock()
		defer f.Unlock()

		// generate ID
		for i := 0; i < maxAttempts; i++ {
			id := f.idGen()
			if _, taken := f.operations.Load(id); taken {
				continue
			}
			f.operations.Store(id, nil) // tmp
			return id, nil
		}
		return "", errors.New("ID Exausted")
	}

	// Safe generate an ID
	id, err := genID()
	if err != nil {
		return err
	}

	if err := fn(id); err != nil {
		f.operations.Delete(id)
		return err
	}
	return nil
}

// addEntry stores entry under a newly generated, unused ID and returns
// that ID. It holds the Flow lock while doing so and gives up with an
// error after a fixed number of attempts.
func (f *Flow) addEntry(entry *opEntry) (string, error) {
	const maxAttempts = 10

	f.Lock()
	defer f.Unlock()

	// generate ID
	for i := 0; i < maxAttempts; i++ {
		id := f.idGen()
		if _, taken := f.operations.Load(id); taken {
			continue
		}
		f.operations.Store(id, entry)
		return id, nil
	}
	return "", errors.New("ID exausted")
}

/////////////////
// Async data
/////

// getOp returns the entry stored under id. The second result is false when
// the ID is unknown or only holds the nil placeholder of a reservation
// that is still in progress.
func (f *Flow) getOp(id string) (*opEntry, bool) {
	o, ok := f.operations.Load(id)
	if !ok {
		return nil, false
	}
	entry, ok := o.(*opEntry)
	if !ok || entry == nil {
		return nil, false
	}
	return entry, true
}

// GetOp returns the existing operation stored under id, or nil when there
// is none.
func (f *Flow) GetOp(id string) Operation {
	if _, ok := f.operations.Load(id); !ok {
		return nil
	}
	return opFunc(f, id)
}

//////////////////////////////////////////////
// Experimental event
////////////////

// Hook attaches the node event hooks.
func (f *Flow) Hook(hook Hook) {
	f.hooks.Attach(hook)
}