operation.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package flow
  2. //
  3. // Find a way to improve this mess, maybe it can be merged in one func
  4. //
  5. //
  6. import (
  7. "errors"
  8. "fmt"
  9. "reflect"
  10. "sync"
  11. )
  12. // OpCtx operation Context
  13. type OpCtx = *sync.Map
  14. // NewOpCtx creates a running context
  15. func newOpCtx() OpCtx {
  16. return &sync.Map{}
  17. }
  18. // dumbSet
  19. func dumbSet(params ...Data) {}
  20. // Operation interface
  21. type Operation interface { // Id perhaps?
  22. ID() string
  23. Set(inputs ...Data) // Special var method
  24. Process(params ...Data) (Data, error)
  25. }
  26. // Run Context actually not OpCTX
  27. //local operation information
  28. type operation struct {
  29. flow *Flow
  30. id interface{} // Interface key
  31. kind string
  32. set func(params ...Data)
  33. process func(ctx OpCtx, params ...Data) (Data, error)
  34. }
  35. // Id returns string Id of the operaton
  36. func (o *operation) ID() string {
  37. return fmt.Sprint(o.id)
  38. }
  39. // Process operation process wrapper
  40. func (o *operation) Process(params ...Data) (Data, error) {
  41. return o.processWithCtx(newOpCtx(), params...)
  42. }
  43. // Every single one is run with this internally
  44. func (o *operation) processWithCtx(ctx OpCtx, params ...Data) (Data, error) {
  45. return o.process(ctx, params...)
  46. /*entry, _ := o.flow.getOp(fmt.Sprint(o.id))
  47. if entry == nil {
  48. log.Println("Entry is nil for id:", o.id, ", why??")
  49. return nil
  50. }
  51. entry.Lock()
  52. defer entry.Unlock()
  53. if o.flow.err != nil {
  54. return nil
  55. }
  56. if ctx == nil { // No cache/Context
  57. }
  58. if v, ok := ctx.Load(o.id); ok {
  59. return v
  60. }
  61. res := o.process(ctx, params...)
  62. ctx.Store(o.id, res)
  63. return res*/
  64. }
  65. // Set setter for certain operations (Var)
  66. func (o *operation) Set(params ...Data) {
  67. o.set(params...)
  68. }
  69. //////////////////////////////////////
  70. // Operators definition
  71. ///////////////
  72. func opIn(f *Flow, id int) *operation {
  73. return &operation{
  74. flow: f,
  75. id: id,
  76. kind: "in",
  77. set: dumbSet,
  78. process: func(ctx OpCtx, params ...Data) (Data, error) {
  79. if id >= len(params) || id < 0 {
  80. return nil, errors.New("invalid input")
  81. }
  82. return params[id], nil
  83. },
  84. }
  85. }
  86. func opConst(f *Flow, id string) *operation {
  87. return &operation{
  88. flow: f,
  89. id: id,
  90. kind: "const",
  91. set: dumbSet,
  92. process: func(ctx OpCtx, params ...Data) (Data, error) {
  93. ret := f.consts[id]
  94. return ret, nil
  95. },
  96. }
  97. }
  98. // Debug type
  99. func opFunc(f *Flow, id string) *operation {
  100. return &operation{
  101. flow: f,
  102. id: id,
  103. kind: "func",
  104. set: dumbSet,
  105. process: func(ctx OpCtx, params ...Data) (ret Data, err error) {
  106. defer func() {
  107. if r := recover(); r != nil {
  108. err = fmt.Errorf("Panic: %v", r)
  109. f.hooks.error(id, err)
  110. }
  111. }()
  112. op, ok := f.getOp(id)
  113. if !ok {
  114. err = fmt.Errorf("invalid operation '%s'", id)
  115. f.hooks.error(id, err)
  116. return nil, err
  117. }
  118. /*if f.err != nil {
  119. return nil
  120. }*/
  121. op.Lock()
  122. defer op.Unlock()
  123. if ctx != nil {
  124. if v, ok := ctx.Load(id); ok { // Cache
  125. return v, nil
  126. }
  127. }
  128. // Check inputs
  129. fnval := reflect.ValueOf(op.executor)
  130. if fnval.Type().NumIn() != len(op.inputs) {
  131. err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
  132. f.hooks.error(id, err)
  133. return nil, err
  134. }
  135. /////////////////////////////
  136. // NEW PARALLEL PROCESSING
  137. ///////////
  138. f.hooks.wait(id)
  139. callParam := make([]reflect.Value, len(op.inputs))
  140. // Parallel processing if inputs
  141. wg := sync.WaitGroup{}
  142. wg.Add(len(op.inputs))
  143. for i, in := range op.inputs {
  144. go func(i int, in *operation) {
  145. defer wg.Done()
  146. fr, err := in.processWithCtx(ctx, params...)
  147. if err != nil {
  148. return
  149. }
  150. callParam[i] = reflect.ValueOf(fr)
  151. }(i, in)
  152. }
  153. wg.Wait()
  154. // Return type checking
  155. errMsg := ""
  156. for i, p := range callParam {
  157. // TypeChecking checking
  158. if !p.IsValid() {
  159. errMsg += fmt.Sprintf("Input %d invalid\n", i)
  160. } else if !p.Type().AssignableTo(fnval.Type().In(i)) {
  161. errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
  162. }
  163. }
  164. if len(errMsg) > 0 {
  165. err := errors.New(errMsg)
  166. f.hooks.error(id, err)
  167. return nil, err
  168. }
  169. f.hooks.start(id)
  170. fnret := fnval.Call(callParam)
  171. if len(fnret) > 1 && (fnret[1].Interface() != nil) {
  172. err, ok := fnret[1].Interface().(error)
  173. if !ok {
  174. err = errors.New("unknown error")
  175. }
  176. f.hooks.error(id, err)
  177. return nil, err
  178. }
  179. // THE RESULT
  180. ret = fnret[0].Interface()
  181. if ctx != nil {
  182. ctx.Store(id, ret)
  183. }
  184. f.hooks.finish(id, ret)
  185. return ret, nil
  186. },
  187. }
  188. }
  189. func opVar(f *Flow, id string) *operation {
  190. return &operation{
  191. flow: f,
  192. id: id,
  193. kind: "var",
  194. set: func(params ...Data) { f.data[id] = params[0] },
  195. process: func(ctx OpCtx, params ...Data) (Data, error) { return f.data[id], nil },
  196. }
  197. }
  198. func opNil(f *Flow) *operation {
  199. return &operation{
  200. flow: f,
  201. kind: "nil",
  202. process: func(ctx OpCtx, params ...Data) (Data, error) { return nil, errors.New("Nil operation") },
  203. }
  204. }