builder.go 4.5 KB


  1. package flowbuilder
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "flow"
  6. "flow/registry"
  7. "fmt"
  8. "log"
  9. "reflect"
  10. "strconv"
  11. "time"
  12. )
  var (
  	// ErrLoop is the error the builder records when Build reaches a node
  	// that is still in the middle of being built, i.e. the document loops.
  	ErrLoop = errors.New("Looping is disabled for now")
  )
  15. // FlowBuilder builds a flow from flow-ui json data
  16. type FlowBuilder struct {
  17. registry *registry.R
  18. doc *FlowDocument
  19. flow *flow.Flow
  20. nodeTrack map[string]bool
  21. Err error
  22. }
  23. // New creates a New builder
  24. func New(r *registry.R) *FlowBuilder {
  25. return &FlowBuilder{
  26. registry: r,
  27. nodeTrack: map[string]bool{},
  28. }
  29. }
  30. // Load document from json into builder
  31. func (fb *FlowBuilder) Load(rawData []byte) *FlowBuilder {
  32. fb.flow = flow.New()
  33. fb.flow.SetRegistry(fb.registry)
  34. doc := &FlowDocument{[]Node{}, []Link{}, []Trigger{}}
  35. log.Println("Loading document from:", string(rawData))
  36. err := json.Unmarshal(rawData, doc)
  37. if err != nil {
  38. fb.Err = err
  39. return fb
  40. }
  41. fb.doc = doc
  42. return fb
  43. }
  44. // Build a flow starting from node
  45. func (fb *FlowBuilder) Build(ID string) flow.Operation {
  46. if fb.Err != nil {
  47. op, _ := fb.flow.DefErrOp(ID, fb.Err)
  48. return op
  49. }
  50. f := fb.flow
  51. r := fb.registry
  52. doc := fb.doc
  53. if _, ok := fb.nodeTrack[ID]; ok {
  54. fb.Err = ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
  55. op, _ := fb.flow.DefErrOp(ID, fb.Err)
  56. return op
  57. }
  58. fb.nodeTrack[ID] = true
  59. defer delete(fb.nodeTrack, ID)
  60. // If flow already has ID just return
  61. if op := f.GetOp(ID); op != nil {
  62. return op
  63. }
  64. node := fb.doc.fetchNodeByID(ID)
  65. if node == nil {
  66. op, _ := fb.flow.DefErrOp(ID, fmt.Errorf("node not found [%v]", ID))
  67. return op
  68. }
  69. var op flow.Operation
  70. switch node.Src {
  71. case "Input":
  72. inputID, err := strconv.Atoi(node.Prop["input"])
  73. if err != nil {
  74. op, _ = f.DefErrOp(node.ID, errors.New("Invalid inputID value, must be a number"))
  75. } else {
  76. op, _ = f.In(inputID) // By id perhaps
  77. }
  78. /*case "Variable":
  79. // Input 1 is the var
  80. raw := node.Prop["init"]
  81. val, err := parseValue(nil, raw)
  82. if err != nil {
  83. op, _ = f.DefErrOp(node.ID, err)
  84. } else {
  85. op = f.DefVar(node.ID, node.Label, val)
  86. }*/
  87. case "Const":
  88. raw := node.Label
  89. val, err := parseValue(nil, raw)
  90. if err != nil {
  91. op, _ = f.DefErrOp(node.ID, err)
  92. } else {
  93. op, _ = f.DefConst(node.ID, val)
  94. }
  95. default:
  96. // Load entry
  97. entry, err := r.Entry(node.Src)
  98. if err != nil {
  99. op, _ = f.DefErrOp(node.ID, err)
  100. }
  101. //// Process inputs ////
  102. param := make([]flow.Data, len(entry.Inputs))
  103. for i := range param {
  104. l := doc.fetchLinkTo(node.ID, i)
  105. if l == nil { // No link we fetch the value inserted
  106. // Const value
  107. v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
  108. if err != nil {
  109. param[i], _ = f.ErrOp(err)
  110. continue
  111. }
  112. param[i] = v
  113. continue
  114. }
  115. param[i] = fb.Build(l.From)
  116. }
  117. op, err = f.DefOp(node.ID, node.Src, param...)
  118. if err != nil {
  119. op, _ := f.DefErrOp(node.ID, err)
  120. return op
  121. }
  122. }
  123. return op
  124. }
  125. func (fb *FlowBuilder) addTriggersTo(node Node) error {
  126. // Process triggers for this node
  127. triggers := fb.doc.fetchTriggerFrom(node.ID)
  128. for _, t := range triggers {
  129. op := fb.Build(t.To)
  130. // Register the thing here
  131. fb.flow.Hook(flow.Hook{
  132. Any: func(name string, ID string, triggerTime time.Time, extra ...interface{}) {
  133. if name != "Error" && name != "Finish" {
  134. return
  135. }
  136. if ID != t.From {
  137. log.Printf("ID[%v] triggered [%v], I'm t.From: %v", ID, name, t.From)
  138. return
  139. }
  140. exec := false
  141. for _, o := range t.On {
  142. if name == o {
  143. exec = true
  144. break
  145. }
  146. }
  147. if !exec {
  148. log.Println("Mismatching trigger, but its a test")
  149. }
  150. //op := opfb.flow.GetOp(t.To) // Repeating
  151. go op.Process(name) // Background
  152. },
  153. })
  154. }
  155. return nil
  156. }
  157. // Flow returns the build flow
  158. func (fb *FlowBuilder) Flow() *flow.Flow {
  159. return fb.flow
  160. }
  161. // Or give a string
  162. func parseValue(typ reflect.Type, raw string) (flow.Data, error) {
  163. if typ == nil {
  164. var val flow.Data
  165. err := json.Unmarshal([]byte(raw), &val)
  166. if err != nil { // Try to unmarshal as a string?
  167. val = string(raw)
  168. }
  169. return val, nil
  170. }
  171. var ret flow.Data
  172. switch typ.Kind() {
  173. case reflect.Int:
  174. v, err := strconv.Atoi(raw)
  175. if err != nil {
  176. log.Println("Wrong int conversion", err)
  177. return nil, err
  178. }
  179. ret = v
  180. case reflect.String:
  181. ret = raw
  182. default:
  183. if len(raw) == 0 {
  184. ret = reflect.Zero(typ)
  185. } else {
  186. refVal := reflect.New(typ)
  187. err := json.Unmarshal([]byte(raw), refVal.Interface())
  188. if err != nil {
  189. return nil, err
  190. }
  191. ret = refVal.Elem().Interface()
  192. }
  193. }
  194. return ret, nil
  195. }