operation.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package flow
  2. //
  3. // Find a way to improve this mess, maybe it can be merged in one func
  4. //
  5. //
  6. import (
  7. "errors"
  8. "fmt"
  9. "log"
  10. "reflect"
  11. "sync"
  12. "github.com/gohxs/prettylog"
  13. )
  14. // OpCtx operation Context
  15. type OpCtx = *sync.Map
  16. // NewOpCtx creates a running context
  17. func newOpCtx() OpCtx {
  18. return &sync.Map{}
  19. }
  20. // dumbSet
  21. func dumbSet(params ...Data) {}
  22. // Operation interface
  23. type Operation interface { // Id perhaps?
  24. ID() string
  25. Set(inputs ...Data) // Special var method
  26. Process(params ...Data) Data
  27. }
  28. // Run Context actually not OpCTX
  29. //local operation information
  30. type operation struct {
  31. sync.Mutex
  32. flow *Flow
  33. id interface{} // Interface key
  34. kind string
  35. src string
  36. set func(params ...Data)
  37. process func(ctx OpCtx, params ...Data) Data
  38. }
  39. // Id returns string Id of the operaton
  40. func (o *operation) ID() string {
  41. return fmt.Sprint(o.id)
  42. }
  43. // Process operation process wrapper
  44. func (o *operation) Process(params ...Data) Data {
  45. return o.processWithCtx(newOpCtx(), params...)
  46. }
  47. // Every single one is run with this internally
  48. func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
  49. entry, _ := o.flow.getOp(fmt.Sprint(o.id))
  50. log := prettylog.New(o.ID() + ":" + entry.name)
  51. log.Printf("Operation waiting")
  52. o.Lock()
  53. defer o.Unlock()
  54. log.Printf("Executing")
  55. log.Println("Context", ctx)
  56. if o.flow.err != nil {
  57. return nil
  58. }
  59. if ctx == nil { // No cache/Context
  60. log.Printf("Processing")
  61. return o.process(ctx, params...)
  62. }
  63. if v, ok := ctx.Load(o); ok {
  64. log.Printf("Cached")
  65. return v
  66. }
  67. log.Printf("Processing")
  68. res := o.process(ctx, params...)
  69. log.Println("Storing", res)
  70. ctx.Store(o, res)
  71. return res
  72. }
  73. // Set setter for certain operations (Var)
  74. func (o *operation) Set(params ...Data) {
  75. o.set(params...)
  76. }
  77. //////////////////////////////////////
  78. // Operators definition
  79. ///////////////
  80. func opIn(f *Flow, id int) *operation {
  81. return &operation{
  82. flow: f,
  83. id: id,
  84. kind: "in",
  85. set: dumbSet,
  86. process: func(ctx OpCtx, params ...Data) Data {
  87. if id >= len(params) || id < 0 {
  88. f.err = errors.New("invalid input")
  89. return nil
  90. }
  91. return params[id]
  92. },
  93. }
  94. }
  95. func opConst(f *Flow, id string) *operation {
  96. return &operation{
  97. flow: f,
  98. id: id,
  99. kind: "const",
  100. set: dumbSet,
  101. process: func(ctx OpCtx, params ...Data) Data {
  102. ret := f.consts[id]
  103. return ret
  104. },
  105. }
  106. }
  107. func opFunc(f *Flow, id string) *operation {
  108. return &operation{
  109. flow: f,
  110. id: id,
  111. kind: "func",
  112. set: dumbSet,
  113. process: func(ctx OpCtx, params ...Data) Data {
  114. op, ok := f.getOp(id)
  115. if !ok {
  116. f.err = fmt.Errorf("invalid operation '%s'", id)
  117. log.Println("Operation not ok", f.err)
  118. f.hooks.error(id, f.err)
  119. return nil
  120. }
  121. fnval := reflect.ValueOf(op.executor)
  122. if fnval.Type().NumIn() != len(op.inputs) {
  123. f.err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
  124. f.hooks.error(id, f.err)
  125. log.Println("Operation not ok", f.err)
  126. return nil
  127. }
  128. /////////////////////////////
  129. // NEW PARALLEL PROCESSING
  130. ///////////
  131. f.hooks.wait(id)
  132. callParam := make([]reflect.Value, len(op.inputs))
  133. wg := sync.WaitGroup{}
  134. wg.Add(len(op.inputs))
  135. for i, in := range op.inputs {
  136. go func(i int, in *operation) {
  137. defer wg.Done()
  138. fr := in.processWithCtx(ctx, params...)
  139. callParam[i] = reflect.ValueOf(fr)
  140. }(i, in)
  141. }
  142. wg.Wait()
  143. // Check params
  144. for _, p := range callParam {
  145. if !p.IsValid() {
  146. f.err = fmt.Errorf("Input failed", p)
  147. log.Println("Flow err:", f.err)
  148. f.hooks.error(id, f.err)
  149. return nil
  150. }
  151. }
  152. f.hooks.start(id)
  153. fnret := fnval.Call(callParam)
  154. if len(fnret) > 1 {
  155. if err := fnret[1].Interface().(error); err != nil {
  156. f.err = err
  157. log.Println("Flow err:", f.err)
  158. f.hooks.error(id, f.err)
  159. return nil
  160. }
  161. }
  162. // check fnret[1] for error
  163. ret := fnret[0].Interface()
  164. f.hooks.finish(id, ret)
  165. return ret
  166. },
  167. }
  168. }
  169. func opVar(f *Flow, id string) *operation {
  170. return &operation{
  171. flow: f,
  172. id: id,
  173. kind: "var",
  174. set: func(params ...Data) { f.data[id] = params[0] },
  175. process: func(ctx OpCtx, params ...Data) Data { return f.data[id] },
  176. }
  177. }
  178. func opNil(f *Flow) *operation {
  179. return &operation{
  180. flow: f,
  181. kind: "nil",
  182. process: func(ctx OpCtx, params ...Data) Data { f.err = errors.New("Nil operation"); return nil },
  183. }
  184. }