operation.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package flow
  2. //
  3. // Find a way to improve this mess, maybe it can be merged in one func
  4. //
  5. //
  6. import (
  7. "errors"
  8. "fmt"
  9. "log"
  10. "reflect"
  11. "sync"
  12. )
  13. type executorFunc func(OpCtx, ...Data) (Data, error)
  14. // Operation interface
  15. type Operation interface { // Id perhaps?
  16. ID() string
  17. Set(input Data) // Special var method
  18. Process(params ...Data) (Data, error)
  19. }
  20. // Will be named operation
  21. type operation struct {
  22. sync.Mutex
  23. flow *Flow
  24. id string
  25. name string
  26. kind string
  27. inputs []*operation // still figuring, might be Operation
  28. setter func(Data)
  29. executor executorFunc
  30. }
  31. // OpCtx operation Context
  32. type OpCtx = *sync.Map
  33. // NewOpCtx creates a running context
  34. func newOpCtx() OpCtx {
  35. return &sync.Map{}
  36. }
  37. func (o *operation) ID() string { return o.id }
  38. func (o *operation) Process(params ...Data) (Data, error) {
  39. // Create CTX
  40. ctx := newOpCtx()
  41. return o.executor(ctx, params...)
  42. //return executeOP(o.flow, o.id, ctx, params...)
  43. }
  44. func (o *operation) Set(data Data) {
  45. if o.setter != nil {
  46. o.setter(data)
  47. }
  48. }
  49. // make Executor for func
  50. func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
  51. return func(ctx OpCtx, params ...Data) (Data, error) {
  52. f.hooks.start(id)
  53. d, err := fn(ctx, params...)
  54. if err != nil {
  55. f.hooks.error(id, err)
  56. return d, err
  57. }
  58. f.hooks.finish(id, d)
  59. return d, err
  60. }
  61. }
  62. // save makeExecutor with a bunch of type checks
  63. // we will create a less safer functions to get performance
  64. func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
  65. return func(ctx OpCtx, params ...Data) (Data, error) {
  66. var err error
  67. //panic recoverer, since nodes are not our functions
  68. defer func() {
  69. if r := recover(); r != nil {
  70. err = fmt.Errorf("%v", r)
  71. f.hooks.error(id, err)
  72. }
  73. }()
  74. op := f.GetOp(id)
  75. if op == nil {
  76. err = fmt.Errorf("invalid operation '%s'", id)
  77. f.hooks.error(id, err)
  78. return nil, err
  79. }
  80. op.Lock()
  81. defer op.Unlock()
  82. // Load from cache if any
  83. if ctx != nil {
  84. if v, ok := ctx.Load(id); ok {
  85. return v, nil
  86. }
  87. }
  88. // Wait for inputs
  89. f.hooks.wait(id)
  90. fnval := reflect.ValueOf(fn)
  91. callParam, err := f.processInputs(ctx, op, fnval, params...)
  92. if err != nil {
  93. log.Println("ERR:", err)
  94. f.hooks.error(id, err)
  95. return nil, err
  96. }
  97. // The actual operation process
  98. // Func returned starting the process
  99. f.hooks.start(id)
  100. // if entry is special we pass the Flow?
  101. fnret := fnval.Call(callParam)
  102. // Output erroring
  103. if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
  104. err, ok := fnret[1].Interface().(error)
  105. if !ok {
  106. err = errors.New("unknown error")
  107. }
  108. f.hooks.error(id, err)
  109. return nil, err
  110. }
  111. // THE RESULT
  112. ret := fnret[0].Interface()
  113. // Store in the cache
  114. if ctx != nil {
  115. ctx.Store(id, ret)
  116. }
  117. f.hooks.finish(id, ret)
  118. return ret, nil
  119. }
  120. }
  121. /////////////////////////////
  122. // NEW PARALLEL PROCESSING
  123. ///////////
  124. func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
  125. // Flow injector
  126. nInputs := fnval.Type().NumIn()
  127. // Total inputs
  128. OcallParam := make([]reflect.Value, nInputs)
  129. callParam := OcallParam
  130. offs := 0
  131. // Inject flow if the first param is of type Flow
  132. if nInputs > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
  133. OcallParam[0] = reflect.ValueOf(f)
  134. offs = 1
  135. nInputs--
  136. //shift one to process inputs
  137. callParam = OcallParam[1:]
  138. }
  139. if nInputs != len(op.inputs) {
  140. return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
  141. } //Wait
  142. callErrors := ""
  143. paramMutex := sync.Mutex{}
  144. // Parallel processing if inputs
  145. wg := sync.WaitGroup{}
  146. wg.Add(len(op.inputs))
  147. for i, in := range op.inputs {
  148. go func(i int, in *operation) {
  149. defer wg.Done()
  150. inTyp := fnval.Type().In(i + offs)
  151. /////////////////
  152. // Executor
  153. fr, err := in.executor(ctx, params...)
  154. paramMutex.Lock()
  155. defer paramMutex.Unlock()
  156. if err != nil {
  157. callErrors += err.Error() + "\n"
  158. return
  159. }
  160. if fr == nil {
  161. callParam[i] = reflect.ValueOf(reflect.Zero(inTyp).Interface())
  162. }
  163. res := reflect.ValueOf(fr)
  164. var cres reflect.Value
  165. // Conversion effort
  166. switch {
  167. case !res.IsValid():
  168. callErrors += fmt.Sprintf("Input %d invalid\n", i)
  169. return
  170. case !res.Type().ConvertibleTo(inTyp):
  171. if inTyp.Kind() != reflect.String {
  172. callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
  173. return
  174. }
  175. cres = reflect.ValueOf(fmt.Sprint(res.Interface()))
  176. default:
  177. cres = res.Convert(inTyp)
  178. }
  179. // CheckError and safelly append
  180. callParam[i] = cres
  181. }(i, in)
  182. }
  183. wg.Wait()
  184. if callErrors != "" {
  185. return nil, errors.New(callErrors)
  186. }
  187. /*offs := 0
  188. if fnval.Type().NumIn() > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
  189. nInputs--
  190. offs = 1
  191. }*/
  192. return OcallParam, nil
  193. }
  194. // DefVar define var operation with optional initial
  195. func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
  196. // Unique
  197. if _, ok := f.Data[name]; !ok {
  198. var v interface{}
  199. if len(initial) > 0 {
  200. v = initial[0]
  201. }
  202. f.Data[name] = v
  203. }
  204. setter := func(v Data) { f.Data[name] = v }
  205. opEntry := &operation{
  206. Mutex: sync.Mutex{},
  207. id: id,
  208. flow: f,
  209. name: fmt.Sprintf("(var)<%s>", name),
  210. kind: "var",
  211. inputs: nil,
  212. setter: setter,
  213. executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
  214. // if f.data == nil we set from the init operation
  215. return f.Data[name], nil
  216. }),
  217. }
  218. f.operations.Store(id, opEntry)
  219. return opEntry
  220. }
  221. // DefOp Manual tag an Operation
  222. func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
  223. inputs := make([]*operation, len(params))
  224. for i, p := range params {
  225. switch v := p.(type) {
  226. case *operation:
  227. inputs[i] = v
  228. default:
  229. c, err := f.Const(v)
  230. if err != nil {
  231. return nil, err
  232. }
  233. inputs[i], _ = c.(*operation)
  234. }
  235. }
  236. // Grab executor here
  237. registryFn, err := f.registry.Get(name)
  238. if err != nil {
  239. return nil, err
  240. }
  241. executor := f.makeExecutor(id, registryFn)
  242. op := &operation{
  243. Mutex: sync.Mutex{},
  244. id: id,
  245. flow: f,
  246. name: name,
  247. kind: "func",
  248. inputs: inputs,
  249. setter: nil, // No set
  250. executor: executor,
  251. }
  252. f.operations.Store(id, op)
  253. return op, nil
  254. }
  255. // DefErrOp define a nil operation that will return error
  256. // Usefull for builders
  257. func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
  258. op := &operation{
  259. Mutex: sync.Mutex{},
  260. id: id,
  261. flow: f,
  262. name: fmt.Sprintf("(error)<%v>", err),
  263. kind: "error",
  264. inputs: nil,
  265. setter: nil,
  266. executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return nil, err }),
  267. }
  268. f.operations.Store(id, op)
  269. return op, nil
  270. }
  271. // DefConst define a const by defined ID
  272. func (f *Flow) DefConst(id string, value Data) (Operation, error) {
  273. // Optimize this definition
  274. f.consts[id] = value
  275. op := &operation{
  276. id: id,
  277. Mutex: sync.Mutex{},
  278. flow: f,
  279. name: fmt.Sprintf("(const)<%s>", id),
  280. kind: "const",
  281. inputs: nil,
  282. setter: nil,
  283. executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.consts[id], nil }),
  284. }
  285. f.operations.Store(id, op)
  286. return op, nil
  287. }
  288. // DefIn define input operation
  289. func (f *Flow) DefIn(id string, paramID int) Operation {
  290. op := &operation{
  291. id: id,
  292. Mutex: sync.Mutex{},
  293. flow: f,
  294. name: fmt.Sprintf("(in)<%d>", paramID),
  295. kind: "in",
  296. inputs: nil,
  297. setter: nil,
  298. executor: f.makeTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) { return params[paramID], nil }),
  299. }
  300. f.operations.Store(id, op)
  301. return op
  302. }