|
@@ -179,15 +179,24 @@ func (s *FlowSession) Document(c *websocket.Conn) error {
|
|
|
// NodeProcess a node triggering results
|
|
|
// Build a flow and run
|
|
|
func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
- ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
|
|
|
+
|
|
|
if s.flow != nil {
|
|
|
- s.Notify("a node is already running")
|
|
|
- return errors.New("node already running")
|
|
|
+ s.Notify("flow is already running")
|
|
|
+ return errors.New("nodes already running")
|
|
|
}
|
|
|
|
|
|
+ ids := []string{}
|
|
|
+ err := json.Unmarshal(data, &ids)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // *New* 25-02-2018 node Array
|
|
|
+ //ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
|
|
|
+
|
|
|
// Clear activity
|
|
|
s.nodeActivity = map[string]*NodeActivity{}
|
|
|
- s.Broadcast(nil, s.activity()) // Ampty activity
|
|
|
+ s.Broadcast(nil, s.activity()) // Empty activity in clients
|
|
|
|
|
|
build := func() error {
|
|
|
|
|
@@ -201,15 +210,24 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
localR.Add("Log", func() io.Writer {
|
|
|
return s
|
|
|
})
|
|
|
- localR.Add("Output", func(d interface{}) {
|
|
|
-
|
|
|
- r := fmt.Sprint("Result:", d)
|
|
|
+ // this will be disabled
|
|
|
+ localR.Add("Output", func(d interface{}) interface{} {
|
|
|
+ //r := fmt.Sprint("Result:", d)
|
|
|
// Do something
|
|
|
- s.Notify(r)
|
|
|
- s.Write([]byte(r))
|
|
|
+ //s.Notify(r)
|
|
|
+ //s.Write([]byte(r))
|
|
|
+ return d
|
|
|
})
|
|
|
+
|
|
|
builder := flowbuilder.New(localR)
|
|
|
- builder.Load(s.RawDoc).Build(ID)
|
|
|
+ builder.Load(s.RawDoc)
|
|
|
+
|
|
|
+ ops := make([]flow.Operation, len(ids))
|
|
|
+
|
|
|
+ for i, id := range ids {
|
|
|
+ ops[i] = builder.Build(id)
|
|
|
+ }
|
|
|
+ // Multiple ops
|
|
|
if builder.Err != nil {
|
|
|
return builder.Err
|
|
|
}
|
|
@@ -225,13 +243,16 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
defer func() { // After routing gone
|
|
|
s.flow = nil
|
|
|
}()
|
|
|
- // Flow activity
|
|
|
+
|
|
|
+ // Flow hooks
|
|
|
+ // Flow activity TODO: needs improvements as it shouldn't send the overall activity to client
|
|
|
+ // instead should send singular events
|
|
|
s.flow.Hook(flow.Hook{
|
|
|
- Any: func(name string, triggerOp flow.Operation, triggerTime time.Time, extra ...flow.Data) {
|
|
|
+ Any: func(name string, hookOp flow.Operation, triggerTime time.Time, extra ...flow.Data) {
|
|
|
s.Lock()
|
|
|
defer s.Unlock()
|
|
|
|
|
|
- nodeIDs := builder.GetOpIDs(triggerOp)
|
|
|
+ nodeIDs := builder.GetOpIDs(hookOp)
|
|
|
updated := true
|
|
|
for _, nodeID := range nodeIDs {
|
|
|
act, ok := s.nodeActivity[nodeID]
|
|
@@ -254,10 +275,18 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
act.EndTime = triggerTime
|
|
|
// only load data from requested node
|
|
|
// Or if node has the data retrieval flag
|
|
|
- node := builder.Doc.FetchNodeByID(ID)
|
|
|
- if node.Prop["data"] == "true" || nodeID == ID {
|
|
|
- act.Data = extra[0]
|
|
|
+ // if running ids contains the nodeID
|
|
|
+ // we add the data
|
|
|
+ for _, id := range ids {
|
|
|
+ if nodeID == id {
|
|
|
+ act.Data = extra[0]
|
|
|
+ }
|
|
|
}
|
|
|
+ //node := builder.Doc.FetchNodeByID(ID)
|
|
|
+ //log.Println("Should we add data:", ID, node)
|
|
|
+ //if node.Prop["data"] == "true" || nodeID == ID {
|
|
|
+ // act.Data = extra[0]
|
|
|
+ //}
|
|
|
case "Error":
|
|
|
status = "error"
|
|
|
act.EndTime = triggerTime
|
|
@@ -277,16 +306,18 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
},
|
|
|
})
|
|
|
|
|
|
- op, ok := builder.OperationMap[ID]
|
|
|
+ /*op, ok := builder.OperationMap[ID]
|
|
|
if !ok {
|
|
|
return fmt.Errorf("Operation not found %v", ID)
|
|
|
- }
|
|
|
+ }*/
|
|
|
log.Println("Processing operation")
|
|
|
- _, err := op.Process()
|
|
|
+ sess := s.flow.NewSession()
|
|
|
+ _, err = sess.Run(ops...)
|
|
|
if err != nil {
|
|
|
log.Println("Error operation", err)
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
log.Println("Experimental storing data to session")
|
|
|
// Copy Data from flow
|
|
|
s.flow.Data.Range(func(k, v interface{}) bool {
|
|
@@ -298,6 +329,7 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ // Background running
|
|
|
go func() {
|
|
|
err := build()
|
|
|
if err != nil {
|
|
@@ -332,11 +364,12 @@ func (s *FlowSession) NodeTrain(c *websocket.Conn, data []byte) error {
|
|
|
localR.Add("Log", func() io.Writer {
|
|
|
return s
|
|
|
})
|
|
|
- localR.Add("Output", func(d interface{}) {
|
|
|
- r := fmt.Sprint("Result:", d)
|
|
|
+ localR.Add("Output", func(d interface{}) interface{} {
|
|
|
+ //r := fmt.Sprint("Result:", d)
|
|
|
// Do something
|
|
|
- s.Notify(r)
|
|
|
- s.Write([]byte(r))
|
|
|
+ //s.Notify(r)
|
|
|
+ //s.Write([]byte(r))
|
|
|
+ return d
|
|
|
})
|
|
|
builder := flowbuilder.New(localR)
|
|
|
builder.Load(s.RawDoc).Build(ID)
|