builder.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package flowbuilder
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "flow"
  6. "flow/registry"
  7. "fmt"
  8. "log"
  9. "reflect"
  10. "strconv"
  11. "time"
  12. )
  13. // ErrLoop loop error
  14. var ErrLoop = errors.New("Looping is disabled for now")
  15. // FlowBuilder builds a flow from flow-ui json data
  16. type FlowBuilder struct {
  17. registry *registry.R
  18. Doc *FlowDocument
  19. flow *flow.Flow
  20. OperationMap map[string]flow.Operation
  21. nodeTrack map[string]bool
  22. Err error
  23. }
  24. // New creates a New builder
  25. func New(r *registry.R) *FlowBuilder {
  26. return &FlowBuilder{
  27. registry: r,
  28. OperationMap: map[string]flow.Operation{},
  29. nodeTrack: map[string]bool{},
  30. }
  31. }
  32. // GetOpIDs fetches operation IDs
  33. // with portals we can have several ids pointing to same operation
  34. func (fb *FlowBuilder) GetOpIDs(op flow.Operation) []string {
  35. var ret []string
  36. for k, v := range fb.OperationMap {
  37. if op == v {
  38. ret = append(ret, k)
  39. }
  40. }
  41. return ret
  42. }
  43. // Load document from json into builder
  44. func (fb *FlowBuilder) Load(rawData []byte) *FlowBuilder {
  45. fb.flow = flow.New()
  46. fb.flow.UseRegistry(fb.registry)
  47. doc := &FlowDocument{[]Node{}, []Link{}, []Trigger{}}
  48. log.Println("Loading document from:", string(rawData))
  49. err := json.Unmarshal(rawData, doc)
  50. if err != nil {
  51. fb.Err = err
  52. return fb
  53. }
  54. fb.Doc = doc
  55. return fb
  56. }
  57. // Build a flow starting from node
  58. func (fb *FlowBuilder) Build(ID string) flow.Operation {
  59. if fb.Err != nil {
  60. op := fb.flow.ErrOp(fb.Err)
  61. return op
  62. }
  63. f := fb.flow
  64. r := fb.registry
  65. doc := fb.Doc
  66. if _, ok := fb.nodeTrack[ID]; ok {
  67. fb.Err = ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
  68. op := fb.flow.ErrOp(fb.Err)
  69. return op
  70. }
  71. // loop detector
  72. fb.nodeTrack[ID] = true
  73. defer delete(fb.nodeTrack, ID)
  74. // If flow already has ID just return
  75. if op, ok := fb.OperationMap[ID]; ok {
  76. return op
  77. }
  78. node := fb.Doc.FetchNodeByID(ID)
  79. if node == nil {
  80. op := fb.flow.ErrOp(fmt.Errorf("node not found [%v]", ID))
  81. return op
  82. }
  83. var op flow.Operation
  84. var inputs []reflect.Type
  85. switch node.Src {
  86. case "Portal From":
  87. nID := node.Prop["portal from"]
  88. n := doc.FetchNodeByID(nID)
  89. if n == nil {
  90. return f.ErrOp(fmt.Errorf("Invalid portal, id: %v", nID))
  91. }
  92. // Fetch existing or build new
  93. op = fb.Build(nID)
  94. fb.OperationMap[node.ID] = op
  95. return op
  96. case "Input":
  97. inputID, err := strconv.Atoi(node.Prop["input"])
  98. if err != nil {
  99. op := f.ErrOp(errors.New("Invalid inputID value, must be a number"))
  100. fb.OperationMap[node.ID] = op
  101. return op
  102. }
  103. op := f.In(inputID) // By id perhaps
  104. fb.OperationMap[node.ID] = op
  105. return op
  106. case "Var":
  107. log.Println("Source is a variable")
  108. var t interface{}
  109. inputs = []reflect.Type{reflect.TypeOf(t)}
  110. case "SetVar":
  111. log.Println("Source is a setvariable")
  112. var t interface{}
  113. inputs = []reflect.Type{reflect.TypeOf(t)}
  114. default:
  115. log.Println("Loading entry:", node.Src)
  116. entry, err := r.Entry(node.Src)
  117. if err != nil {
  118. op = f.ErrOp(err)
  119. fb.OperationMap[node.ID] = op
  120. return op
  121. }
  122. inputs = entry.Inputs
  123. }
  124. //// Build inputs ////
  125. param := make([]flow.Data, len(inputs))
  126. for i := range param {
  127. l := doc.FetchLinkTo(node.ID, i)
  128. if l == nil { // No link we fetch the value inserted
  129. // Direct input entries
  130. v, err := parseValue(inputs[i], node.DefaultInputs[i])
  131. if err != nil {
  132. param[i] = f.ErrOp(err)
  133. continue
  134. }
  135. param[i] = v
  136. continue
  137. }
  138. param[i] = fb.Build(l.From)
  139. }
  140. //Switch again
  141. switch node.Src {
  142. case "Var":
  143. op = f.Var(node.Prop["variable name"], param[0])
  144. case "SetVar":
  145. op = f.SetVar(node.Prop["variable name"], param[0])
  146. default:
  147. op = f.Op(node.Src, param...)
  148. }
  149. fb.OperationMap[node.ID] = op
  150. fb.buildTriggersFor(node, op)
  151. return op
  152. }
  153. func (fb *FlowBuilder) buildTriggersFor(node *Node, targetOp flow.Operation) error {
  154. // Process triggers for this node
  155. triggers := fb.Doc.FetchTriggerFrom(node.ID)
  156. log.Println("Operation has this triggers:", triggers)
  157. for _, t := range triggers {
  158. log.Println("will build for")
  159. op := fb.Build(t.To)
  160. // Register the thing here
  161. fb.flow.Hook(flow.Hook{
  162. Any: func(name string, triggerOp flow.Operation, triggerTime time.Time, extra ...interface{}) {
  163. if name != "Error" && name != "Finish" {
  164. return
  165. }
  166. if triggerOp != targetOp {
  167. log.Printf("ID triggered [%v], I'm t.From: %v", name, t.From)
  168. return
  169. }
  170. exec := false
  171. for _, o := range t.On {
  172. if name == o {
  173. exec = true
  174. break
  175. }
  176. }
  177. if !exec {
  178. log.Println("Mismatching trigger, but its a test")
  179. }
  180. //op := opfb.flow.GetOp(t.To) // Repeating
  181. go op.Process(name) // Background
  182. },
  183. })
  184. }
  185. return nil
  186. }
  187. // Flow returns the build flow
  188. func (fb *FlowBuilder) Flow() *flow.Flow {
  189. return fb.flow
  190. }
  191. // Or give a string
  192. func parseValue(typ reflect.Type, raw string) (flow.Data, error) {
  193. if len(raw) == 0 {
  194. return nil, nil
  195. }
  196. if typ == nil {
  197. var val flow.Data
  198. err := json.Unmarshal([]byte(raw), &val)
  199. if err != nil { // Try to unmarshal as a string?
  200. val = string(raw)
  201. }
  202. return val, nil
  203. }
  204. var ret flow.Data
  205. switch typ.Kind() {
  206. case reflect.Int:
  207. v, err := strconv.Atoi(raw)
  208. if err != nil {
  209. return nil, err
  210. }
  211. ret = v
  212. case reflect.String:
  213. ret = raw
  214. default:
  215. if len(raw) == 0 {
  216. return nil, nil
  217. }
  218. //ret = reflect.Zero(typ)
  219. refVal := reflect.New(typ)
  220. err := json.Unmarshal([]byte(raw), refVal.Interface())
  221. if err != nil {
  222. return nil, err
  223. }
  224. ret = refVal.Elem().Interface()
  225. }
  226. return ret, nil
  227. }