operation.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package flow
  2. //
  3. // Find a way to improve this mess, maybe it can be merged in one func
  4. //
  5. //
  6. import (
  7. "errors"
  8. "fmt"
  9. "log"
  10. "reflect"
  11. "runtime/debug"
  12. "sync"
  13. )
  14. type executorFunc func(OpCtx, ...Data) (Data, error)
  15. // Operation interface
  16. type Operation interface { // Id perhaps?
  17. //ID() string
  18. //Set(input Data) // Special var method
  19. Process(params ...Data) (Data, error)
  20. }
  21. type operation struct {
  22. sync.Mutex
  23. flow *Flow
  24. name string
  25. kind string
  26. // Could be a simple ID for operation, but it will depend on flow
  27. inputs []*operation // still figuring, might be Operation
  28. fn interface{} // Any func
  29. executor executorFunc
  30. }
  31. // OpCtx operation Context
  32. type OpCtx = *sync.Map
  33. // NewOpCtx creates a running context
  34. func newOpCtx() OpCtx {
  35. return &sync.Map{}
  36. }
  37. //func (o *operation) ID() string { return o.id }
  38. func (o *operation) Process(params ...Data) (Data, error) {
  39. // Create CTX
  40. ctx := newOpCtx()
  41. return o.executor(ctx, params...)
  42. }
  43. // make Executor for func
  44. func (f *Flow) asTrigger(op *operation, fn executorFunc) executorFunc {
  45. return func(ctx OpCtx, params ...Data) (Data, error) {
  46. f.hooks.start(op)
  47. //panic recoverer, since nodes are not our functions
  48. var err error
  49. var res Data
  50. func() {
  51. defer func() {
  52. if r := recover(); r != nil {
  53. log.Println("Panic:", r)
  54. debug.PrintStack()
  55. err = fmt.Errorf("%v", r)
  56. }
  57. }()
  58. res, err = fn(ctx, params...)
  59. }()
  60. if err != nil {
  61. f.hooks.error(op, err)
  62. } else {
  63. f.hooks.finish(op, res)
  64. }
  65. return res, err
  66. }
  67. }
  68. // safe makeExecutor with a bunch of type checks
  69. // we will create a less safer functions to get performance
  70. func (f *Flow) makeExecutor(op *operation, fn interface{}) executorFunc {
  71. // ExecutorFunc
  72. return func(ctx OpCtx, params ...Data) (Data, error) {
  73. /*op := f.GetOp(id)
  74. if op == nil {
  75. return nil, fmt.Errorf("invalid operation '%s'", id)
  76. }*/
  77. op.Lock()
  78. defer op.Unlock()
  79. // Load from cache if any
  80. if ctx != nil {
  81. if v, ok := ctx.Load(op); ok {
  82. return v, nil
  83. }
  84. }
  85. // Change to wait to wait for the inputs
  86. f.hooks.wait(op)
  87. fnval := reflect.ValueOf(fn)
  88. callParam, err := f.processInputs(ctx, op, fnval, params...)
  89. if err != nil {
  90. return nil, err
  91. }
  92. // Start again and execute function
  93. f.hooks.start(op)
  94. fnret := fnval.Call(callParam)
  95. if len(fnret) == 0 {
  96. return nil, nil
  97. }
  98. // Output erroring
  99. if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
  100. err, ok := fnret[len(fnret)-1].Interface().(error)
  101. if !ok {
  102. err = errors.New("unknown error")
  103. }
  104. return nil, err
  105. }
  106. // THE RESULT
  107. ret := fnret[0].Interface()
  108. // Store in the cache
  109. if ctx != nil {
  110. ctx.Store(op, ret)
  111. }
  112. return ret, nil
  113. }
  114. }
  115. // processInputs will run a list of operations and return reflect values
  116. // to be processed next
  117. // NEW PARALLEL PROCESSING
  118. func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
  119. nInputs := fnval.Type().NumIn()
  120. // Total inputs
  121. callParam := make([]reflect.Value, nInputs)
  122. if nInputs != len(op.inputs) {
  123. return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
  124. } //Wait
  125. callErrors := ""
  126. paramMutex := sync.Mutex{}
  127. // Parallel processing if inputs
  128. wg := sync.WaitGroup{}
  129. wg.Add(len(op.inputs))
  130. for i, in := range op.inputs {
  131. go func(i int, in *operation) {
  132. defer wg.Done()
  133. inTyp := fnval.Type().In(i)
  134. /////////////////
  135. // Executor
  136. fr, err := in.executor(ctx, params...)
  137. //log.Println("Executing:", in.id, in.name, fr)
  138. paramMutex.Lock()
  139. defer paramMutex.Unlock()
  140. if err != nil {
  141. callErrors += err.Error() + "\n"
  142. return
  143. }
  144. if fr == nil {
  145. callParam[i] = reflect.Zero(inTyp)
  146. return
  147. }
  148. res := reflect.ValueOf(fr)
  149. var cres reflect.Value
  150. // Conversion effort
  151. switch {
  152. case !res.IsValid():
  153. callErrors += fmt.Sprintf("Input %d invalid\n", i)
  154. return
  155. case !res.Type().ConvertibleTo(inTyp):
  156. if inTyp.Kind() != reflect.String {
  157. callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
  158. log.Println(f)
  159. return
  160. }
  161. cres = reflect.ValueOf(fmt.Sprint(res.Interface()))
  162. default:
  163. cres = res.Convert(inTyp)
  164. }
  165. // CheckError and safelly append
  166. callParam[i] = cres
  167. }(i, in)
  168. }
  169. wg.Wait()
  170. // Check for any error
  171. if callErrors != "" {
  172. log.Println("Call errors:", callErrors)
  173. return nil, errors.New(callErrors)
  174. }
  175. return callParam, nil
  176. }
  177. // Var create a operation
  178. func (f *Flow) Var(name string, initial Data) Operation {
  179. // Input from params
  180. var input *operation
  181. switch v := initial.(type) {
  182. case *operation:
  183. input = v
  184. default:
  185. c := f.Const(v)
  186. input = c.(*operation)
  187. }
  188. op := &operation{
  189. Mutex: sync.Mutex{},
  190. flow: f,
  191. name: fmt.Sprintf("(var)<%s>", name),
  192. kind: "var",
  193. inputs: []*operation{input},
  194. }
  195. op.executor = f.makeExecutor(op, func(initial Data) Data {
  196. val, ok := f.Data[name]
  197. if !ok {
  198. val = initial
  199. f.Data[name] = val
  200. }
  201. return val
  202. })
  203. return op
  204. }
  205. // Var define var operation with optional initial
  206. /*func (f *Flow) Var(name string, initial ...Data) Operation {
  207. // Unique
  208. if _, ok := f.Data[name]; !ok {
  209. var v interface{}
  210. if len(initial) > 0 {
  211. v = initial[0]
  212. }
  213. f.Data[name] = v
  214. }
  215. op := &operation{
  216. Mutex: sync.Mutex{},
  217. flow: f,
  218. name: fmt.Sprintf("(var)<%s>", name),
  219. kind: "var",
  220. inputs: nil,
  221. }
  222. op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) {
  223. // if f.data == nil we set from the init operation
  224. return f.Data[name], nil
  225. })
  226. f.operations = append(f.operations, op)
  227. //f.operations.Store(id, op)
  228. return op
  229. }*/
  230. // Op Manual tag an Operation
  231. // Define operation for ID
  232. func (f *Flow) Op(name string, params ...interface{}) Operation {
  233. inputs := make([]*operation, len(params))
  234. for i, p := range params {
  235. switch v := p.(type) {
  236. case *operation:
  237. inputs[i] = v
  238. default:
  239. c := f.Const(v)
  240. inputs[i], _ = c.(*operation)
  241. }
  242. }
  243. // If special executor we attach our func
  244. // Grab executor here
  245. registryFn, err := f.registry.Get(name)
  246. if err != nil {
  247. return f.ErrOp(err)
  248. }
  249. op := &operation{
  250. Mutex: sync.Mutex{},
  251. flow: f,
  252. name: name,
  253. kind: "func",
  254. inputs: inputs,
  255. }
  256. executor := f.makeExecutor(op, registryFn)
  257. op.executor = f.asTrigger(op, executor)
  258. f.operations = append(f.operations, op)
  259. return op
  260. }
  261. // ErrOp define a nil operation that will return error
  262. // Usefull for builders
  263. func (f *Flow) ErrOp(err error) Operation {
  264. op := &operation{
  265. Mutex: sync.Mutex{},
  266. //id: id,
  267. flow: f,
  268. name: fmt.Sprintf("(error)<%v>", err),
  269. kind: "error",
  270. inputs: nil,
  271. }
  272. op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return nil, err })
  273. //f.operations = append(f.operations, op)
  274. return op
  275. }
  276. // Const define a const by defined ID
  277. func (f *Flow) Const(value Data) Operation {
  278. // Optimize this definition
  279. constID := -1
  280. for k, v := range f.consts {
  281. if v == value {
  282. constID = k
  283. break
  284. }
  285. }
  286. if constID == -1 {
  287. constID = len(f.consts)
  288. f.consts = append(f.consts, value)
  289. }
  290. op := &operation{
  291. Mutex: sync.Mutex{},
  292. flow: f,
  293. name: fmt.Sprintf("(const)<%v:%v>", constID, value),
  294. kind: "const",
  295. inputs: nil,
  296. }
  297. op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return f.consts[constID], nil })
  298. //f.operations = append(f.operations, op)
  299. return op
  300. }
  301. // In define input operation
  302. func (f *Flow) In(paramID int) Operation {
  303. op := &operation{
  304. Mutex: sync.Mutex{},
  305. flow: f,
  306. name: fmt.Sprintf("(in)<%d>", paramID),
  307. kind: "in",
  308. inputs: nil,
  309. }
  310. op.executor = f.asTrigger(op, func(ctx OpCtx, params ...Data) (Data, error) {
  311. if paramID < 0 || paramID >= len(params) {
  312. return nil, ErrInput
  313. }
  314. return params[paramID], nil
  315. })
  316. f.operations = append(f.operations, op)
  317. return op
  318. }