flow.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package flow
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "flow/registry"
  7. "fmt"
  8. "io"
  9. "os"
  10. "sync"
  11. )
  // Data is the value type passed between operations. It is an alias for the
  // empty interface, so any Go value can be used wherever Data is expected.
  type Data = interface{}
  14. type opEntry struct {
  15. sync.Mutex
  16. name string
  17. inputs []*operation // still figuring, might be operation
  18. executor interface{}
  19. }
  20. // Flow structure
  21. // We could Create a single array of operations
  22. // refs would only mean id, types would be embed in operation
  23. type Flow struct {
  24. sync.Mutex
  25. registry *registry.R
  26. idGen func() string
  27. consts map[string]Data
  28. data map[string]Data // Should be named, to fetch later
  29. operations sync.Map
  30. runID int
  31. // Experimental run Event
  32. hooks Hooks
  33. }
  34. // New create a new flow
  35. func New() *Flow {
  36. return &Flow{
  37. registry: registry.Global,
  38. idGen: func() string { return RandString(8) },
  39. // Data
  40. consts: map[string]Data{},
  41. data: map[string]Data{},
  42. operations: sync.Map{},
  43. //map[string]opEntry{},
  44. }
  45. }
  46. //SetRegistry use the registry specified
  47. func (f *Flow) SetRegistry(r *registry.R) *Flow {
  48. f.registry = r
  49. // chain
  50. return f
  51. }
  52. //SetIDGen set the id generator that will generate
  53. //ID for new nodes
  54. func (f *Flow) SetIDGen(idGen func() string) {
  55. f.idGen = idGen
  56. }
  57. // Must Helper to return from operations
  58. func (f *Flow) Must(op Operation, err error) Operation {
  59. if err != nil {
  60. panic(err)
  61. }
  62. return op
  63. }
  64. // Res returns a deferred operation result
  65. // passing the Id
  66. func (f *Flow) Res(id string) Operation {
  67. return opFunc(f, id)
  68. }
  69. // DefOp Manual tag an Operation
  70. func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
  71. inputs := make([]*operation, len(params))
  72. for i, p := range params {
  73. switch v := p.(type) {
  74. case *operation:
  75. inputs[i] = v
  76. default:
  77. c, err := f.Const(v)
  78. if err != nil {
  79. return nil, err
  80. }
  81. inputs[i], _ = c.(*operation)
  82. }
  83. }
  84. // Grab executor here
  85. executor, err := f.registry.Get(name)
  86. if err != nil {
  87. return nil, err
  88. }
  89. f.operations.Store(id, &opEntry{sync.Mutex{}, name, inputs, executor})
  90. return opFunc(f, id), nil
  91. }
  92. // Op return an function operator
  93. // name - a previous registered function
  94. // params - the function inputs
  95. func (f *Flow) Op(name string, params ...interface{}) (Operation, error) {
  96. var op Operation
  97. var err error
  98. allocErr := f.allocID(func(id string) error {
  99. op, err = f.DefOp(id, name, params...)
  100. return err
  101. })
  102. if allocErr != nil {
  103. return nil, allocErr
  104. }
  105. return op, err
  106. }
  107. // DefErrOp define a nil operation that will return error
  108. // Usefull for builders
  109. func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
  110. executor := func() error { return err }
  111. f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("(error)<%v>", err), nil, executor})
  112. return opFunc(f, id), nil
  113. }
  114. // ErrOp error operation with generated ID
  115. func (f *Flow) ErrOp(operr error) (Operation, error) {
  116. var op Operation
  117. var err error
  118. allocErr := f.allocID(func(id string) error {
  119. op, err = f.DefErrOp(id, operr)
  120. return err
  121. })
  122. if allocErr != nil {
  123. return nil, err
  124. }
  125. return op, err
  126. }
  127. // DefConst define a const by defined ID
  128. func (f *Flow) DefConst(id string, value Data) (Operation, error) {
  129. f.consts[id] = value
  130. executor := func() Data { return f.consts[id] }
  131. f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("(const)<%s>", id), nil, executor})
  132. return opFunc(f, id), nil
  133. }
  134. // Const returns a const operation with generated ID
  135. func (f *Flow) Const(value Data) (Operation, error) {
  136. var op Operation
  137. var err error
  138. allocErr := f.allocID(func(id string) error {
  139. op, err = f.DefConst(id, value)
  140. return err
  141. })
  142. if allocErr != nil {
  143. return nil, allocErr
  144. }
  145. return op, err
  146. }
  147. // Var operation
  148. func (f *Flow) Var(name string, initial ...Data) Operation {
  149. if _, ok := f.data[name]; !ok {
  150. var v interface{}
  151. if len(initial) > 0 {
  152. v = initial[0]
  153. }
  154. f.data[name] = v
  155. }
  156. return opVar(f, name)
  157. }
  158. // In flow input operator
  159. // paramID - index of the parameter
  160. func (f *Flow) In(paramID int) Operation {
  161. return opIn(f, paramID)
  162. }
  163. // Analyse every operations
  164. func (f *Flow) Analyse(w io.Writer, params ...Data) {
  165. if w == nil {
  166. w = os.Stdout
  167. }
  168. fmt.Fprintf(w, "Ops analysis:\n")
  169. f.operations.Range(func(pk, po interface{}) bool {
  170. k, op := pk.(string), po.(*opEntry)
  171. fw := bytes.NewBuffer(nil)
  172. //fmt.Fprintf(w, " [%s] (%v)", k, op.name)
  173. fmt.Fprintf(fw, " [%s] %s(", pk, op.name)
  174. for j, in := range op.inputs {
  175. //ref := in.(Op)
  176. if j != 0 {
  177. fmt.Fprintf(fw, ", ")
  178. }
  179. ires, err := in.Process(params...)
  180. if err != nil {
  181. fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, err)
  182. return false
  183. }
  184. fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
  185. }
  186. fmt.Fprintf(fw, ") - ")
  187. // Create OpProcessor and execute
  188. //
  189. opfn := opFunc(f, k)
  190. res, err := opfn.Process(params...)
  191. if err != nil {
  192. fmt.Fprintf(fw, "ERR\n")
  193. }
  194. fmt.Fprintf(fw, "%v\n", res)
  195. fmt.Fprintf(w, "%s", fw.String())
  196. return true
  197. })
  198. }
  199. /////////////////////////////
  200. // Serializers inspectors
  201. //////////////////////
  202. func (f *Flow) String() string {
  203. ret := bytes.NewBuffer(nil)
  204. fmt.Fprintf(ret, "Flow\n")
  205. // consts
  206. fmt.Fprintf(ret, "consts:\n")
  207. for i, v := range f.consts {
  208. fmt.Fprintf(ret, " [%s] %v\n", i, v)
  209. }
  210. fmt.Fprintf(ret, "data:\n") // Or variable
  211. for k, v := range f.data {
  212. fmt.Fprintf(ret, " [%v] %v\n", k, v)
  213. }
  214. fmt.Fprintf(ret, "funcs:\n")
  215. f.operations.Range(func(pk, pv interface{}) bool {
  216. k, v := pk.(string), pv.(*opEntry)
  217. fmt.Fprintf(ret, " [%s] %s(", k, v.name)
  218. for j, in := range v.inputs {
  219. if j != 0 {
  220. fmt.Fprintf(ret, ", ")
  221. }
  222. fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)
  223. }
  224. fmt.Fprintf(ret, ")\n")
  225. return true
  226. })
  227. return ret.String()
  228. }
  229. // MarshalJSON implementation
  230. func (f *Flow) MarshalJSON() ([]byte, error) {
  231. data := map[string]interface{}{}
  232. type opMarshal struct {
  233. Name string
  234. Input []map[string]interface{}
  235. }
  236. operations := map[string]opMarshal{}
  237. f.operations.Range(func(pk, po interface{}) bool {
  238. k, o := pk.(string), po.(*opEntry)
  239. refs := []map[string]interface{}{}
  240. for _, in := range o.inputs { // Switch type?
  241. refs = append(refs, map[string]interface{}{
  242. "type": in.kind,
  243. "id": in.id,
  244. })
  245. }
  246. operations[k] = opMarshal{o.name, refs}
  247. return true
  248. })
  249. data["operations"] = operations
  250. data["data"] = f.data
  251. data["consts"] = f.consts
  252. return json.Marshal(data)
  253. }
  254. func (f *Flow) allocID(fn func(id string) error) error {
  255. genID := func() (string, error) {
  256. f.Lock()
  257. defer f.Unlock()
  258. var id string
  259. // generate ID
  260. for i := 0; i < 10; i++ {
  261. id = f.idGen()
  262. if _, ok := f.operations.Load(id); !ok {
  263. f.operations.Store(id, nil) // tmp
  264. return id, nil
  265. }
  266. }
  267. return "", errors.New("ID Exausted")
  268. }
  269. // Safe generate an ID
  270. id, err := genID()
  271. if err != nil {
  272. return err
  273. }
  274. err = fn(id)
  275. if err != nil {
  276. f.operations.Delete(id)
  277. }
  278. return nil
  279. }
  280. func (f *Flow) addEntry(entry *opEntry) (string, error) {
  281. f.Lock()
  282. defer f.Unlock()
  283. // generate ID
  284. for i := 0; i < 10; i++ {
  285. id := f.idGen()
  286. if _, ok := f.operations.Load(id); ok {
  287. continue
  288. }
  289. f.operations.Store(id, entry)
  290. return id, nil
  291. }
  292. return "", errors.New("ID exausted")
  293. }
  294. /////////////////
  295. // Async data
  296. /////
  297. func (f *Flow) getOp(id string) (*opEntry, bool) {
  298. o, ok := f.operations.Load(id)
  299. if !ok {
  300. return nil, false
  301. }
  302. return o.(*opEntry), true
  303. }
  304. // GetOp Return an existing operation or return notfound error
  305. func (f *Flow) GetOp(id string) Operation {
  306. _, ok := f.operations.Load(id)
  307. if !ok {
  308. return nil
  309. }
  310. return opFunc(f, id)
  311. }
  312. //////////////////////////////////////////////
  313. // Experimental event
  314. ////////////////
  315. // Hook attach the node event hooks
  316. func (f *Flow) Hook(hook Hook) {
  317. f.hooks.Attach(hook)
  318. }