operation.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package flow
  2. //
  3. // Find a way to improve this mess, maybe it can be merged in one func
  4. //
  5. //
  6. import (
  7. "errors"
  8. "fmt"
  9. "reflect"
  10. )
  11. // OpCtx operation Context
  12. type OpCtx map[Operation]Data
  13. // NewOpCtx creates a running context
  14. func newOpCtx() OpCtx {
  15. return OpCtx{}
  16. }
  17. // dumbSet
  18. func dumbSet(params ...Data) {}
  19. // Operation interface
  20. type Operation interface {
  21. Set(inputs ...Data) // Special var method
  22. Process(params ...Data) Data
  23. }
  24. // Run Context actually not OpCTX
  25. //local operation information
  26. type operation struct {
  27. flow *Flow
  28. id interface{} // Interface key
  29. kind string
  30. set func(params ...Data)
  31. process func(ctx OpCtx, params ...Data) Data
  32. }
  33. // Process operation process wrapper
  34. func (o *operation) Process(params ...Data) Data {
  35. return o.processWithCtx(newOpCtx(), params...)
  36. }
  37. // Every single one is run with this internally
  38. func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
  39. if o.flow.err != nil {
  40. return nil
  41. }
  42. if ctx == nil { // No cache/Context
  43. return o.process(ctx, params...)
  44. }
  45. if v, ok := ctx[o]; ok {
  46. return v
  47. }
  48. res := o.process(ctx, params...)
  49. ctx[o] = res
  50. return res
  51. }
  52. // Set setter for certain operations (Var)
  53. func (o *operation) Set(params ...Data) {
  54. o.set(params...)
  55. }
  56. func opIn(f *Flow, id int) *operation {
  57. return &operation{
  58. flow: f,
  59. id: id,
  60. kind: "in",
  61. set: dumbSet,
  62. process: func(ctx OpCtx, params ...Data) Data {
  63. if id >= len(params) || id < 0 {
  64. f.err = errors.New("invalid input")
  65. return nil
  66. }
  67. return params[id]
  68. },
  69. }
  70. }
  71. func opConst(f *Flow, id int) *operation {
  72. return &operation{
  73. flow: f,
  74. id: id,
  75. kind: "const",
  76. set: dumbSet,
  77. process: func(ctx OpCtx, params ...Data) Data {
  78. ret := f.consts[id]
  79. return ret
  80. },
  81. }
  82. }
  83. func opFunc(f *Flow, id string) *operation {
  84. return &operation{
  85. flow: f,
  86. id: id,
  87. kind: "func",
  88. set: dumbSet,
  89. process: func(ctx OpCtx, params ...Data) Data {
  90. op, ok := f.operations[id]
  91. if !ok {
  92. f.err = fmt.Errorf("invalid operation %s", id)
  93. return nil
  94. }
  95. callParam := make([]reflect.Value, len(op.inputs))
  96. for i, in := range op.inputs {
  97. fr := in.processWithCtx(ctx, params...)
  98. if fr == nil {
  99. f.err = errors.New("returning nil")
  100. return nil
  101. }
  102. callParam[i] = reflect.ValueOf(fr)
  103. }
  104. return reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
  105. },
  106. }
  107. }
  108. func opVar(f *Flow, id string) *operation {
  109. return &operation{
  110. flow: f,
  111. id: id,
  112. kind: "var",
  113. set: func(params ...Data) { f.data[id] = params[0] },
  114. process: func(ctx OpCtx, params ...Data) Data { return f.data[id] },
  115. }
  116. }