|
@@ -7,9 +7,7 @@ import (
|
|
|
"flow/registry"
|
|
|
"fmt"
|
|
|
"log"
|
|
|
- "reflect"
|
|
|
"strconv"
|
|
|
- "strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
@@ -43,6 +41,15 @@ type FlowDocument struct {
|
|
|
Triggers []Trigger `json:"triggers"`
|
|
|
}
|
|
|
|
|
|
+func (fd *FlowDocument) fetchNodeByID(ID string) *Node {
|
|
|
+ for _, n := range fd.Nodes {
|
|
|
+ if n.ID == ID {
|
|
|
+ return &n
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
func (fd *FlowDocument) fetchTriggerFrom(ID string) []Trigger {
|
|
|
ret := []Trigger{}
|
|
|
for _, t := range fd.Triggers {
|
|
@@ -66,114 +73,86 @@ func (fd *FlowDocument) fetchLinksTo(ID string) []Link {
|
|
|
return ret
|
|
|
}
|
|
|
|
|
|
-var ErrLoop = errors.New("Looping through loops is disabled for now")
|
|
|
+var ErrLoop = errors.New("Looping through is disabled for now")
|
|
|
|
|
|
// FlowBuild build the graph based on an starting node
|
|
|
func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, error) {
|
|
|
|
|
|
- log.Println("Unmarshal document")
|
|
|
doc := FlowDocument{[]Node{}, []Link{}, []Trigger{}}
|
|
|
err := json.Unmarshal(rawData, &doc)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- log.Println("Create new flow and add a registry")
|
|
|
f := flow.New()
|
|
|
f.SetRegistry(r)
|
|
|
|
|
|
- log.Println("Temporarly map the nodes with ID")
|
|
|
- nodeMap := map[string]Node{}
|
|
|
- for _, n := range doc.Nodes {
|
|
|
- log.Println("Mapping:", n.ID)
|
|
|
- nodeMap[n.ID] = n
|
|
|
- }
|
|
|
- //inputMap := map[string]flow.Operation{}
|
|
|
-
|
|
|
nodeTrack := map[string]bool{}
|
|
|
|
|
|
- log.Println("Recursive node matching")
|
|
|
- var fetchInputs func(ID string) error
|
|
|
- fetchInputs = func(ID string) error {
|
|
|
- log.Println("Check for loops")
|
|
|
+ var build func(ID string) (flow.Operation, error)
|
|
|
+ build = func(ID string) (flow.Operation, error) {
|
|
|
if _, ok := nodeTrack[ID]; ok {
|
|
|
- return ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
|
|
|
+ return nil, ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
|
|
|
}
|
|
|
nodeTrack[ID] = true
|
|
|
|
|
|
// If flow already has ID just return
|
|
|
- if f.HasOp(ID) {
|
|
|
- log.Println("Entry found, continuing")
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- node, ok := nodeMap[ID]
|
|
|
- if !ok {
|
|
|
- return fmt.Errorf("node not found [%v]", startingID)
|
|
|
+ if op := f.GetOp(ID); op != nil {
|
|
|
+ log.Println("Return operation")
|
|
|
+ return op, nil
|
|
|
}
|
|
|
|
|
|
- entry, err := r.Entry(node.Src)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+ node := doc.fetchNodeByID(ID)
|
|
|
+ if node == nil {
|
|
|
+ return nil, fmt.Errorf("node not found [%v]", startingID)
|
|
|
}
|
|
|
|
|
|
- inputs := doc.fetchLinksTo(node.ID)
|
|
|
- param := make([]flow.Data, len(entry.Inputs))
|
|
|
- // Build ops
|
|
|
- for _, l := range inputs {
|
|
|
- from := nodeMap[l.From]
|
|
|
+ var op flow.Operation
|
|
|
|
|
|
- switch from.Src {
|
|
|
- case "Input":
|
|
|
- inputID, err := strconv.Atoi(from.Prop["input"])
|
|
|
- if err != nil {
|
|
|
- return errors.New("Invalid inputID value, must be a number")
|
|
|
- }
|
|
|
- param[l.In] = f.In(inputID) // By id perhaps
|
|
|
- case "Variable":
|
|
|
- param[l.In] = f.Var(from.ID, from.Prop["init"])
|
|
|
- case "Const":
|
|
|
- // XXX: Automate this in a func
|
|
|
- raw := from.Label
|
|
|
- //raw := from.Prop["value"]
|
|
|
- raw = strings.TrimSpace(raw)
|
|
|
- if raw[0] == '"' { // its a string
|
|
|
- log.Println("Unmashal string")
|
|
|
- var val string
|
|
|
- json.Unmarshal([]byte(raw), &val)
|
|
|
- param[l.In] = val
|
|
|
- log.Println("next Input")
|
|
|
- continue
|
|
|
- }
|
|
|
- newVal := reflect.New(entry.Inputs[l.In])
|
|
|
- err := json.Unmarshal([]byte(raw), newVal.Interface())
|
|
|
+ switch node.Src {
|
|
|
+ case "Input":
|
|
|
+ inputID, err := strconv.Atoi(node.Prop["input"])
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.New("Invalid inputID value, must be a number")
|
|
|
+ }
|
|
|
+ op = f.In(inputID) // By id perhaps
|
|
|
+ case "Variable":
|
|
|
+ op = f.Var(node.ID, node.Prop["init"])
|
|
|
+ case "Const":
|
|
|
+ var val flow.Data
|
|
|
+ // XXX: Automate this in a func
|
|
|
+ raw := node.Label
|
|
|
+ err := json.Unmarshal([]byte(raw), &val)
|
|
|
+ if err != nil { // Try to unmarshal as a string?
|
|
|
+ val = string(raw)
|
|
|
+ }
|
|
|
+ op, _ = f.DefConst(node.ID, val)
|
|
|
+ default:
|
|
|
+ // Load entry
|
|
|
+ entry, err := r.Entry(node.Src)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ inputs := doc.fetchLinksTo(node.ID)
|
|
|
+ param := make([]flow.Data, len(entry.Inputs))
|
|
|
+ for _, l := range inputs {
|
|
|
+ param[l.In], err = build(l.From)
|
|
|
if err != nil {
|
|
|
- // ignore error?
|
|
|
- log.Println("unmarshalling Error", err)
|
|
|
- return err
|
|
|
- //param[l.In] = nil
|
|
|
- //continue
|
|
|
- }
|
|
|
- param[l.In], _ = f.Const(newVal.Elem().Interface())
|
|
|
- default:
|
|
|
- param[l.In] = f.Res(from.ID)
|
|
|
- // Sub process the child
|
|
|
- if err := fetchInputs(from.ID); err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
}
|
|
|
+ op, err = f.DefOp(node.ID, node.Src, param...)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
}
|
|
|
- f.DefOp(node.ID, node.Src, param...)
|
|
|
|
|
|
// Process triggers for this node
|
|
|
triggers := doc.fetchTriggerFrom(node.ID)
|
|
|
for _, t := range triggers {
|
|
|
- log.Println("Trigger:", t)
|
|
|
- log.Println("Registering operator", t.To)
|
|
|
- err := fetchInputs(t.To)
|
|
|
+ _, err := build(t.To)
|
|
|
if err != nil {
|
|
|
- log.Println("Error on trigger input", err)
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
// Register the thing here
|
|
|
f.Hook(flow.Hook{
|
|
@@ -196,18 +175,19 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
|
|
|
log.Println("Mismatching trigger, but its a test")
|
|
|
}
|
|
|
|
|
|
- log.Println("Matching trigger", t.To)
|
|
|
- op := f.Res(t.To)
|
|
|
+ op := f.Res(t.To) // Repeating
|
|
|
go op.Process(name) // Background
|
|
|
},
|
|
|
})
|
|
|
|
|
|
delete(nodeTrack, node.ID)
|
|
|
}
|
|
|
- return nil
|
|
|
+ return op, nil
|
|
|
}
|
|
|
- err = fetchInputs(startingID)
|
|
|
+
|
|
|
+ _, err = build(startingID)
|
|
|
if err != nil {
|
|
|
+ log.Println("Error building")
|
|
|
return nil, err
|
|
|
}
|
|
|
|