luis 7 years ago
parent
commit
4444ffa2d1

+ 2 - 3
browser/vue-flow/README.md

@@ -4,8 +4,7 @@
 
 ### UI
 
-* Mark selection with better colors to make more noticeable
-
 ### Code & Services
 
-* Possible implementation of centralized state (vuex)
+* IMPLEMENT centralized state - there is an issue where inspector changes the node
+  and reactivity happens fine. but document does not update on server

+ 5 - 3
browser/vue-flow/src/components/flow/manager.vue

@@ -75,7 +75,7 @@
       <button @click="stickySockets=!stickySockets"> {{ stickySockets? 'Hide':'Show' }} sockets </button>
       <button @click="stickyTriggers=!stickyTriggers"> {{ stickyTriggers? 'Hide':'Show' }} triggers </button>
       <button @click="nodeActivity=!nodeActivity"> {{ nodeActivity? 'Hide':'Show' }} activity </button>
-      <button @click="$emit('documentSave')"> Save </button> <!-- should disable until confirmation -->
+      <button @click="$emit('documentSave', nodeData)"> Save </button> <!-- should disable until confirmation -->
       <button v-if="panzoom.x!=0 || panzoom.y!=0 || panzoom.zoom!=1" @click="panzoomReset">
         Reset view
       </button>
