Переглянути джерело

Time synch for nodeActivity

luis 7 роки тому
батько
коміт
c112744489

+ 4 - 4
browser/vue-flow/src/components/flow/editor.js

@@ -77,8 +77,8 @@ export default {
           // Combine this into one
           match: highlight,
           dragging: this.dragging && !!this.dragging[node.id],
-          activity: this.activity[node.id],
-          nodeStyle: nodeClass.style
+          nodeStyle: nodeClass.style,
+          activity: this.activity && this.activity.nodes && this.activity.nodes[node.id]
         }
       }
     },
@@ -103,7 +103,7 @@ export default {
           y1: nodeFrom.y + fromOutput.y,
           x2: nodeTo.x + toInput.x,
           y2: nodeTo.y + toInput.y,
-          status: this.activity[nodeFrom.id] && this.activity[nodeFrom.id].status
+          status: this.activity.nodes[nodeFrom.id] && this.activity.nodes[nodeFrom.id].status
         }
       }
     },
@@ -134,7 +134,7 @@ export default {
           y1: nodeFrom.y + fromOutput.y,
           x2: nodeTo.x + toInput.x,
           y2: nodeTo.y + toInput.y,
-          status: this.activity[nodeFrom.id] && this.activity[nodeFrom.id].status
+          status: this.activity.nodes[nodeFrom.id] && this.activity.nodes[nodeFrom.id].status
         }
       }
     }

+ 41 - 17
browser/vue-flow/src/components/flow/node-activity.vue

@@ -1,7 +1,7 @@
 <template>
   <g
     class="flow-node__activity"
-    :status="activity.status"
+    :status="nodeActivity.status"
   >
     <rect
       class="flow-node__activity-background"
@@ -12,10 +12,10 @@
       rx="12"
     />
 
-    <icon-refresh v-if="activity.status=='running'" v-bind="iconProps" class="flow-node__activity-icon" />
-    <icon-wait v-else-if="activity.status=='waiting'" v-bind="iconProps" class="flow-node__activity-icon"/>
-    <icon-ok v-else-if="activity.status=='finish'" v-bind="iconProps"class="flow-node__activity-icon"/>
-    <icon-fail v-else-if="activity.status=='error'" v-bind="iconProps"class="flow-node__activity-icon"/>
+    <icon-refresh v-if="nodeActivity.status=='running'" v-bind="iconProps" class="flow-node__activity-icon" />
+    <icon-wait v-else-if="nodeActivity.status=='waiting'" v-bind="iconProps" class="flow-node__activity-icon"/>
+    <icon-ok v-else-if="nodeActivity.status=='finish'" v-bind="iconProps"class="flow-node__activity-icon"/>
+    <icon-fail v-else-if="nodeActivity.status=='error'" v-bind="iconProps"class="flow-node__activity-icon"/>
     <icon-question v-else v-bind="iconProps" class="flow-node__activity-icon" />
 
     <text :class="{active:ellapsed}" class="flow-node__activity-time" x="13" y="4" fill="black">
@@ -24,7 +24,7 @@
   </g>
 </template>
 <script>
-
+import {mapGetters} from 'vuex'
 import IconWait from '@/assets/icons/wait.svg'
 import IconFail from '@/assets/icons/fail.svg'
 import IconOk from '@/assets/icons/ok.svg'
