Selaa lähdekoodia

Concurrency fixes

luis 7 vuotta sitten
vanhempi
commit
fd7a8c2dbd

+ 7 - 3
browser/vue-flow/package.json

@@ -6,8 +6,7 @@
   "license": "MIT",
   "private": true,
   "scripts": {
-    "docker":
-      "docker build --rm -t hexasoftware.com:5000/flow-proto -f ./docker/Dockerfile .",
+    "docker": "docker build --rm -t hexasoftware.com:5000/flow-proto -f ./docker/Dockerfile .",
     "docker-pull": "docker push hexasoftware.com:5000/flow-proto",
     "dev": "cross-env NODE_ENV=development webpack-dev-server --hot",
     "build": "cross-env NODE_ENV=production webpack --hide-modules"
@@ -15,7 +14,11 @@
   "dependencies": {
     "vue": "^2.5.11"
   },
-  "browserslist": ["> 1%", "last 2 versions", "not ie <= 8"],
+  "browserslist": [
+    "> 1%",
+    "last 2 versions",
+    "not ie <= 8"
+  ],
   "devDependencies": {
     "babel-core": "^6.26.0",
     "babel-eslint": "^8.2.1",
@@ -42,6 +45,7 @@
     "sass-loader": "^6.0.6",
     "vue-loader": "^13.0.5",
     "vue-router": "^3.0.1",
+    "vue-svg-loader": "^0.4.0",
     "vue-template-compiler": "^2.4.4",
     "webpack": "^3.6.0",
     "webpack-dev-server": "^2.9.1"

+ 20 - 9
browser/vue-flow/src/components/chat.vue

@@ -65,7 +65,7 @@ export default {
         this.$refs.input.focus()
       }
     },
-    events () {
+    events (val) {
       const height = this.$refs.messages.clientHeight
       if (this.$refs.messages.scrollTop + height >= this.$refs.messages.scrollHeight) {
         this.$nextTick(() => {
@@ -73,7 +73,9 @@ export default {
         })
       }
       if (this.active === false) {
-        this.active = true
+        if (val.some(e => e.type === 'msg')) {
+          this.active = true
+        }
       }
     }
   },
@@ -131,12 +133,14 @@ function pad (n, width, z) {
   height:100%;
   box-sizing:border-box;
   position:relative;
-  width:0px;
-  transition: all .3s;
+  width:0;
+  transition: all 0.3s;
 }
+
 .flow-chat.active {
   width:530px;
 }
+
 .flow-chat__toggle {
   user-select:none;
   cursor: pointer;
@@ -149,14 +153,14 @@ function pad (n, width, z) {
   left:-30px;
   top:calc(50% - 25px);
 
- }
+}
 
 .flow-chat__area {
   height:100%;
   overflow:hidden;
   display:flex;
   flex-flow:row;
-  padding:8px 0px;
+  padding:8px 0;
 }
 
 .flow-chat__container {
@@ -164,8 +168,9 @@ function pad (n, width, z) {
   display:flex;
   flex-flow:column;
   flex:1;
-  padding:0px 8px;
+  padding:0 8px;
 }
+
 .flow-chat__container >*:not(last-child) {
   margin-bottom:10px;
 }
@@ -175,6 +180,7 @@ function pad (n, width, z) {
   padding:20px 8px;
 
 }
+
 .flow-chat__user {
   display:flex;
   flex-flow:row;
@@ -182,10 +188,12 @@ function pad (n, width, z) {
   align-items: center;
 
 }
+
 .flow-chat__user-icon{
   height:10px;
   width:auto;
 }
+
 .flow-chat__user span {
   text-align:center;
   flex:1;
@@ -201,29 +209,32 @@ function pad (n, width, z) {
   min-width:300px;
   flex:1;
 }
+
 .flow-chat__messages .message{
   padding:2px 2px 12px 2px;
 
 }
+
 .flow-chat__messages .handle {
   display:flex;
   flex-flow:row;
-
   justify-content: space-between;
   align-items: center;
   padding-top:2px;
 
 }
+
 .flow-chat__messages .handle .name {
   padding-bottom:4px;
 }
+
 .flow-chat__messages .handle .time{
   font-weight:normal;
   font-size:8px;
 }
 
 .flow-chat__messages .text {
-  padding-top:0px;
+  padding-top:0;
   padding-left:9px;
 }
 

+ 1 - 0
browser/vue-flow/src/components/flow/manager.vue

@@ -58,6 +58,7 @@
       <button @click="$emit('funcsPanelToggle')">Panel</button>
       <button @click="stickySockets=!stickySockets"> {{ stickySockets? 'Hide':'Show' }} sockets </button>
       <button @click="detailed=!detailed"> {{ detailed? 'Hide':'Show' }} detail </button>
+      <button @click="$emit('documentSave')"> Save </button> <!-- should disable until confirmation -->
       <div class="vertical_sep"/>
       <button v-if="panzoom.x!=0 || panzoom.y!=0 || panzoom.zoom!=1" @click="panzoomReset">
         Reset view

+ 4 - 0
browser/vue-flow/src/components/main.vue

@@ -60,6 +60,7 @@
             :registry="registry"
             @funcsPanelToggle="funcsActive=!funcsActive"
             @nodeInspect="nodeInspectStart(...arguments)"
+            @documentSave="documentSave"
 
             width="100%"
             height="100%"/>
@@ -252,6 +253,9 @@ export default {
     },
     funcsSizeUpdate (ev, size) {
       this.funcsSize = size
+    },
+    documentSave () {
+      this.$flowService.documentSave()
     }
     // Update individual nodes/links
   }

+ 1 - 1
browser/vue-flow/src/services/flowservice.js

@@ -38,7 +38,7 @@ export default {
     //
     [
       'sessionNew', 'sessionLoad', // SESSION
-      'documentUpdate', 'documentRun',
+      'documentUpdate', 'documentRun', 'documentSave', // DOCUMENT
       'chatEvent', 'chatJoin', 'chatRename', // CHAT
       'linkAdd', 'linkUpdate', 'linkRemove', // LINK
       'nodeUpdate', 'nodeAdd', 'nodeRemove', 'nodeProcess' // NODE

+ 8 - 1
browser/vue-flow/yarn.lock

@@ -5127,7 +5127,7 @@ supports-color@^4.0.0, supports-color@^4.2.1, supports-color@^4.4.0:
   dependencies:
     has-flag "^2.0.0"
 
-svgo@^0.7.0:
+svgo@^0.7.0, svgo@^0.7.2:
   version "0.7.2"
   resolved "https://registry.yarnpkg.com/svgo/-/svgo-0.7.2.tgz#9f5772413952135c6fefbf40afe6a4faa88b4bb5"
   dependencies:
@@ -5452,6 +5452,13 @@ vue-style-loader@^3.0.0:
     hash-sum "^1.0.2"
     loader-utils "^1.0.2"
 
+vue-svg-loader@^0.4.0:
+  version "0.4.0"
+  resolved "https://registry.yarnpkg.com/vue-svg-loader/-/vue-svg-loader-0.4.0.tgz#56807e75ecc173a511a683785815a972c6853c7c"
+  dependencies:
+    loader-utils "^1.1.0"
+    svgo "^0.7.2"
+
 vue-template-compiler@^2.4.4:
   version "2.5.13"
   resolved "https://registry.yarnpkg.com/vue-template-compiler/-/vue-template-compiler-2.5.13.tgz#12a2aa0ecd6158ac5e5f14d294b0993f399c3d38"

+ 1 - 1
go/Makefile

@@ -3,7 +3,7 @@
 
 GOPATH=$(CURDIR)/deps:$(CURDIR)
 DIST=./DIST
-BUILDENV=CGO_ENABLED=0
+BUILDENV=
 GETENV=
 
 

+ 47 - 28
go/src/flow/flow.go

@@ -8,6 +8,7 @@ import (
 	"io"
 	"os"
 	"reflect"
+	"sync"
 )
 
 // Global
@@ -21,6 +22,7 @@ var (
 type Data = interface{}
 
 type opEntry struct {
+	sync.Mutex
 	name     string
 	inputs   []*operation // still figuring, might be operation
 	executor interface{}
@@ -30,18 +32,17 @@ type opEntry struct {
 // We could Create a single array of operations
 // refs would only mean id, types would be embed in operation
 type Flow struct {
-	registry *registry.Registry
-
+	registry   *registry.Registry
 	consts     map[string]Data
-
 	data       map[string]Data // Should be named, to fetch later
-	operations map[string]opEntry
-
+	operations sync.Map
+	//map[string]*opEntry
 	err   error
 	runID int
 
 	// Experimental run Event
-	handlers []func(name string, payLoad map[string]Data)
+	hooks Hooks
+	//func(name string, payLoad map[string]Data)
 }
 
 // New create a new flow
@@ -51,7 +52,8 @@ func New() *Flow {
 		// Data
 		consts:     map[string]Data{},
 		data:       map[string]Data{},
-		operations: map[string]opEntry{},
+		operations: sync.Map{},
+		//map[string]opEntry{},
 	}
 }
 
@@ -85,7 +87,7 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
 		f.err = err
 		return opNil(f)
 	}
-	f.operations[id] = opEntry{name, inputs, executor}
+	f.operations.Store(id, &opEntry{sync.Mutex{}, name, inputs, executor})
 	return opFunc(f, id)
 }
 
@@ -122,10 +124,10 @@ func (f *Flow) Op(name string, params ...interface{}) Operation {
 	// generate ID
 	for {
 		id := RandString(8)
-		if _, ok := f.operations[id]; ok {
+		if _, ok := f.operations.Load(id); ok {
 			continue
 		}
-		f.operations[id] = opEntry{name, inputs, executor}
+		f.operations.Store(id, &opEntry{sync.Mutex{}, name, inputs, executor})
 		return opFunc(f, id)
 	}
 	// Initialize opfunc maybe
@@ -179,7 +181,10 @@ func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Dat
 	var r Data
 	// This is wrong since the only source of func should be on operation
 	if o.kind == "func" {
-		op := f.operations[o.id.(string)]
+		op, ok := f.getOp(o.id.(string))
+		if !ok {
+			return nil, fmt.Errorf("Operation %s not found", o.id)
+		}
 		callParam := make([]reflect.Value, len(op.inputs))
 		for i, in := range op.inputs {
 			fr, _ := f.run(cache, in, params...) // ignore error
@@ -199,10 +204,11 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 		w = os.Stdout
 	}
 	fmt.Fprintf(w, "Ops analysis:\n")
-	for k, op := range f.operations {
+	f.operations.Range(func(pk, po interface{}) bool {
+		k, op := pk.(string), po.(*opEntry)
 		fw := bytes.NewBuffer(nil)
 		//fmt.Fprintf(w, "  [%s] (%v)", k, op.name)
-		fmt.Fprintf(fw, "  [%s] %s(", k, op.name)
+		fmt.Fprintf(fw, "  [%s] %s(", pk, op.name)
 		for j, in := range op.inputs {
 			//ref := in.(Op)
 			if j != 0 {
@@ -211,7 +217,7 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 			ires := in.Process(params...)
 			if f.err != nil {
 				fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error())
-				return
+				return false
 			}
 			fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
 		}
@@ -223,7 +229,8 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 		fmt.Fprintf(fw, "%v\n", res)
 
 		fmt.Fprintf(w, "%s", fw.String())
-	}
+		return true
+	})
 }
 
 /////////////////////////////
@@ -244,7 +251,9 @@ func (f *Flow) String() string {
 	}
 
 	fmt.Fprintf(ret, "funcs:\n")
-	for k, v := range f.operations {
+	f.operations.Range(func(pk, pv interface{}) bool {
+		k, v := pk.(string), pv.(*opEntry)
+
 		fmt.Fprintf(ret, "  [%s] %s(", k, v.name)
 		for j, in := range v.inputs {
 			if j != 0 {
@@ -253,7 +262,8 @@ func (f *Flow) String() string {
 			fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)
 		}
 		fmt.Fprintf(ret, ")\n")
-	}
+		return true
+	})
 
 	return ret.String()
 }
@@ -266,7 +276,8 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
 		Input []map[string]interface{}
 	}
 	operations := map[string]opMarshal{}
-	for k, o := range f.operations {
+	f.operations.Range(func(pk, po interface{}) bool {
+		k, o := pk.(string), po.(*opEntry)
 		refs := []map[string]interface{}{}
 		for _, in := range o.inputs { // Switch type?
 			refs = append(refs, map[string]interface{}{
@@ -275,7 +286,9 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
 			})
 		}
 		operations[k] = opMarshal{o.name, refs}
-	}
+		return true
+	})
+
 	data["operations"] = operations
 	data["data"] = f.data
 	data["consts"] = f.consts
@@ -283,17 +296,23 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
 	return json.Marshal(data)
 }
 
+/////////////////
+// Async data
+/////
+func (f *Flow) getOp(id string) (*opEntry, bool) {
+
+	o, ok := f.operations.Load(id)
+	if !ok {
+		return nil, false
+	}
+	return o.(*opEntry), true
+}
+
 //////////////////////////////////////////////
 // Experimental event
 ////////////////
 
-// Handle attach a handler
-func (f *Flow) Handle(handler func(name string, payLoad map[string]Data)) {
-	f.handlers = append(f.handlers, handler)
-}
-
-func (f *Flow) trigger(name string, payLoad map[string]Data) {
-	for _, h := range f.handlers {
-		h(name, payLoad)
-	}
+// Hook attach the node event hooks
+func (f *Flow) Hook(hook Hook) {
+	f.hooks = append(f.hooks, hook)
 }

+ 5 - 2
go/src/flow/flow_test.go

@@ -132,8 +132,11 @@ func TestDefOp(t *testing.T) {
 
 func TestHandler(t *testing.T) {
 	f, op := prepareComplex()
-	f.Handle(func(name string, payLoad map[string]flow.Data) {
-		t.Log("Something happened:", name, payLoad)
+	f.Hook(flow.Hook{
+		Wait:   func(ID string) { t.Logf("[%s] Wait", ID) },
+		Start:  func(ID string) { t.Logf("[%s]Start", ID) },
+		Finish: func(ID string, res flow.Data) { t.Logf("[%s] Finish %v", ID, res) },
+		Error:  func(ID string, err error) { t.Logf("[%s] Error %v", ID, err) },
 	})
 	op.Process()
 }

+ 44 - 0
go/src/flow/hook.go

@@ -0,0 +1,44 @@
+package flow
+
+type Hooks []Hook
+
+// Hook funcs to handle certain events on the flow
+type Hook struct {
+	Wait   func(ID string)
+	Start  func(ID string)
+	Finish func(ID string, res interface{})
+	Error  func(ID string, err error)
+	Any    func(name string, ID string, extra ...interface{})
+}
+
+// Trigger a hook
+func (hooks Hooks) Trigger(name string, ID string, extra ...Data) {
+	for _, h := range hooks {
+		if h.Any != nil {
+			h.Any(name, ID, extra...)
+		}
+		switch name {
+		case "Wait":
+			if h.Wait != nil {
+				h.Wait(ID)
+			}
+		case "Start":
+			if h.Start != nil {
+				h.Start(ID)
+			}
+		case "Finish":
+			if h.Finish != nil {
+				h.Finish(ID, extra[0])
+			}
+		case "Error":
+			if h.Error != nil {
+				h.Error(ID, extra[0].(error))
+			}
+		}
+	}
+}
+
+func (hooks Hooks) Wait(ID string)             { hooks.Trigger("Wait", ID) }
+func (hooks Hooks) Start(ID string)            { hooks.Trigger("Start", ID) }
+func (hooks Hooks) Finish(ID string, res Data) { hooks.Trigger("Finish", ID, res) }
+func (hooks Hooks) Error(ID string, err error) { hooks.Trigger("Error", ID, err) }

+ 28 - 11
go/src/flow/operation.go

@@ -8,16 +8,17 @@ package flow
 import (
 	"errors"
 	"fmt"
+	"log"
 	"reflect"
 	"sync"
 )
 
 // OpCtx operation Context
-type OpCtx map[Operation]Data
+type OpCtx = *sync.Map
 
 // NewOpCtx creates a running context
 func newOpCtx() OpCtx {
-	return OpCtx{}
+	return &sync.Map{}
 }
 
 // dumbSet
@@ -34,6 +35,7 @@ type Operation interface { // Id perhaps?
 
 //local operation information
 type operation struct {
+	sync.Mutex
 	flow    *Flow
 	id      interface{} // Interface key
 	kind    string
@@ -48,22 +50,28 @@ func (o *operation) ID() string {
 
 // Process operation process wrapper
 func (o *operation) Process(params ...Data) Data {
+	log.Println("Execute internally with ctx")
 	return o.processWithCtx(newOpCtx(), params...)
 }
 
 // Every single one is run with this internally
 func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
 	if o.flow.err != nil {
+		log.Println("Flow has error")
 		return nil
 	}
 	if ctx == nil { // No cache/Context
+		log.Println("context is nil, run directly")
 		return o.process(ctx, params...)
 	}
-	if v, ok := ctx[o]; ok {
+	log.Println("Load var from context")
+	if v, ok := ctx.Load(o); ok {
 		return v
 	}
+	log.Println("Processing executor")
 	res := o.process(ctx, params...)
-	ctx[o] = res
+	log.Println("Storing value")
+	ctx.Store(o, res)
 
 	return res
 }
@@ -111,32 +119,41 @@ func opFunc(f *Flow, id string) *operation {
 		kind: "func",
 		set:  dumbSet,
 		process: func(ctx OpCtx, params ...Data) Data {
-			f.trigger("nodeWait", map[string]Data{"id": id})
-			op, ok := f.operations[id]
+			log.Println("Function executing:", id)
+			op, ok := f.getOp(id)
 			if !ok {
 				f.err = fmt.Errorf("invalid operation '%s'", id)
-				f.trigger("nodeError", map[string]Data{"id": id})
+				f.hooks.Error(id, f.err)
 				return nil
 			}
+			log.Println("Got operation")
 			/////////////////////////////
 			// NEW PARALLEL PROCESSING
 			///////////
+			log.Println("Sending wait")
+			f.hooks.Wait(id)
+			log.Println("Waited")
+
 			callParam := make([]reflect.Value, len(op.inputs))
 
+			log.Println("Create wait group for", len(op.inputs))
 			wg := sync.WaitGroup{}
 			wg.Add(len(op.inputs))
 			for i, in := range op.inputs {
 				go func(i int, in *operation) {
+					log.Println("Running parallel")
 					defer wg.Done()
 					fr := in.processWithCtx(ctx, params...)
 					callParam[i] = reflect.ValueOf(fr)
+					log.Println("Done")
 				}(i, in)
 			}
 			wg.Wait()
-
-			f.trigger("nodeStart", map[string]Data{"id": id})
-			ret := reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
-			f.trigger("nodeFinish", map[string]Data{"id": id, "result": ret})
+			f.hooks.Start(id)
+			fnret := reflect.ValueOf(op.executor).Call(callParam)
+			// check fnret[1] for error
+			ret := fnret[0].Interface()
+			f.hooks.Finish(id, ret)
 			return ret
 		},
 	}

+ 105 - 24
go/src/flowserver/session.go

@@ -5,16 +5,23 @@ import (
 	"errors"
 	"flow"
 	"flowserver/flowmsg"
+	"io/ioutil"
 	"log"
+	"os"
+	"path/filepath"
 	"sync"
 
 	"github.com/gorilla/websocket"
 )
 
+const (
+	storePath = "store"
+)
+
 // NodeActivity when nodes are processing
 type NodeActivity struct {
 	Status string    `json:"status"` // nodeStatus, Running, error, result
-	Result flow.Data `json:"result"`
+	Data   flow.Data `json:"data"`
 }
 
 // FlowSession Create a session and link clients
@@ -26,21 +33,38 @@ type FlowSession struct {
 	Chat    ChatRoom
 
 	RawDoc       []byte // Just share data
-	nodeActivity map[string]NodeActivity
+	nodeActivity map[string]*NodeActivity
 
 	flow *flow.Flow
 }
 
 //NewSession creates and initializes a NewSession
 func NewSession(ID string) *FlowSession {
+	// Or load
+	//
+	//
+	fpath, err := pathFor(ID)
+	if err != nil {
+		log.Println("Error fetching filepath", err)
+	}
+
+	rawDoc, err := ioutil.ReadFile(fpath)
+	if err != nil {
+		log.Println("Warning: unable to read file:", err)
+	}
+	if rawDoc == nil {
+		rawDoc = []byte{}
+	}
+
 	s := &FlowSession{
-		sync.Mutex{},
-		ID,
-		[]*websocket.Conn{},
-		ChatRoom{},
-		[]byte{},
-		map[string]NodeActivity{},
-		nil,
+		Mutex:        sync.Mutex{},
+		ID:           ID,
+		clients:      []*websocket.Conn{},
+		Chat:         ChatRoom{},
+		RawDoc:       rawDoc,
+		nodeActivity: map[string]*NodeActivity{},
+
+		flow: nil,
 	}
 	return s
 }
@@ -99,10 +123,30 @@ func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
 	s.Lock()
 	defer s.Unlock()
 
-	s.RawDoc = data
+	s.RawDoc = make([]byte, len(data))
+	copy(s.RawDoc, data)
 
 	return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
+}
+
+// DocumentSave persist document in a file
+func (s *FlowSession) DocumentSave() error {
+	s.Lock()
+	defer s.Unlock()
+
+	fpath, err := pathFor(s.ID)
+	if err != nil {
+		log.Println("path error", err)
+		return err
+	}
+
+	err = ioutil.WriteFile(fpath, s.RawDoc, os.FileMode(0600))
+	if err != nil {
+		log.Println("writing file", err)
+		return err
+	}
 
+	return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"})
 }
 
 // Document send document to client c
@@ -120,12 +164,13 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 	ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
 	log.Println("Node will run", ID)
 	if s.flow != nil {
+		log.Println("Node already running")
 		return errors.New("node already running")
 	}
 	log.Println("Flow--\n", s.flow)
 
 	// Clear activity
-	s.nodeActivity = map[string]NodeActivity{}
+	s.nodeActivity = map[string]*NodeActivity{}
 	s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
 
 	go func() {
@@ -139,26 +184,49 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 		}()
 		log.Println("Attaching hooks")
 		//XXX: Transform this function to a typed hook structure
-		s.flow.Handle(func(name string, payload map[string]flow.Data) {
-			switch name {
-			case "nodeWait":
-				s.nodeActivity[payload["id"].(string)] = NodeActivity{Status: "waiting"}
-			case "nodeStart":
-				s.nodeActivity[payload["id"].(string)] = NodeActivity{Status: "running"}
-			case "nodeFinish":
-				s.nodeActivity[payload["id"].(string)] = NodeActivity{Status: "finish", Result: payload["result"]}
-			}
-			// Add to executing nodes
-			log.Println("Inform every one about", name)
-			s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
+
+		s.flow.Hook(flow.Hook{
+			Any: func(name string, ID string, extra ...flow.Data) {
+				log.Println("Flow lock")
+				s.Lock()
+				defer s.Unlock()
+
+				act, ok := s.nodeActivity[ID]
+				if !ok {
+					act = &NodeActivity{}
+					s.nodeActivity[ID] = act
+				}
+				status := ""
+				log.Println("Switching funcs:", name)
+				switch name {
+				case "Wait":
+					status = "waiting"
+				case "Start":
+					status = "running"
+				case "Finish":
+					status = "finish"
+					act.Data = extra[0]
+				case "Error":
+					status = "error"
+					act.Data = extra[0]
+				}
+				if act.Status == status {
+					return
+				}
+				act.Status = status
+				log.Println("Broadcasting the message")
+				s.broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
+
+			},
 		})
 
-		log.Println("Execute node:", ID)
+		log.Println("Fetching operation:", ID)
 		op := s.flow.Res(ID)
 		if s.flow.Err() != nil {
 			log.Println("error fetching document", s.flow.Err())
 		}
 
+		log.Println("Execute node", op)
 		res := op.Process()
 		if s.flow.Err() != nil {
 			log.Println("error processing node", s.flow.Err())
@@ -191,3 +259,16 @@ func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
 	return nil
 
 }
+
+func pathFor(ID string) (string, error) {
+	{
+		err := os.MkdirAll(storePath, 0755)
+		if err != nil {
+			return "", err
+		}
+	}
+	fpath := filepath.Clean(ID)
+	_, fpath = filepath.Split(fpath)
+	fpath = filepath.Join(storePath, fpath)
+	return fpath, nil
+}

+ 53 - 47
go/src/flowserver/sessionmgr.go

@@ -7,6 +7,8 @@ import (
 	"flowserver/flowmsg"
 	"log"
 	"net/http"
+	"path/filepath"
+	"runtime"
 	"sync"
 
 	"github.com/gorilla/websocket"
@@ -98,98 +100,102 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 
 		m := flowmsg.RecvMessage{}
 		err = json.Unmarshal(data, &m)
-		if err != nil {
+		if e(err) {
 			log.Println("Err parsing message:", err)
 			c.WriteJSON("bye")
 			break
 		}
-		switch m.OP {
-		/////////////////////////////
-		// NEWSESSION request
-		//////////////////
-		case "sessionNew":
-			err = func() error {
+		err = func() error {
+			switch m.OP {
+			/////////////////////////////
+			// NEWSESSION request
+			//////////////////
+			case "sessionNew":
 				log.Println("We want a new session so")
 				sess = fsm.CreateSession()
 				return sess.ClientAdd(c)
-			}()
-			//////////////////////////////////
-			// LOADSESSION request
-			////////////////////////////////////
-		case "sessionLoad":
-			err = func() error {
+				//////////////////////////////////
+				// LOADSESSION request
+				////////////////////////////////////
+			case "sessionLoad":
 				sessID := string(m.ID)
 
 				if sess != nil {
 					sess.ClientRemove(c)
 				}
 				sess, err = fsm.LoadSession(sessID) // Set our session
-				if err != nil {
+				if e(err) {
 					return err
 				}
 				return sess.ClientAdd(c)
-			}()
 
-		///////////////////////
-		// DOCUMENTUPDATE Receive a document
-		//////
-		case "documentUpdate":
-			err = func() error {
+			///////////////////////
+			// DOCUMENTUPDATE Receive a document
+			//////
+			case "documentUpdate":
 				sess, err = fsm.LoadSession(m.ID)
-				if err != nil {
+				if e(err) {
 					return err
 				}
 				return sess.DocumentUpdate(c, m.Data)
-			}()
-		case "nodeUpdate":
-			err = func() error {
+			case "documentSave":
+				if sess == nil {
+					return errors.New("documentSave: invalid session")
+				}
+				return sess.DocumentSave()
+
+			//////////////////
+			// NODE operations
+			/////////
+			case "nodeUpdate":
 				if sess == nil {
 					return errors.New("nodeUpdate: invalid session")
 				}
 				return sess.Broadcast(c, flowmsg.SendMessage{OP: m.OP, Data: m.Data})
-			}()
-		case "nodeProcess":
-			err = func() error {
+			case "nodeProcess":
 				if sess == nil {
 					return errors.New("nodeRun: invalid session")
 				}
 				return sess.NodeRun(c, m.Data)
-			}()
-		case "chatJoin":
-			err = func() error {
+			////////////////////
+			// CHAT operations
+			/////////
+			case "chatJoin":
 				if sess == nil {
-					return nil // just do nothing
+					return nil
 				}
 				var handle string
 				json.Unmarshal(m.Data, &handle)
 				log.Println("Joining with:", handle)
 				sess.ChatJoin(c, handle)
-				return nil
-			}()
-		case "chatRename":
-			err = func() error {
+			case "chatRename":
 				if sess == nil {
-					return nil // just do nothing
+					return nil
 				}
 				var handle string
 				json.Unmarshal(m.Data, &handle)
 				sess.Chat.ClientRename(c, handle)
-				return nil
-			}()
 
-		case "chatEvent":
-			err = func() error {
+			case "chatEvent":
 				if sess == nil {
 					return errors.New("invalid session")
 				}
 				sess.Chat.Event(c, m.Data)
-				return nil
-			}()
-		}
-
-		if err != nil {
-			log.Println("Err Writing", err)
-		}
+			}
+			return nil
+		}()
 	}
 	log.Println("ws Is disconnecting", r.RemoteAddr)
 }
+
+func e(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	_, file, line, _ := runtime.Caller(1)
+	file = filepath.Base(file)
+
+	log.Printf("Err: %s:%d %v", file, line, err)
+	return true
+}