flow.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package flow
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. )
  7. // Data interface
  8. type Data = interface{}
  9. type opEntry struct {
  10. name string
  11. inputs []operation // still figuring, might be operation
  12. executor interface{}
  13. }
  14. // Flow structure
  15. // We could Create a single array of operations
  16. // refs would only mean id, types would be embed in operation
  17. type Flow struct {
  18. registry *Registry
  19. consts []Data
  20. data []Data
  21. operations []opEntry
  22. err error
  23. }
  24. // New create a new flow
  25. func New() *Flow {
  26. return &Flow{
  27. registry: globalRegistry,
  28. consts: []Data{},
  29. data: []Data{},
  30. operations: []opEntry{},
  31. }
  32. }
  33. // OpFunc return
  34. func (f *Flow) Op(name string, params ...interface{}) operation {
  35. inputs := make([]operation, len(params))
  36. for i, p := range params {
  37. switch v := p.(type) {
  38. case operation:
  39. inputs[i] = v
  40. default:
  41. inputs[i] = f.Const(v)
  42. }
  43. }
  44. // Grab executor here
  45. executor, err := f.registry.Get(name)
  46. if err != nil {
  47. f.err = err
  48. return OpFunc{}
  49. }
  50. f.operations = append(f.operations, opEntry{name, inputs, executor})
  51. refID := len(f.operations) - 1
  52. // Initialize opfunc maybe
  53. return OpFunc{ref{f, refID}}
  54. }
  55. // Const returns a const operation
  56. func (f *Flow) Const(value interface{}) OpConst {
  57. f.consts = append(f.consts, value)
  58. refID := len(f.consts) - 1
  59. return OpConst{ref{f, refID}}
  60. }
  61. // Variable operation
  62. func (f *Flow) Variable(value interface{}) OpVar {
  63. f.data = append(f.consts, value)
  64. refID := len(f.data) - 1
  65. return OpVar{ref{f, refID}}
  66. }
  67. func (f *Flow) In(paramId int) OpIn {
  68. return OpIn{ref{f, paramId}}
  69. }
  70. // Run a batch of operation
  71. func (f *Flow) Run(ops ...operation) ([]Data, error) {
  72. if f.err != nil {
  73. return nil, f.err
  74. }
  75. res := []Data{}
  76. for _, o := range ops {
  77. res = append(res, o.Process())
  78. }
  79. return res, nil
  80. }
  81. // Analyse every operations
  82. func (f *Flow) Analyse(params ...Data) string {
  83. ret := bytes.NewBuffer(nil)
  84. fmt.Fprintf(ret, "Flow analysis:\n")
  85. for i, op := range f.operations {
  86. fmt.Fprintf(ret, " [%d] %s(", i, op.name)
  87. for j, in := range op.inputs {
  88. ref := in.fref()
  89. if j != 0 {
  90. fmt.Fprintf(ret, ", ")
  91. }
  92. ires := in.Process(params...)
  93. fmt.Fprintf(ret, "%s@%d(%v)", typeName(in), ref.ID, ires)
  94. }
  95. fmt.Fprintf(ret, ") - ")
  96. // Create OpProcessor and execute
  97. //
  98. opfn := OpFunc{ref{f, i}}
  99. res := opfn.Process(params...)
  100. fmt.Fprintf(ret, "%v\n", res)
  101. }
  102. return ret.String()
  103. }
  104. func (f *Flow) String() string {
  105. ret := bytes.NewBuffer(nil)
  106. // consts
  107. fmt.Fprintf(ret, "consts:\n")
  108. for i, v := range f.consts {
  109. fmt.Fprintf(ret, " [%d] %v\n", i, v)
  110. }
  111. fmt.Fprintf(ret, "data:\n") // Or variable
  112. for i, v := range f.data {
  113. fmt.Fprintf(ret, " [%d] %v\n", i, v)
  114. }
  115. fmt.Fprintf(ret, "operations:\n")
  116. for i, v := range f.operations {
  117. fmt.Fprintf(ret, " [%d] %s(", i, v.name)
  118. for j, in := range v.inputs {
  119. ref := in.fref()
  120. if j != 0 {
  121. fmt.Fprintf(ret, ", ")
  122. }
  123. fmt.Fprintf(ret, "%s@%d", typeName(in), ref.ID)
  124. }
  125. fmt.Fprintf(ret, ")\n")
  126. }
  127. return ret.String()
  128. }
  129. // Ref?
  130. func (f *Flow) MarshalJSON() ([]byte, error) {
  131. data := map[string]interface{}{}
  132. type opMarshal struct {
  133. Name string
  134. Input []map[string]interface{}
  135. }
  136. operations := make([]opMarshal, len(f.operations))
  137. for i, o := range f.operations {
  138. refs := []map[string]interface{}{}
  139. for _, in := range o.inputs { // Switch type?
  140. refs = append(refs, map[string]interface{}{
  141. "type": typeName(in),
  142. "id": in.fref().ID,
  143. })
  144. }
  145. operations[i] = opMarshal{o.name, refs}
  146. }
  147. data["operations"] = operations
  148. data["data"] = f.data
  149. data["consts"] = f.consts
  150. return json.Marshal(data)
  151. }