123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package flowbuilder
- import (
- "encoding/json"
- "errors"
- "flow"
- "flow/registry"
- "fmt"
- "log"
- "reflect"
- "strconv"
- "time"
- )
// ErrLoop is the error the builder records when a node is requested again
// while it is still being built, i.e. when following links leads back to a
// node on the current build path.
var (
	ErrLoop = errors.New("Looping is disabled for now")
)
- // FlowBuilder builds a flow from flow-ui json data
- type FlowBuilder struct {
- registry *registry.R
- doc *FlowDocument
- flow *flow.Flow
- nodeTrack map[string]bool
- Err error
- }
- // New creates a New builder
- func New(r *registry.R) *FlowBuilder {
- return &FlowBuilder{
- registry: r,
- nodeTrack: map[string]bool{},
- }
- }
- // Load document from json into builder
- func (fb *FlowBuilder) Load(rawData []byte) *FlowBuilder {
- fb.flow = flow.New()
- fb.flow.SetRegistry(fb.registry)
- doc := &FlowDocument{[]Node{}, []Link{}, []Trigger{}}
- log.Println("Loading document from:", string(rawData))
- err := json.Unmarshal(rawData, doc)
- if err != nil {
- fb.Err = err
- return fb
- }
- fb.doc = doc
- return fb
- }
- // Build a flow starting from node
- func (fb *FlowBuilder) Build(ID string) flow.Operation {
- if fb.Err != nil {
- op, _ := fb.flow.DefErrOp(ID, fb.Err)
- return op
- }
- f := fb.flow
- r := fb.registry
- doc := fb.doc
- if _, ok := fb.nodeTrack[ID]; ok {
- fb.Err = ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
- op, _ := fb.flow.DefErrOp(ID, fb.Err)
- return op
- }
- fb.nodeTrack[ID] = true
- defer delete(fb.nodeTrack, ID)
- // If flow already has ID just return
- if op := f.GetOp(ID); op != nil {
- return op
- }
- node := fb.doc.fetchNodeByID(ID)
- if node == nil {
- op, _ := fb.flow.DefErrOp(ID, fmt.Errorf("node not found [%v]", ID))
- return op
- }
- var op flow.Operation
- switch node.Src {
- case "Input":
- inputID, err := strconv.Atoi(node.Prop["input"])
- if err != nil {
- op, _ = f.DefErrOp(node.ID, errors.New("Invalid inputID value, must be a number"))
- } else {
- op, _ = f.In(inputID) // By id perhaps
- }
- /*case "Variable":
- // Input 1 is the var
- raw := node.Prop["init"]
- val, err := parseValue(nil, raw)
- if err != nil {
- op, _ = f.DefErrOp(node.ID, err)
- } else {
- op = f.DefVar(node.ID, node.Label, val)
- }*/
- case "Const":
- raw := node.Label
- val, err := parseValue(nil, raw)
- if err != nil {
- op, _ = f.DefErrOp(node.ID, err)
- } else {
- op, _ = f.DefConst(node.ID, val)
- }
- default:
- // Load entry
- entry, err := r.Entry(node.Src)
- if err != nil {
- op, _ = f.DefErrOp(node.ID, err)
- }
- //// Process inputs ////
- param := make([]flow.Data, len(entry.Inputs))
- for i := range param {
- l := doc.fetchLinkTo(node.ID, i)
- if l == nil { // No link we fetch the value inserted
- // Const value
- v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
- if err != nil {
- param[i], _ = f.ErrOp(err)
- continue
- }
- param[i] = v
- continue
- }
- param[i] = fb.Build(l.From)
- }
- op, err = f.DefOp(node.ID, node.Src, param...)
- if err != nil {
- op, _ := f.DefErrOp(node.ID, err)
- return op
- }
- }
- return op
- }
- func (fb *FlowBuilder) addTriggersTo(node Node) error {
- // Process triggers for this node
- triggers := fb.doc.fetchTriggerFrom(node.ID)
- for _, t := range triggers {
- op := fb.Build(t.To)
- // Register the thing here
- fb.flow.Hook(flow.Hook{
- Any: func(name string, ID string, triggerTime time.Time, extra ...interface{}) {
- if name != "Error" && name != "Finish" {
- return
- }
- if ID != t.From {
- log.Printf("ID[%v] triggered [%v], I'm t.From: %v", ID, name, t.From)
- return
- }
- exec := false
- for _, o := range t.On {
- if name == o {
- exec = true
- break
- }
- }
- if !exec {
- log.Println("Mismatching trigger, but its a test")
- }
- //op := opfb.flow.GetOp(t.To) // Repeating
- go op.Process(name) // Background
- },
- })
- }
- return nil
- }
- // Flow returns the build flow
- func (fb *FlowBuilder) Flow() *flow.Flow {
- return fb.flow
- }
- // Or give a string
- func parseValue(typ reflect.Type, raw string) (flow.Data, error) {
- if typ == nil {
- var val flow.Data
- err := json.Unmarshal([]byte(raw), &val)
- if err != nil { // Try to unmarshal as a string?
- val = string(raw)
- }
- return val, nil
- }
- var ret flow.Data
- switch typ.Kind() {
- case reflect.Int:
- v, err := strconv.Atoi(raw)
- if err != nil {
- log.Println("Wrong int conversion", err)
- return nil, err
- }
- ret = v
- case reflect.String:
- ret = raw
- default:
- if len(raw) == 0 {
- ret = reflect.Zero(typ)
- } else {
- refVal := reflect.New(typ)
- err := json.Unmarshal([]byte(raw), refVal.Interface())
- if err != nil {
- return nil, err
- }
- ret = refVal.Elem().Interface()
- }
- }
- return ret, nil
- }
|