@@ -36,14 +36,20 @@ export default {
   name: 'FlowNodeStatus',
   components: {IconWait, IconFail, IconOk, IconQuestion, IconRefresh},
   props: {
-    activity: {type: Object, default: () => {}}
+    nodeId: {type: String, default: ''}
   },
   data () {
     return {
+      startTime: null,
       finishTime: null
     }
   },
   computed: {
+    ...mapGetters('flow', ['activity']),
+    nodeActivity () { // nodeActivity
+      let ret = this.activity.nodes[this.nodeId]
+      return ret
+    },
     iconProps () {
       return {
         x: -9,
@@ -54,10 +60,8 @@ export default {
       }
     },
     ellapsed () {
-      if (!this.finishTime) return null
-      const s = new Date(Date.parse(this.activity.startTime))
-      if (!utils.dateIsValid(s)) { return null }
-      let intervalms = this.finishTime - s
+      if (!this.startTime || !this.finishTime) return null
+      let intervalms = this.finishTime - this.startTime
       if (intervalms < 0) {
         intervalms = 0
       }
@@ -68,12 +72,12 @@ export default {
     }
   },
   watch: {
-    activity (val, oldVal) {
-      this.finishTime = null
-      this.updateTime()
+    node (val, oldVal) {
+      this.recalcStartTime()
     }
   },
   mounted () {
+    this.recalcStartTime()
     this.updateTime()
     // this._timeOut = setTimeout(this.updateTime, 999)
   },
@@ -81,14 +85,34 @@ export default {
     clearTimeout(this._timeOut)
   },
   methods: {
+    recalcStartTime () {
+      const serverStartTime = new Date(Date.parse(this.nodeActivity.startTime))
+      if (!utils.dateIsValid(serverStartTime)) { return null }
+
+      let serverTime = new Date(Date.parse(this.activity.serverTime))
+      let serverEllapsed = serverTime.getTime() - serverStartTime.getTime()
+
+      let localTime = new Date() // Local
+      // Calculate difference with server time
+      this.startTime = new Date(localTime - serverEllapsed)
+      this.finishTime = null
+
+      this.updateTime()
+    },
     updateTime () {
-      const finish = new Date(Date.parse(this.activity.endTime))
+      if (!utils.dateIsValid(new Date(Date.parse(this.nodeActivity.startTime)))) {
+        return
+      }
+      const finish = new Date(Date.parse(this.nodeActivity.endTime))
       if (utils.dateIsValid(finish)) {
+        // Set time from server
+        this.startTime = new Date(Date.parse(this.nodeActivity.startTime))
         this.finishTime = finish
         return
       }
-      this.finishTime = new Date(new Date().getTime() + 2000) // time correction why?
-      this._timeOut = setTimeout(this.updateTime, 400)
+      this.finishTime = new Date()
+      // this.finishTime = new Date(new Date().getTime() + 2000) // time correction why?
+      this._timeOut = setTimeout(this.updateTime, 500)
     }
 
   }

+ 1 - 2
browser/vue-flow/src/components/flow/node.vue

@@ -144,7 +144,7 @@
 
     <flow-node-activity
       v-if="activity"
-      :activity="activity"
+      :node-id="id"
       :transform="'translate('+bodyProps.width/2 +','+ -bodyProps.height/2 +')'"/>
   </g>
 </template>
@@ -173,7 +173,6 @@ export default {
     'dragging': {type: Boolean, default: false},
     'selected': {type: Boolean, default: false},
     'activity': {type: Object, default: () => {}},
-
     'nodeStyle': {type: Object, default: () => {}}
   },
   data () {

+ 0 - 17
go/src/flow/flow.go

@@ -272,23 +272,6 @@ func (f *Flow) allocID(fn func(id string) error) error {
 
 }
 
-/*func (f *Flow) addEntry(entry *operation) (string, error) {
-	f.Lock()
-	defer f.Unlock()
-
-	// generate ID
-	for i := 0; i < 10; i++ {
-		id := f.idGen()
-		if _, ok := f.operations.Load(id); ok {
-			continue
-		}
-		f.operations.Store(id, entry)
-		return id, nil
-	}
-	return "", errors.New("ID exausted")
-
-}*/
-
 // GetOp Return an existing operation or return notfound error
 func (f *Flow) GetOp(id string) *operation {
 	op, ok := f.operations.Load(id)

+ 18 - 31
go/src/flow/operation.go

@@ -8,7 +8,6 @@ package flow
 import (
 	"errors"
 	"fmt"
-	"log"
 	"reflect"
 	"sync"
 )
@@ -58,16 +57,17 @@ func (o *operation) Set(data Data) {
 }
 
 // make Executor for func
-func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
+func (f *Flow) asTrigger(id string, fn executorFunc) executorFunc {
 	return func(ctx OpCtx, params ...Data) (Data, error) {
 		f.hooks.start(id)
-		d, err := fn(ctx, params...)
+		res, err := fn(ctx, params...)
 		if err != nil {
 			f.hooks.error(id, err)
-			return d, err
+		} else {
+			f.hooks.finish(id, res)
 		}
-		f.hooks.finish(id, d)
-		return d, err
+		return res, err
+
 	}
 }
 
@@ -81,15 +81,12 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
 		defer func() {
 			if r := recover(); r != nil {
 				err = fmt.Errorf("%v", r)
-				f.hooks.error(id, err)
 			}
 		}()
 
 		op := f.GetOp(id)
 		if op == nil {
-			err = fmt.Errorf("invalid operation '%s'", id)
-			f.hooks.error(id, err)
-			return nil, err
+			return nil, fmt.Errorf("invalid operation '%s'", id)
 		}
 		op.Lock()
 		defer op.Unlock()
@@ -101,41 +98,36 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
 			}
 		}
 
-		// Wait for inputs
+		// Change to wait
 		f.hooks.wait(id)
 
 		fnval := reflect.ValueOf(fn)
 		callParam, err := f.processInputs(ctx, op, fnval, params...)
 		if err != nil {
-			log.Println("ERR:", err)
-			f.hooks.error(id, err)
 			return nil, err
 		}
 
-		// The actual operation process
-
-		// Func returned starting the process
+		// Start again
 		f.hooks.start(id)
-		// if entry is special we pass the Flow?
 		fnret := fnval.Call(callParam)
+		if len(fnret) == 0 {
+			return nil, nil
+		}
 		// Output erroring
 		if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
-			err, ok := fnret[1].Interface().(error)
+			err, ok := fnret[len(fnret)-1].Interface().(error)
 			if !ok {
 				err = errors.New("unknown error")
 			}
-			f.hooks.error(id, err)
 			return nil, err
 		}
 
 		// THE RESULT
 		ret := fnret[0].Interface()
-
 		// Store in the cache
 		if ctx != nil {
 			ctx.Store(id, ret)
 		}
-		f.hooks.finish(id, ret)
 		return ret, nil
 	}
 }
@@ -213,11 +205,6 @@ func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, para
 	if callErrors != "" {
 		return nil, errors.New(callErrors)
 	}
-	/*offs := 0
-	if fnval.Type().NumIn() > 0 && fnval.Type().In(0) == reflect.TypeOf(f) {
-		nInputs--
-		offs = 1
-	}*/
 
 	return OcallParam, nil
 }