@@ -121,6 +121,8 @@ export default {
     // const cloned = JSON.parse(JSON.stringify(this.value)) // initial?
     return {
       panzoom: { x: 0, y: 0, zoom: 1 },
+
+      // Shared state
       nodeData: { nodes: [], links: [], triggers: [] },
 
       dragging: null,
@@ -158,7 +160,6 @@ export default {
           this.nodeRemove(node)
           return
         }
-        console.log('Updating nodeProp based on activity')
         return {
           transform: `translate(${node.x} ${node.y})`,
           id: node.id,
@@ -593,6 +594,7 @@ export default {
       this.sendDocumentUpdate()
     },
     nodeProcess (node) {
+      this.sendDocumentUpdate()
       console.log('Node process demand')
       this.$emit('nodeProcess', node)
     },
@@ -671,7 +673,7 @@ export default {
     sendFlowEvent (type, param) {
       this.$flowService[type](param)
     },
-    sendDocumentUpdate (nodeData) {
+    sendDocumentUpdate () {
       this.$flowService.documentUpdate(this.nodeData, this.$route.params.sessId)
     },
     /* sendDocumentRun () {

+ 19 - 6
browser/vue-flow/src/components/flow/node-activity.vue

@@ -35,7 +35,9 @@ import utils from '@/utils/utils'
 export default {
   name: 'FlowNodeStatus',
   components: {IconWait, IconFail, IconOk, IconQuestion, IconRefresh},
-  props: { activity: {type: Object, default: () => {}} },
+  props: {
+    activity: {type: Object, default: () => {}}
+  },
   data () {
     return {
       finishTime: null
@@ -62,11 +64,22 @@ export default {
       return utils.padStart(min.toFixed(0), 2, '0') + ':' + utils.padStart(sec.toFixed(0), 2, '0')
     }
   },
-  watcher: {
-    activity () {
-      clearTimeout(this._timeOut)
-      this.finishTime = null
-      this.updateTime()
+  watch: {
+    activity (val, oldVal) {
+      const finish = new Date(Date.parse(val.endTime))
+      if (dateIsValid(finish)) {
+        this.finishTime = null
+      }
+
+      /* const startTime = new Date(Date.parse(val.startTime))
+      const endTime = new Date(Date.parse(val.endtime))
+      if (!dateIsValid(startTime) || !dateIsValid(endTime)) {
+        console.log('Date is valid', dateIsValid(startTime), 'end:', dateIsValid(endTime))
+        // reset timer
+        this.finishTime = null
+        clearTimeout(this._timeOut)
+        this._timeOut = setTimeout(this.updateTime, 999)
+      } */
     }
   },
   mounted () {

+ 3 - 2
browser/vue-flow/src/components/main.vue

@@ -237,9 +237,10 @@ export default {
     funcsSizeUpdate (ev, size) {
       this.funcsSize = size
     },
-    documentSave () {
-      this.$flowService.documentSave()
+    documentSave (nodeData) {
+      this.$flowService.documentSave(nodeData)
     }
+
     // Update individual nodes/links
   }
 }

+ 6 - 1
browser/vue-flow/src/components/panel-inspector.vue

@@ -42,7 +42,12 @@
             v-for="(n,i) in registry[nodeInspect.src].inputs"
             :key="i">
             <label>{{ i }}:{{ n.type }}</label>
-            <input ref="inputs" type="text" v-model="nodeInspect.defaultInputs[i]" @input="localChange">
+            <input
+              ref="inputs"
+              type="text"
+              v-model="nodeInspect.defaultInputs[i]"
+              @input="localChange"
+            >
           </div>
         </div>
 

+ 1 - 1
go/src/flowserver/flowbuilder.go

@@ -151,7 +151,7 @@ func FlowBuild(rawData []byte, r *registry.R) (*flow.Flow, error) {
 				log.Println("Grabbing an executor:", t.To)
 				op := f.Res(t.To)
 				log.Println("Processing this operator")
-				go op.Process() // Background
+				go op.Process(name) // Background
 				log.Println("Returning from operator")
 			},
 		})

+ 20 - 3
go/src/flowserver/session.go

@@ -18,6 +18,7 @@ import (
 
 // NodeActivity when nodes are processing
 type NodeActivity struct {
+	ID        string    `json:"id"`
 	Status    string    `json:"status"` // nodeStatus, Running, error, result
 	StartTime time.Time `json:"startTime"`
 	EndTime   time.Time `json:"endTime"`
@@ -139,10 +140,16 @@ func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
 }
 
 // DocumentSave persist document in a file
-func (s *FlowSession) DocumentSave() error {
+func (s *FlowSession) DocumentSave(data []byte) error {
+	log.Println("Receiving documentSave")
 	s.Lock()
 	defer s.Unlock()
 
+	log.Println("Saving..")
+
+	s.RawDoc = make([]byte, len(data))
+	copy(s.RawDoc, data)
+
 	fpath, err := s.manager.pathFor(s.ID)
 	if err != nil {
 		log.Println("path error", err)
@@ -155,6 +162,7 @@ func (s *FlowSession) DocumentSave() error {
 		return err
 	}
 
+	s.notify("Session saved")
 	return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"})
 }
 
@@ -209,15 +217,18 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 
 				act, ok := s.nodeActivity[ID]
 				if !ok {
-					act = &NodeActivity{}
+					act = &NodeActivity{ID: ID}
 					s.nodeActivity[ID] = act
 				}
 				status := ""
 				switch name {
 				case "Wait":
 					status = "waiting"
+					act.StartTime = time.Time{}
+					act.EndTime = time.Time{}
 				case "Start":
 					status = "running"
+					act.EndTime = time.Time{}
 					act.StartTime = triggerTime
 				case "Finish":
 					status = "finish"
@@ -248,7 +259,13 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 
 // Notify broadcast a notification to clients
 func (s *FlowSession) Notify(v interface{}) error {
-	return s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
+	s.Lock()
+	defer s.Unlock()
+	return s.notify(v)
+}
+
+func (s *FlowSession) notify(v interface{}) error {
+	return s.broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
 }
 
 // Write io.Writer implementation to send event logging

+ 1 - 1
go/src/flowserver/sessionmgr.go

@@ -150,7 +150,7 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 				if sess == nil {
 					return errors.New("documentSave: invalid session")
 				}
-				return sess.DocumentSave()
+				return sess.DocumentSave(m.Data)
 
 			//////////////////
 			// NODE operations