flow.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package flow
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"reflect"
	"sort"
)
// Data is the generic value type carried through a flow: constants, named
// variables, run parameters and operation results are all passed around as
// Data. It is an alias of interface{}, not a distinct type.
type Data = interface{}
// opEntry is the record stored in Flow.operations for one function operation.
type opEntry struct {
	// name is the registry name the executor was looked up under.
	name string
	// inputs are the operations whose results are passed, in order, as the
	// call arguments of executor.
	inputs []*operation // still figuring, might be operation
	// executor is the value the registry returned for name; it is invoked
	// through reflection when the operation is run.
	executor interface{}
}
// Flow holds a set of function operations together with the constants and
// named variables they refer to.
//
// Design note: we could create a single array of operations; refs would then
// only mean an id, and the types would be embedded in the operation.
type Flow struct {
	// registry resolves operation names to their executors.
	registry *Registry
	// consts stores constant values; Const hands out the slice index of the
	// value it appends.
	consts []Data
	// data stores variables by name.
	data map[string]Data // Should be named, to fetch later
	// operations maps an operation id to its definition.
	operations map[string]opEntry
	// err is set when a registry lookup fails in DefOp or Op and is exposed
	// through Err; Run refuses to evaluate while it is non-nil.
	err error
	// runID is not read or written by the code visible in this file.
	runID int
	// Experimental run Event
	handlers []func(name string, payLoad map[string]Data)
}
  31. // New create a new flow
  32. func New() *Flow {
  33. return &Flow{
  34. registry: globalRegistry,
  35. // Data
  36. consts: []Data{},
  37. data: map[string]Data{},
  38. operations: map[string]opEntry{},
  39. }
  40. }
  41. // Err returns internal error state
  42. func (f *Flow) Err() error {
  43. return f.err
  44. }
  45. //SetRegistry use the registry specified
  46. func (f *Flow) SetRegistry(r *Registry) *Flow {
  47. f.registry = r
  48. // chain
  49. return f
  50. }
  51. // DefOp Manual tag an Operation
  52. func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
  53. inputs := make([]*operation, len(params))
  54. for i, p := range params {
  55. switch v := p.(type) {
  56. case *operation:
  57. inputs[i] = v
  58. default:
  59. log.Println("WARNING defining const with value", v)
  60. inputs[i] = f.Const(v).(*operation)
  61. }
  62. }
  63. // Grab executor here
  64. executor, err := f.registry.Get(name)
  65. if err != nil {
  66. f.err = err
  67. return nil
  68. }
  69. f.operations[id] = opEntry{name, inputs, executor}
  70. return opFunc(f, id)
  71. }
  72. // Res returns a deferred operation result
  73. // passing the Id
  74. func (f *Flow) Res(id string) Operation {
  75. // Defered operation
  76. return opFunc(f, id)
  77. }
  78. // Op return an function operator
  79. // name - a previous registered function
  80. // params - the function inputs
  81. func (f *Flow) Op(name string, params ...interface{}) Operation {
  82. // Use this on Set?
  83. inputs := make([]*operation, len(params))
  84. for i, p := range params {
  85. switch v := p.(type) {
  86. case *operation:
  87. inputs[i] = v
  88. default:
  89. // fail here
  90. log.Println("WARNING defining const with value", v)
  91. inputs[i] = f.Const(v).(*operation)
  92. }
  93. }
  94. // Grab executor here
  95. executor, err := f.registry.Get(name)
  96. if err != nil {
  97. f.err = err
  98. return nil
  99. }
  100. // generate ID
  101. for {
  102. uuid := puuid()
  103. if _, ok := f.operations[uuid]; ok {
  104. continue
  105. }
  106. f.operations[uuid] = opEntry{name, inputs, executor}
  107. return opFunc(f, uuid)
  108. }
  109. // Initialize opfunc maybe
  110. }
  111. // Const returns a const operation
  112. func (f *Flow) Const(value Data) Operation {
  113. f.consts = append(f.consts, value)
  114. refID := len(f.consts) - 1
  115. return opConst(f, refID)
  116. }
  117. // Var operation
  118. func (f *Flow) Var(name string, initial ...Data) Operation {
  119. if _, ok := f.data[name]; !ok {
  120. var v interface{}
  121. if len(initial) > 0 {
  122. v = initial[0]
  123. }
  124. f.data[name] = v
  125. }
  126. return opVar(f, name)
  127. }
  128. // In flow input operator
  129. // paramID - index of the parameter
  130. func (f *Flow) In(paramID int) Operation {
  131. return opIn(f, paramID)
  132. }
  133. // Run a batch of operation?
  134. func (f *Flow) Run(op Operation, params ...Data) (Data, error) {
  135. if f.err != nil {
  136. return nil, f.err
  137. }
  138. cache := map[*operation]Data{}
  139. return f.run(cache, op, params...)
  140. }
  141. func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Data, error) {
  142. o := op.(*operation)
  143. if v, ok := cache[o]; ok {
  144. return v, nil
  145. }
  146. // Manually fetch func data because of caching
  147. var r Data
  148. // This is wrong since the only source of func should be on operation
  149. if o.kind == "func" {
  150. op := f.operations[o.id.(string)]
  151. callParam := make([]reflect.Value, len(op.inputs))
  152. for i, in := range op.inputs {
  153. fr, _ := f.run(cache, in, params...) // ignore error
  154. callParam[i] = reflect.ValueOf(fr)
  155. }
  156. r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
  157. } else {
  158. r = o.process(nil, params...)
  159. }
  160. cache[o] = r
  161. return r, nil
  162. }
  163. // Analyse every operations
  164. func (f *Flow) Analyse(w io.Writer, params ...Data) {
  165. if w == nil {
  166. w = os.Stdout
  167. }
  168. fmt.Fprintf(w, "Ops analysis:\n")
  169. for k, op := range f.operations {
  170. fw := bytes.NewBuffer(nil)
  171. //fmt.Fprintf(w, " [%s] (%v)", k, op.name)
  172. fmt.Fprintf(fw, " [%s] %s(", k, op.name)
  173. for j, in := range op.inputs {
  174. //ref := in.(Op)
  175. if j != 0 {
  176. fmt.Fprintf(fw, ", ")
  177. }
  178. ires := in.Process(params...)
  179. if f.err != nil {
  180. fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error())
  181. return
  182. }
  183. fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
  184. }
  185. fmt.Fprintf(fw, ") - ")
  186. // Create OpProcessor and execute
  187. //
  188. opfn := opFunc(f, k)
  189. res := opfn.Process(params...)
  190. fmt.Fprintf(fw, "%v\n", res)
  191. fmt.Fprintf(w, "%s", fw.String())
  192. }
  193. }
  194. /////////////////////////////
  195. // Serializers inspectors
  196. //////////////////////
  197. func (f *Flow) String() string {
  198. ret := bytes.NewBuffer(nil)
  199. // consts
  200. fmt.Fprintf(ret, "consts:\n")
  201. for i, v := range f.consts {
  202. fmt.Fprintf(ret, " [%d] %v\n", i, v)
  203. }
  204. fmt.Fprintf(ret, "data:\n") // Or variable
  205. for k, v := range f.data {
  206. fmt.Fprintf(ret, " [%v] %v\n", k, v)
  207. }
  208. fmt.Fprintf(ret, "operations:\n")
  209. for k, v := range f.operations {
  210. fmt.Fprintf(ret, " [%s] %s(", k, v.name)
  211. for j, in := range v.inputs {
  212. if j != 0 {
  213. fmt.Fprintf(ret, ", ")
  214. }
  215. fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)
  216. }
  217. fmt.Fprintf(ret, ")\n")
  218. }
  219. return ret.String()
  220. }
  221. // MarshalJSON implementation
  222. func (f *Flow) MarshalJSON() ([]byte, error) {
  223. data := map[string]interface{}{}
  224. type opMarshal struct {
  225. Name string
  226. Input []map[string]interface{}
  227. }
  228. operations := map[string]opMarshal{}
  229. for k, o := range f.operations {
  230. refs := []map[string]interface{}{}
  231. for _, in := range o.inputs { // Switch type?
  232. refs = append(refs, map[string]interface{}{
  233. "type": in.kind,
  234. "id": in.id,
  235. })
  236. }
  237. operations[k] = opMarshal{o.name, refs}
  238. }
  239. data["operations"] = operations
  240. data["data"] = f.data
  241. data["consts"] = f.consts
  242. return json.Marshal(data)
  243. }
  244. //////////////////////////////////////////////
  245. // Experimental event
  246. ////////////////
  247. // Handle attach a handler
  248. func (f *Flow) Handle(handler func(name string, payLoad map[string]Data)) {
  249. f.handlers = append(f.handlers, handler)
  250. }
  251. func (f *Flow) trigger(name string, payLoad map[string]Data) {
  252. for _, h := range f.handlers {
  253. h(name, payLoad)
  254. }
  255. }