@@ -242,7 +229,7 @@ func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
 		kind:   "var",
 		inputs: nil,
 		setter: setter,
-		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) {
+		executor: f.asTrigger(id, func(OpCtx, ...Data) (Data, error) {
 			// if f.data == nil we set from the init operation
 			return f.Data[name], nil
 		}),
@@ -280,7 +267,7 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation,
 		kind:     "func",
 		inputs:   inputs,
 		setter:   nil, // No set
-		executor: executor,
+		executor: f.asTrigger(id, executor),
 	}
 	f.operations.Store(id, op)
 
@@ -298,7 +285,7 @@ func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
 		kind:     "error",
 		inputs:   nil,
 		setter:   nil,
-		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return nil, err }),
+		executor: f.asTrigger(id, func(OpCtx, ...Data) (Data, error) { return nil, err }),
 	}
 	f.operations.Store(id, op)
 	return op, nil
@@ -317,7 +304,7 @@ func (f *Flow) DefConst(id string, value Data) (Operation, error) {
 		kind:     "const",
 		inputs:   nil,
 		setter:   nil,
-		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.consts[id], nil }),
+		executor: f.asTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.consts[id], nil }),
 	}
 	f.operations.Store(id, op)
 
@@ -334,7 +321,7 @@ func (f *Flow) DefIn(id string, paramID int) Operation {
 		kind:     "in",
 		inputs:   nil,
 		setter:   nil,
-		executor: f.makeTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) { return params[paramID], nil }),
+		executor: f.asTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) { return params[paramID], nil }),
 	}
 	f.operations.Store(id, op)
 	return op

+ 12 - 9
go/src/flowserver/flowbuilder/builder.go

@@ -89,14 +89,15 @@ func (fb *FlowBuilder) Build(ID string) flow.Operation {
 		} else {
 			op, _ = f.In(inputID) // By id perhaps
 		}
-	case "Variable":
-		raw := node.Prop["init"]
-		val, err := parseValue(nil, raw)
-		if err != nil {
-			op, _ = f.DefErrOp(node.ID, err)
-		} else {
-			op = f.DefVar(node.ID, node.Label, val)
-		}
+	/*case "Variable":
+	// Input 1 is the var
+	raw := node.Prop["init"]
+	val, err := parseValue(nil, raw)
+	if err != nil {
+		op, _ = f.DefErrOp(node.ID, err)
+	} else {
+		op = f.DefVar(node.ID, node.Label, val)
+	}*/
 	case "Const":
 		raw := node.Label
 		val, err := parseValue(nil, raw)
@@ -115,7 +116,7 @@ func (fb *FlowBuilder) Build(ID string) flow.Operation {
 		param := make([]flow.Data, len(entry.Inputs))
 		for i := range param {
 			l := doc.fetchLinkTo(node.ID, i)
-			if l == nil {
+			if l == nil { // No link we fetch the value inserted
 				// Const value
 				v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
 				if err != nil {
@@ -125,8 +126,10 @@ func (fb *FlowBuilder) Build(ID string) flow.Operation {
 				param[i] = v
 				continue
 			}
+
 			param[i] = fb.Build(l.From)
 		}
+
 		op, err = f.DefOp(node.ID, node.Src, param...)
 		if err != nil {
 			op, _ := f.DefErrOp(node.ID, err)

+ 16 - 4
go/src/flowserver/session.go

@@ -102,7 +102,7 @@ func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
 	}
 
 	// Sending activity
-	return c.WriteJSON(flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
+	return c.WriteJSON(s.activity())
 
 	// Send registry
 }
@@ -183,10 +183,9 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 
 	// Clear activity
 	s.nodeActivity = map[string]*NodeActivity{}
-	s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
+	s.Broadcast(nil, s.activity()) // Ampty activity
 
 	build := func() error {
-		log.Printf("Building flow from '%s'\n", string(s.RawDoc))
 
 		localR := s.manager.registry.Clone()
 		//Add our log func that is not in global registry
@@ -249,7 +248,8 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 					return
 				}
 				act.Status = status
-				s.broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
+				s.broadcast(nil, s.activity())
+
 			},
 		})
 
@@ -274,6 +274,18 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 	return nil
 }
 
+func (s *FlowSession) activity() *flowmsg.SendMessage {
+
+	msg := flowmsg.SendMessage{OP: "nodeActivity",
+		Data: map[string]interface{}{
+			"serverTime": time.Now(),
+			"nodes":      s.nodeActivity,
+		},
+	}
+
+	return &msg
+}
+
 // Notify broadcast a notification to clients
 func (s *FlowSession) Notify(v interface{}) error {
 	s.Lock()