浏览代码

Implemented collaboration and server serialization

luis 7 年之前
父节点
当前提交
be46042d0d

+ 24 - 7
Makefile

@@ -1,20 +1,37 @@
 
-.PHONY: all backend frontend clean
-
-all:frontend backend
-	mkdir -p DIST
+all: DIST/flowserver DIST/web
 
 clean:
 	rm -rf DIST
 
-backend:
+DIST:
+	mkdir -p DIST
+
+DIST/flowserver: DIST
 	cd go;make
-	cp go/DIST/* DIST
+	cp go/DIST/* DIST/
 
-frontend:
+DIST/web: DIST
 	mkdir -p DIST/web
 	cd browser/vue-flow;yarn build
 	cp -r browser/vue-flow/dist/* DIST/web
 
+DIST/.dockerized: DIST/flowserver DIST/web
+	docker build --rm -t hexasoftware.com:5000/flow-proto -f ./docker/Dockerfile .
+	touch DIST/.dockerized
+
+docker: DIST/.dockerized
+
+
+push: DIST/.dockerized
+	docker push hexasoftware.com:5000/flow-proto
+
+
+
+frontend: DIST/web
+
+backend: DIST/backend
+
 
+.PHONY: all clean frontend backend
 

+ 0 - 6
browser/vue-flow/docker/Dockerfile

@@ -1,6 +0,0 @@
-FROM joshix/caddy
-COPY dist/ /var/www/html
-COPY docker/Caddyfile /var/www/html
-WORKDIR /var/www/html
-EXPOSE 2015
-CMD ["/bin/caddy"]

+ 1 - 1
browser/vue-flow/package.json

@@ -8,7 +8,7 @@
   "scripts": {
     "docker": "docker build --rm -t hexasoftware.com:5000/flow-proto -f ./docker/Dockerfile .",
     "docker-pull": "docker push hexasoftware.com:5000/flow-proto",
-    "dev": "cross-env NODE_ENV=development webpack-dev-server --open --hot",
+    "dev": "cross-env NODE_ENV=development webpack-dev-server --hot",
     "build": "cross-env NODE_ENV=production webpack --progress --hide-modules"
   },
   "dependencies": {

+ 0 - 1
browser/vue-flow/src/App.vue

@@ -22,7 +22,6 @@ html,body {
   max-height:100%;
 }
 #app {
-  background: #f4f4f4;
   position:relative;
   padding:0;
   display:flex;

+ 36 - 14
browser/vue-flow/src/components/flow/manager.vue

@@ -3,9 +3,9 @@
     <div>
       fn:
       <button
-        :key="i"
-        v-for="(r,k,i) of nodeData.registry"
-        @click="nodeAdd(r)">
+        :key="k"
+        v-for="(r,k) of registry"
+        @click="nodeAdd(k)">
         {{ k }}
       </button>
     </div>
@@ -16,12 +16,12 @@
       :height="height">
       <flow-pan-zoom
         ref="panzoom"
-        v-model="nodeData.panzoom">
+        v-model="panzoom">
         <flow-link
           v-for="(l,i) in nodeData.links"
           :key="i"
           v-bind="linkProps(l)"
-          @click="removeLink(i)"
+          @click="linkRemove(i)"
         />
         <!-- mouse link-->
         <flow-link
@@ -42,7 +42,7 @@
   </div>
 </template>
 <script>
-import nodeData from '../nodedata'
+// import nodeData from '../nodedata'
 import FlowNode from './node'
 import FlowLink from './link'
 import FlowPanZoom from './panzoom'
@@ -51,12 +51,16 @@ export default {
   name: 'FlowManager',
   components: {FlowNode, FlowLink, FlowPanZoom},
   props: {
+    'value': {type: Object, default: () => {}},
+    'registry': {type: Object, default: () => {}},
     'width': {type: String, default: '800px'},
     'height': {type: String, default: '600px'}
   },
   data () {
+    const cloned = JSON.parse(JSON.stringify(this.value)) // initial?
     return {
-      nodeData: nodeData,
+      panzoom: { x: 0, y: 0, zoom: 1 },
+      nodeData: cloned,
       pointerLink: {active: false, props: {}, src: {}}
     }
   },
@@ -71,7 +75,7 @@ export default {
             match = {in: this.pointerLink.src.type}
           }
         }
-        const nodeClass = this.nodeData.registry[node.src]
+        const nodeClass = this.registry[node.src]
         return {
           transform: `translate(${node.x} ${node.y})`,
           id: node.id,
@@ -108,6 +112,17 @@ export default {
     }
 
   },
+  watch: {
+    value: {
+      handler (val) {
+        this.nodeData = JSON.parse(JSON.stringify(this.value)) // deepClone
+        this.$nextTick(() => {
+          this.$forceUpdate()
+        })
+      },
+      deep: true
+    }
+  },
   mounted () {
     this.$nextTick(() => {
       this.$forceUpdate()
@@ -141,9 +156,9 @@ export default {
 
       this.pointerLink.active = true
       if (isInput) {
-        this.pointerLink.src = {nodeId: nodeId, type: this.nodeData.registry[node.src].inputs[socket.in], in: socket.in}
+        this.pointerLink.src = {nodeId: nodeId, type: this.registry[node.src].inputs[socket.in], in: socket.in}
       } else {
-        this.pointerLink.src = {nodeId: nodeId, type: this.nodeData.registry[node.src].output, out: 0}
+        this.pointerLink.src = {nodeId: nodeId, type: this.registry[node.src].output, out: 0}
       }
 
       // What socket is this
@@ -194,8 +209,8 @@ export default {
         const nodeFrom = this.nodeData.nodes.find(n => n.id === link.from)
         const nodeTo = this.nodeData.nodes.find(n => n.id === link.to)
         // Type checking
-        if (this.nodeData.registry[ nodeFrom.src ].output !==
-          this.nodeData.registry[ nodeTo.src ].inputs[link.in]) {
+        if (this.registry[ nodeFrom.src ].output !==
+          this.registry[ nodeTo.src ].inputs[link.in]) {
           console.error('LINK: Invalid type')
           return
         }
@@ -209,6 +224,7 @@ export default {
         }
 
         this.nodeData.links.push(link)
+        this.$emit('input', this.nodeData)
       }
       document.addEventListener('mousemove', drag)
       document.addEventListener('mouseup', drop)
@@ -220,6 +236,7 @@ export default {
         // remove related links
         this.nodeData.links = this.nodeData.links.filter(l => l.from !== tnode.id && l.to !== tnode.id)
         this.nodeData.nodes.splice(i, 1)
+        this.$emit('input', this.nodeData)
         return
       }
       if (e.button !== 0) return // first button
@@ -236,16 +253,18 @@ export default {
         const point = this.transformedPoint(e.clientX, e.clientY)
         tnode.x = point.x - delta.x
         tnode.y = point.y - delta.y
+        // Bad possibly
+        this.$emit('input', this.nodeData)
       }
       const drop = (e) => {
         document.removeEventListener('mousemove', drag)
         document.removeEventListener('mouseup', drop)
+        this.$emit('input', this.nodeData)
       }
       document.addEventListener('mousemove', drag)
       document.addEventListener('mouseup', drop)
     },
     nodeAdd (k) {
-      console.log('Adding:', k)
       this.nodeData.nodes.push({
         id: guid(),
         x: 100,
@@ -253,10 +272,13 @@ export default {
         label: k,
         src: k
       })
+      this.$emit('input', this.nodeData)
     },
-    removeLink (i) {
+    linkRemove (i) {
       this.nodeData.links.splice(i, 1)
+      this.$emit('input', this.nodeData)
     },
+
     createSVGPoint (x, y) {
       const p = this.$refs.svg.createSVGPoint()
       p.x = x; p.y = y

+ 49 - 4
browser/vue-flow/src/components/flowmain.vue

@@ -16,7 +16,12 @@
         </ul>
 
       </div>
+
+      <div class="app-watermark">PROTOTYPE</div>
       <flow-manager
+        :registry="registry"
+        :value="nodeData"
+        @input="documentUpdate"
         width="100%"
         height="100%"/>
     </div>
@@ -26,22 +31,51 @@
 import FlowManager from '@/components/flow/manager'
 import 'reset-css/reset.css'
 import '@/assets/style.css'
+import nodeData from './nodedata'
 
 export default {
   components: {FlowManager},
+  data () {
+    return {
+      registry: nodeData.registry,
+      nodeData: {nodes: [], links: []}
+    }
+  },
 
   mounted () {
-    if (this.$route.params.sessID === undefined) {
-      console.log('Requesting new session')
-      this.$ws.send({op: 'newSession'})
+    // Make this in a service
+    this.$ws.recv((msg) => {
+      switch (msg.op) {
+        case 'sessionSwitch':
+          this.$router.push('/' + msg.data) // Swap to ID
+          break
+        case 'document':
+          // Transform here
+          if (!msg.data) break
+          this.nodeData = {
+            nodes: msg.data.nodes || this.nodeData.nodes,
+            links: msg.data.links || this.nodeData.links
+          }
+          break
+      }
+    })
+    if (this.$route.params.sessId === undefined) {
+      this.$ws.send({op: 'sessionNew'})
+      return
+    }
+    this.$ws.send({op: 'sessionLoad', data: this.$route.params.sessId})
+  },
+  methods: {
+    documentUpdate (nodeData) {
+      this.$ws.send({op: 'documentUpdate', data: nodeData})
     }
   }
-
 }
 
 </script>
 <style>
 .flow-main {
+  background: #f4f4f4;
   height:100%;
   display:flex;
   flex-direction: column;
@@ -75,4 +109,15 @@ export default {
   bottom:10%;
 }
 
+.flow-main .app-watermark {
+  position:absolute;
+  top:40%;
+  text-align:center;
+  width:100%;
+  font-size:100px;
+  color: #f4f4f4;
+  text-shadow: 1px 1px 1px rgba(255,255,255,0.5), -1px -1px 1px rgba(0,0,0,0.05);
+
+}
+
 </style>

+ 1 - 0
browser/vue-flow/src/main.js

@@ -1,6 +1,7 @@
 import Vue from 'vue'
 import App from './App.vue'
 import WsConn from './services/wsconn'
+
 import router from './router'
 
 Vue.use(WsConn, {location: 'ws://' + window.location.host + '/conn'})

+ 1 - 0
browser/vue-flow/src/router/index.js

@@ -7,6 +7,7 @@ Vue.use(Router)
 export default new Router({
   mode: 'history',
   routes: [
+    { path: '/', component: FlowMain },
     { path: '/:sessId', component: FlowMain }
   ]
 

+ 8 - 6
browser/vue-flow/src/services/wsconn.js

@@ -15,29 +15,31 @@ export default {
             return
           }
           this.$emit('send', msg)
+        },
+        recv (cb) {
+          this.$on('message', obj => cb(obj))
         }
       }
     })
 
     const connect = (loc) => {
       ws = new window.WebSocket(loc)
-      ws.onopen = () => { this.connected = true; console.log('Connected'); eventBus.$emit('open') }
-      ws.onmessage = (e) => {}
-      ws.onerror = (e) => { this.connected = false; console.log('Error:', e) }
-      ws.onclose = () => {
+      ws.onopen = () => { connected = true; eventBus.$emit('open') }
+      ws.onerror = (e) => { connected = false }
+      ws.onclose = (e) => {
+        //        console.log('Disconnected', e)
         if (connected === true) { } // emit close
         connected = false
         setTimeout(() => connect(loc), 3000) // Reconnect
       }
       ws.onmessage = (e) => { // receiving message
-        console.log('Message received')
+        // console.log('Message received', e)
         eventBus.$emit('message', JSON.parse(e.data))
       }
     }
     connect(options.location)
 
     eventBus.$on('send', (msg) => {
-      console.log('Sending message', msg)
       ws.send(JSON.stringify(msg))
     })
 

+ 5 - 0
docker/Dockerfile

@@ -0,0 +1,5 @@
+FROM scratch
+COPY DIST/ /app
+WORKDIR /app
+EXPOSE 2015
+CMD ["/app/flowserver"]

+ 2 - 2
go/Makefile

@@ -3,7 +3,7 @@
 
 GOPATH=$(CURDIR)/deps:$(CURDIR)
 DIST=./DIST
-ENV=
+ENV=CGO_ENABLED=0
 
 # Source in packages names
 # What packages to build
@@ -23,7 +23,7 @@ endif
 all: $(BIN)
 	@$(ENV) echo -e "\e[32;01mBuilt for OS: `go env GOOS`, ARCH: `go env GOARCH`\e[0m"
 	
-$(BIN):  deps
+$(BIN):
 	$(ENV) GOPATH="$(GOPATH)" go build -o $@ $(CLI)
 
 # generate

+ 23 - 13
go/src/flowserver/flowserver.go

@@ -15,29 +15,40 @@ import (
 
 //go:generate go get dev.hexasoftware.com/hxs/genversion
 //go:generate genversion -package flowserver -out version.go
+//
+
+var (
+	debug = false
+)
 
 // FlowServer structure
-type FlowServer struct{}
+type FlowServer struct {
+}
 
-//ListenAndServe starts the httpserver
+// ListenAndServe starts the httpserver
+// It will listen on default port 2015 and increase if port is in use
 func (f *FlowServer) ListenAndServe() error {
+	fsm := NewFlowSessionManager()
 
-	fsm := FlowSessionManager{}
-
-	// HttpPart
 	c := chain.New(webu.ChainLogger(prettylog.New("req")))
+
 	mux := http.NewServeMux()
 	mux.Handle("/conn", c.Build(fsm.ServeHTTP))
 
-	proxyURL, err := url.Parse("http://localhost:8081")
-	if err != nil {
-		return err
+	if debug {
+		proxyURL, err := url.Parse("http://localhost:8081")
+		if err != nil {
+			return err
+		}
+		mux.Handle("/", c.Build(httputil.NewSingleHostReverseProxy(proxyURL).ServeHTTP))
+	} else {
+		mux.Handle("/", c.Build(webu.StaticHandler("web", "index.html")))
 	}
 
-	//mux.Handle("/", c.Build(webu.StaticHandler("web", "index.html")))
-	mux.Handle("/", c.Build(httputil.NewSingleHostReverseProxy(proxyURL).ServeHTTP))
-
-	port := 4000
+	////////////////////
+	// Server starter
+	/////
+	port := 2015
 	for {
 		addr := fmt.Sprintf(":%d", port)
 		s, err := net.Listen("tcp", addr)
@@ -47,7 +58,6 @@ func (f *FlowServer) ListenAndServe() error {
 			continue
 		}
 		log.Println("Listening at:", addr)
-
 		err = http.Serve(s, mux)
 		if err != nil {
 			log.Fatal(err)

+ 0 - 1
go/src/flowserver/handlers.go

@@ -1 +0,0 @@
-package flowserver

+ 14 - 0
go/src/flowserver/msg/flowmessage.go

@@ -0,0 +1,14 @@
+package msg
+
+import "encoding/json"
+
+// FlowMessage Main message structure
+type RecvMessage struct {
+	OP   string          `json:"op"`
+	Data json.RawMessage `json:"data"`
+}
+
+type SendMessage struct {
+	OP   string      `json:"op"`
+	Data interface{} `json:"data"`
+}

+ 73 - 1
go/src/flowserver/session.go

@@ -1,13 +1,85 @@
 package flowserver
 
 import (
+	"encoding/json"
+	"flowserver/msg"
+	"sync"
+
 	"github.com/gorilla/websocket"
 )
 
+// This might be better named as a room/project than a session
+
 // FlowSession Create a session and link clients
-//
 type FlowSession struct {
+	sync.Mutex
 	ID string // Random handle for sessionID
 	// List of clients on this session
 	clients []*websocket.Conn
+
+	RawData []byte // Just share data
+}
+
+//NewSession creates and initializes a NewSession
+func NewSession(ID string) *FlowSession {
+	s := &FlowSession{sync.Mutex{}, ID, []*websocket.Conn{}, []byte{}}
+	return s
+}
+
+// ClientAdd add a client to session
+func (f *FlowSession) ClientAdd(c *websocket.Conn) error {
+	f.Lock()
+	defer f.Unlock()
+
+	f.clients = append(f.clients, c)
+
+	dataMap := map[string]interface{}{}
+	json.Unmarshal(f.RawData, &dataMap)
+	return c.WriteJSON(msg.SendMessage{OP: "document", Data: dataMap})
+
+}
+
+// ClientRemove remove client from Session
+func (f *FlowSession) ClientRemove(c *websocket.Conn) {
+	f.Lock()
+	defer f.Unlock()
+
+	for i, cl := range f.clients {
+		if cl == c {
+			f.clients = append(f.clients[:i], f.clients[i+1:]...)
+			break
+		}
+	}
+}
+
+// DocumentUpdate client c Updates the session document
+func (f *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
+	f.Lock()
+	defer f.Unlock()
+
+	f.RawData = data // Update
+	dataMap := map[string]interface{}{}
+	json.Unmarshal(f.RawData, &dataMap)
+
+	// Send to all except ours
+	for _, sc := range f.clients {
+		if sc == c { // ours
+			continue
+		}
+		err := sc.WriteJSON(msg.SendMessage{OP: "document", Data: dataMap})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Document send document to client c
+func (f *FlowSession) Document(c *websocket.Conn) error {
+	f.Lock()
+	defer f.Unlock()
+
+	dataMap := map[string]interface{}{}
+	json.Unmarshal(f.RawData, &dataMap)
+	return c.WriteJSON(msg.SendMessage{OP: "document", Data: dataMap})
 }

+ 0 - 108
go/src/flowserver/sessionmanager.go

@@ -1,108 +0,0 @@
-package flowserver
-
-import (
-	"encoding/json"
-	"log"
-	"net/http"
-	"sync"
-
-	"github.com/gorilla/websocket"
-)
-
-//FlowSessionManager or FlowServerCore
-type FlowSessionManager struct {
-	// List of flow sessions?
-	sessions map[string]*FlowSession
-
-	sync.Mutex
-}
-
-//New creates a New initialized FlowSessionManager
-func New() *FlowSessionManager {
-	return &FlowSessionManager{
-		sessions: map[string]*FlowSession{},
-	}
-}
-func (fsm *FlowSessionManager) CreateSession() *FlowSession {
-	fsm.Lock()
-	defer fsm.Unlock()
-	for {
-		ID := RandString(10)
-		sess, ok := fsm.sessions[ID]
-		if !ok {
-			fsm.sessions[ID] = sess
-			return sess
-		}
-	}
-
-}
-
-//LoadSession loads or creates a new Session
-func (fsm *FlowSessionManager) LoadSession(ID string) *FlowSession {
-	fsm.Lock()
-	defer fsm.Unlock()
-
-	log.Println("Fetching session:", ID)
-	sess, ok := fsm.sessions[ID]
-	if !ok {
-		sess = &FlowSession{ID: ID}
-		fsm.sessions[ID] = sess
-	}
-	return sess
-
-}
-
-// FlowMessage Main message structure
-type FlowMessage struct {
-	OP   string          `json:"op"`
-	Data json.RawMessage `json:"data"`
-}
-
-var upgrader = websocket.Upgrader{}
-
-func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	log.Println("Receiving ws connection")
-	c, err := upgrader.Upgrade(w, r, nil)
-	if err != nil {
-		log.Println("upgrade:", err)
-	}
-	defer c.Close()
-
-	/////////////////
-	// Message handling
-	// ////////
-
-	for {
-		mt, data, err := c.ReadMessage()
-		if err != nil {
-			log.Println("Err:", err)
-			break
-		}
-		if mt != websocket.TextMessage {
-			log.Println("Not a text message?")
-			break
-		}
-
-		fmsg := FlowMessage{}
-		err = json.Unmarshal(data, &fmsg)
-		if err != nil {
-			log.Println("Err parsing message:", err)
-			c.WriteJSON("bye")
-		}
-		switch fmsg.OP {
-		case "newSession":
-			log.Println("We want a new session so")
-			sess := fsm.LoadSession("")
-			sess.clients = append(sess.clients, c)
-		}
-		// Create a Session, assign us to session
-
-		// Switch kind of mesasge etc here
-		// Acquire session ID, and send all the info
-		// Write msg
-		// Create or Load a session
-
-	}
-	log.Println("ws Is disconnecting")
-
-}

+ 149 - 0
go/src/flowserver/sessionmgr.go

@@ -0,0 +1,149 @@
+package flowserver
+
+import (
+	"encoding/json"
+	"errors"
+	"flowserver/msg"
+	"log"
+	"net/http"
+	"sync"
+
+	"github.com/gorilla/websocket"
+)
+
+//FlowSessionManager or FlowServerCore
+type FlowSessionManager struct {
+	// List of flow sessions?
+	sessions map[string]*FlowSession
+
+	sync.Mutex
+}
+
+//NewFlowSessionManager creates a New initialized FlowSessionManager
+func NewFlowSessionManager() *FlowSessionManager {
+	return &FlowSessionManager{
+		sessions: map[string]*FlowSession{},
+	}
+}
+
+//CreateSession creates a new session
+func (fsm *FlowSessionManager) CreateSession() *FlowSession {
+	fsm.Lock()
+	defer fsm.Unlock()
+	for {
+		ID := RandString(10)
+		sess, ok := fsm.sessions[ID]
+		if !ok {
+			sess = NewSession(ID)
+			fsm.sessions[ID] = sess
+			return sess
+		}
+	}
+}
+
+//LoadSession loads or creates a new Session
+func (fsm *FlowSessionManager) LoadSession(ID string) (*FlowSession, error) {
+	fsm.Lock()
+	defer fsm.Unlock()
+	if ID == "" {
+		return nil, errors.New("Cannot be null")
+	}
+	sess, ok := fsm.sessions[ID]
+	if !ok {
+		sess = NewSession(ID)
+		fsm.sessions[ID] = sess
+	}
+	return sess, nil
+
+}
+
+var upgrader = websocket.Upgrader{}
+
+func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+
+	c, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		log.Println("upgrade:", err)
+		return
+	}
+	defer c.Close()
+
+	// Room
+	var sess *FlowSession
+	defer func() {
+		if sess == nil {
+			return
+		}
+		// Remove client on exit
+		sess.ClientRemove(c)
+	}()
+
+	/////////////////
+	// Message handling
+	// ////////
+
+	// Websocket IO loop
+	for {
+		mt, data, err := c.ReadMessage()
+		if err != nil {
+			log.Println("Err:", err)
+			break
+		}
+		if mt != websocket.TextMessage {
+			log.Println("Not a text message?")
+			break
+		}
+
+		fmsg := msg.RecvMessage{}
+		err = json.Unmarshal(data, &fmsg)
+		if err != nil {
+			log.Println("Err parsing message:", err)
+			c.WriteJSON("bye")
+			break
+		}
+		switch fmsg.OP {
+		/////////////////////////////
+		// NEWSESSION request
+		//////////////////
+		case "sessionNew":
+			err = func() error {
+				log.Println("We want a new session so")
+				sess = fsm.CreateSession()
+				sess.ClientAdd(c)
+				return c.WriteJSON(msg.SendMessage{OP: "sessionSwitch", Data: sess.ID})
+			}()
+			//////////////////////////////////
+			// LOADSESSION request
+			////////////////////////////////////
+		case "sessionLoad":
+			err = func() error {
+				sessID := string(fmsg.Data)
+				if sess != nil {
+					sess.ClientRemove(c)
+				}
+				sess, err = fsm.LoadSession(sessID) // Set our session
+				if err != nil {
+					log.Println("Err:", err)
+					return err
+				}
+				return sess.ClientAdd(c)
+			}()
+		///////////////////////
+		// DOCUMENTUPDATE Receive a document
+		//////
+		case "documentUpdate":
+			err = func() error {
+				if sess == nil {
+					return errors.New("There is no session")
+				}
+				return sess.DocumentUpdate(c, fmsg.Data)
+			}()
+		}
+
+		if err != nil {
+			log.Println("Err Writing", err)
+			break
+		}
+	}
+	log.Println("ws Is disconnecting")
+}