Forráskód Böngészése

Starting context menu

luis 7 éve
szülő
commit
6fc3d0796b

+ 6 - 13
browser/vue-flow/README.md

@@ -1,18 +1,11 @@
-# vue-svg
+# flow-ui
 
-> vue-svg-test
+## Things to do
 
-## Build Setup
+### UI
 
-``` bash
-# install dependencies
-npm install
+* Mark selection with better colors to make more noticeable
 
-# serve with hot reload at localhost:8080
-npm run dev
+### Code & Services
 
-# build for production with minification
-npm run build
-```
-
-For detailed explanation on how things work, consult the [docs for vue-loader](http://vuejs.github.io/vue-loader).
+* Possible implementation of centralized state (vuex)

+ 3 - 28
browser/vue-flow/src/assets/default-theme.css

@@ -18,6 +18,7 @@
   --selector-background: rgba(0, 0, 200, 0.1);
   --selector-color: var(--primary);
   --transition-speed: 0.2s;
+  --transition-speed-slow: 0.7s;
 }
 
 .vertical_sep {
@@ -325,6 +326,8 @@ h3 {
   color: var(--normal);
 }
 
+/* End notification */
+
 .fade-enter-active,
 .fade-leave-active {
   transition: opacity 0.5s;
@@ -334,31 +337,3 @@ h3 {
 .fade-leave-to {
   opacity: 0;
 }
-
-.slide-leave-active,
-.slide-enter-active {
-  transition: 1s;
-}
-
-.slide-enter {
-  transform: translate(100%, 0);
-}
-
-.slide-leave-to {
-  transform: translate(-100%, 0);
-}
-
-/*
-.flow-modal .flow-view {
-  background: var(--background);
-}
-
-.flow-modal .hx-modal__container {
-  background: var(--background-secondary);
-  color: var(--normal);
-}
-
-.flow-modal .hx-modal__body input {
-  background: var(--background) !important;
-  color: var(--normal) !important;
-}*/

+ 4 - 4
browser/vue-flow/src/components/chat.vue

@@ -72,11 +72,11 @@ export default {
           this.$refs.messages.scrollTop = this.$refs.messages.scrollHeight
         })
       }
-      if (this.active === false) {
+      /* if (this.active === false) {
         if (val.some(e => e.type === 'msg')) {
           this.active = true
         }
-      }
+      } */
     }
   },
   mounted () {
@@ -105,9 +105,9 @@ export default {
       this.userList = v.data
     },
     recvChatEvent (v) {
+      if (!this.active) this.$notify(`<b>${v.data.handle}:</b> ${v.data.message}`)
       this.events.push(v.data)
     },
-
     sendChatJoin () {
       // Clear messages here
       this.events = []
@@ -120,8 +120,8 @@ export default {
       this.$flowService.chatRename(this.handle)
     }
   }
-
 }
+
 function pad (n, width, z) {
   z = z || '0'
   n = n + ''

+ 4 - 4
browser/vue-flow/src/components/defregistry.js

@@ -17,11 +17,11 @@ export default{
     style: { color: '#555' }
     // , props: {value: ''}
   },
-  'Log': {
-    categories: ['core'],
-    inputs: [{type: 'interface {}'}],
+  'Notify': {
+    categories: ['flow-web'],
+    inputs: [{type: 'interface {}'}, {type: 'string', name: 'msg'}],
     output: {type: 'interface {}'},
-    style: { color: '#665', shape: 'circle' }
+    style: { color: '#665'}
   }
 
 }

+ 46 - 3
browser/vue-flow/src/components/flow/manager.vue

@@ -40,6 +40,8 @@
           @nodePointerDown.prevent="nodePointerDown($event,i)"
           @socketPointerDown="socketPointerDown(n.id,...arguments)"
           @nodeDoubleClick="$emit('nodeDblClick',n)"
+          @nodeRightClick="$refs.menu.open($event,n)"
+
         />
         <!-- mouse link-->
         <flow-link
@@ -67,19 +69,29 @@
       nodes: {{ nodeData.nodes.length }}
       links: {{ nodeData.links.length }}
     </div>
-
+    <hx-context-menu ref="menu">
+      <template slot-scope="d" >
+        <div class="flow-node__context-menu">
+          <button @click="nodeProcess(d.userData)">Run</button>
+          <button @click="nodeRemove(d.userData)">Delete</button>
+          <hr>
+          <button @click="nodeInspect(d.userData)">Inspect</button>
+        </div>
+      </template>
+    </hx-context-menu>
   </div>
 </template>
 <script>
 import FlowNode from './node'
 import FlowLink from './link'
 import FlowPanZoom from './panzoom'
+import HxContextMenu from '@/components/shared/hx-contextmenu'
 import SvgDefs from './svgdefswrapper'
 import utils from '@/utils/utils'
 
 export default {
   name: 'FlowManager',
-  components: {FlowNode, FlowLink, FlowPanZoom, SvgDefs},
+  components: {FlowNode, FlowLink, FlowPanZoom, HxContextMenu, SvgDefs},
   props: {
     'activity': {type: Object, default: () => {}},
     'registry': {type: Object, default: () => {}},
@@ -350,6 +362,10 @@ export default {
         }})
     },
 
+    nodeInspect (tnode) {
+      this.$emit('nodeInspect', tnode)
+    },
+
     nodePointerDown (ev, i) {
       document.activeElement && document.activeElement.blur()
       const tnode = this.nodeData.nodes[i]
@@ -363,7 +379,7 @@ export default {
         this.socketPointerDown(tnode.id, ev, {out: 0})
         return
       }
-      this.$emit('nodeInspect', tnode)
+      this.nodeInspect(tnode)
 
       // Switch selection
       if (!this.nodeSelection[tnode.id] && !ev.ctrlKey) this.nodeSelection = {}
@@ -427,6 +443,10 @@ export default {
       this.sendFlowEvent('nodeUpdate', (nu[newNode.id] = newNode, nu))
       this.sendDocumentUpdate()
     },
+    nodeProcess (node) {
+      console.log('Node process demand')
+      this.$emit('nodeProcess', node)
+    },
     linkPointerClick (ev, link) {
       ev.preventDefault()
       this.linkRemove(link)
@@ -562,4 +582,27 @@ export default {
   opacity:1;
 }
 
+.flow-node__context-menu {
+  box-shadow: 0 1px 4px  rgba(0,0,0,0.2);
+  display:flex;
+  flex-flow: column;
+  justify-content: start;
+  align-items: stretch;
+  width:180px;
+}
+
+.flow-node__context-menu > * {
+  width:100%;
+}
+
+.flow-node__context-menu hr {
+  width:90%;
+  border:none;
+  border-bottom: solid 1px rgba(150,150,150,0.2);
+}
+
+.flow-node__context-menu button {
+  text-align:left;
+}
+
 </style>

+ 1 - 0
browser/vue-flow/src/components/flow/node.vue

@@ -7,6 +7,7 @@
     }"
     :status="status"
     @mousedown.stop.prevent="$emit('nodePointerDown',$event)"
+    @contextmenu.capture.prevent="$emit('nodeRightClick', $event)"
     @dblclick="$emit('nodeDoubleClick',$event)"
   >
 

+ 22 - 5
browser/vue-flow/src/components/main.vue

@@ -78,6 +78,7 @@
             :registry="registry"
             @funcsPanelToggle="funcsActive=!funcsActive"
             @nodeInspect="nodeInspectStart(...arguments)"
+            @nodeProcess="nodeProcess(...arguments)"
             @nodeDblClick="nodeInspectStart(...arguments,true)"
             @documentSave="documentSave"
 
@@ -159,7 +160,7 @@
           <button class="primary" @click="nodeInspect=false">OK</button>
         </div>
       </hx-modal>-->
-
+      <hx-notify/>
     </div>
   </div>
 </template>
@@ -172,6 +173,7 @@ import FlowFuncs from './panel-funcs'
 import FlowInspector from './panel-inspector'
 import HxSplit from '@/components/shared/hx-split'
 import HxModal from '@/components/shared/hx-modal'
+import HxNotify from '@/components/shared/hx-notify'
 import defRegistry from './defregistry'
 import 'reset-css/reset.css'
 
@@ -180,7 +182,17 @@ import '@/assets/style.css'
 // import nodeData from './nodedata'
 
 export default {
-  components: {FlowManager, FlowNode, FlowPanzoom, FlowInspector, FlowFuncs, HxSplit, HxModal, AppChat},
+  components: {
+    FlowManager,
+    FlowNode,
+    FlowPanzoom,
+    FlowInspector,
+    FlowFuncs,
+    HxSplit,
+    HxModal,
+    HxNotify,
+    AppChat
+  },
   data () {
     return {
       registry: JSON.parse(JSON.stringify(defRegistry)),
@@ -238,13 +250,15 @@ export default {
     this.$flowService.on('nodeActivity', (v) => {
       this.activity = v.data || {}
     })
-    this.$flowService.on('sessionLog', (v) => {
+    this.$flowService.on('sessionNotify', (v) => {
+      this.$notify(v.data)
       // Make this elsewhere
-      console.log(v.data)
+      // console.log(v.data)
     })
 
     // Connected
     this.$flowService.connected(() => {
+      this.$notify('Connected')
       // Make this in a service
       if (this.$route.params.sessId === undefined) {
         this.$flowService.sessionNew()
@@ -256,7 +270,10 @@ export default {
   methods: {
     nodeInspectStart (node, changePane) { // node
       this.nodeActive = node
-      if (changePane) this.panel = 'inspector'
+      if (changePane) {
+        this.funcsActive = true
+        this.panel = 'inspector'
+      }
       if (this.panel !== 'inspector') {
         return
       }

+ 9 - 2
browser/vue-flow/src/components/panel-inspector.vue

@@ -27,6 +27,8 @@
         <!-- DESCRIPTIONS -->
         <div class="flow-inspector__area flow-inspector--properties ">
 
+          <label>ID</label>
+          <div class="property">[{{ nodeInspect.id }}]</div>
           <label>Description</label>
           <div class="property">Bogus description</div>
           <label>Help</label>
@@ -104,17 +106,17 @@ export default {
   display:flex;
   flex-flow:column;
   color: var(--normal);
-  padding:10px;
 }
 
 .flow-inspector__container {
+  padding:10px;
   display:flex;
   flex-flow:column;
   white-space: nowrap;
   width:100%;
   flex-basis:100%;
   transition: all var(--transition-speed);
-  overflow:hidden;
+  overflow-y:auto;
 }
 
 .flow-inspector input {
@@ -146,6 +148,10 @@ export default {
   flex-shrink: 0;
 }
 
+.flow-inspector__area .property {
+  white-space: normal;
+}
+
 .flow-inspector--properties-error {
   color: red;
 }
@@ -161,5 +167,6 @@ export default {
 
 .flow-inspector--control button {
   width: 100%;
+  height:50px;
 }
 </style>

+ 79 - 0
browser/vue-flow/src/components/shared/hx-contextmenu.vue

@@ -0,0 +1,79 @@
+<template>
+  <div
+    :class="{visible:isVisible}"
+    class="hx-context-menu"
+    :style="style" tabindex="-1" @blur="close" @click="close" @contextmenu.capture.prevent>
+    <slot :user-data="userData"/>
+  </div>
+</template>
+
+<script>
+
+module.exports = {
+  name: 'hx-context-menu',
+  data () {
+    return {
+      isVisible: false,
+      x: null,
+      y: null,
+      userData: null
+    }
+  },
+  computed: {
+    style () {
+      let x = this.x - 20
+      let y = this.y - 10
+
+      if (!this.$el || !this.$parent || !this.$parent.$el) { return }
+      const parentRect = this.$parent.$el.getBoundingClientRect()
+
+      var right = parentRect.left + parentRect.width
+      const diffX = x + this.$el.offsetWidth - right
+      if (diffX > 0) {
+        x -= diffX + 2
+      }
+      const bottom = parentRect.top + parentRect.height
+      const diffY = y + this.$el.offsetHeight - bottom
+      if (diffY > 0) {
+        y -= diffY + 2
+      }
+
+      return {
+        top: y - document.body.scrollTop + 'px',
+        left: x + 'px'
+      }
+    }
+  },
+  methods: {
+    open (evt, userData) {
+      this.isVisible = true
+      this.x = evt.pageX || evt.clientX
+      this.y = evt.pageY || evt.clientY
+      this.userData = userData
+      this.$nextTick(() => this.$el.focus())
+    },
+    close (evt) {
+      this.isVisible = false
+      this.userData = null
+    }
+  }
+}
+</script>
+
+<style scoped>
+.hx-context-menu {
+  pointer-events: none;
+  opacity:0;
+  position: fixed;
+  z-index: 2000;
+  background: var(--background-secondary);
+  color: var(--normal);
+  transition: opacity var(--transition-speed);
+}
+
+.hx-context-menu:focus {
+  pointer-events: initial;
+  opacity:0.9;
+  outline: none;
+}
+</style>

+ 106 - 0
browser/vue-flow/src/components/shared/hx-notify.vue

@@ -0,0 +1,106 @@
+<template>
+  <div
+    :class="{active: msgs.length>0 && active}"
+    class="hx-notification"
+    @transitionend="transitionend"
+  >
+    <p
+      :key = "i"
+      v-for="(m,i) of msgs"
+      v-html="m"/>
+  </div>
+
+</template>
+
+<script>
+import Vue from 'vue'
+// Component
+export default {
+  name: 'HxNotification',
+  data () {
+    return {
+      msgs: [],
+      active: true
+    }
+  },
+  mounted () {
+    this.$notify.on('notification', this.addMessage)
+  },
+  beforeDestroy () {
+    this.$notify.off('notification', this.addMessage)
+  },
+  methods: {
+    addMessage (m) {
+      if (this.msgs.length > 10) {
+        this.msgs.splice(0, 1)
+      }
+      this.msgs.push(m)
+      this.active = true
+
+      clearTimeout(this.timeout)
+      this.timeout = setTimeout(() => {
+        this.active = false
+      }, 5000)
+    },
+    transitionend (ev) {
+      if (this.active === false) this.msgs = [] // clear  before
+    }
+
+  }
+}
+
+// Plugin
+Vue.use({
+  install (Vue, options) {
+    if (Vue.prototype.$notify) {
+      return
+    }
+    const ebus = new Vue()
+    Vue.prototype.$notify = function (msg) {
+      ebus.$emit('notification', msg)
+    }
+    Vue.prototype.$notify.on = ebus.$on.bind(ebus)
+    Vue.prototype.$notify.once = ebus.$once.bind(ebus)
+    Vue.prototype.$notify.off = ebus.$off.bind(ebus)
+  }
+})
+
+</script>
+<style>
+/* notifications */
+.hx-notification {
+  pointer-events:none;
+  user-select:none;
+  opacity:0;
+  position: fixed;
+  z-index: 1000;
+  padding: 20px;
+  width: 50%;
+  left: 50%;
+  transform: translateX(-50%);
+  bottom: 10%;
+  background: var(--normal);
+  color: var(--background);
+  border: solid 1px rgba(150, 150, 150, 0.3);
+  transition: opacity var(--transition-speed);
+}
+
+.hx-notification b {
+  color: var(--primary-lighter);
+}
+
+/* trick to maintain under mouse */
+.hx-notification:hover {
+  border: solid 1px var(--primary);
+  pointer-events:initial !important;
+  user-select:initial !important;
+  opacity:0.8 !important;
+}
+
+.hx-notification.active {
+  pointer-events:initial;
+  user-select:initial;
+  opacity:0.6;
+}
+
+</style>

+ 1 - 0
browser/vue-flow/src/main.js

@@ -8,6 +8,7 @@ let targetws = 'ws://' + window.location.host + '/conn'
 if (window.location.protocol === 'https:') {
   targetws = 'wss://' + window.location.host + '/conn'
 }
+
 Vue.use(FlowService, {location: targetws})
 
 window.app = new Vue({

+ 29 - 33
go/src/flow/cmd/demo1/main.go

@@ -1,13 +1,14 @@
 package main
 
 import (
+	"errors"
 	"flow"
 	"flow/flowserver"
 	"flow/registry"
 	"fmt"
-	"io"
 	"log"
 	"math"
+	"math/rand"
 	"strings"
 	"time"
 
@@ -18,7 +19,8 @@ func main() {
 	prettylog.Global()
 	log.Println("Running version:", flowserver.Version)
 
-	registry.MakeBatch(
+	// String functions
+	registry.Batch(
 		registry.Add(strings.Split).DescInputs("string", "separator"),
 		registry.Add(strings.Join).DescInputs("", "sep"),
 		registry.Add(strings.Compare, strings.Contains),
@@ -31,16 +33,25 @@ func main() {
 		math.Abs, math.Cos, math.Sin, math.Exp, math.Exp2, math.Tanh, math.Max, math.Min,
 	).Tags("math").Extra("style", registry.M{"color": "#386"})
 
-	// Test functions
-	registry.Add(customNew, customEmpty, customSetName, customSetURL).
-		Tags("testing")
+	// Rand functions
+	registry.Batch(
+		registry.Add(rand.Int, rand.Intn, rand.Float64),
+		registry.Register("Perm", func(n int) []int {
+			if n > 10 { // Limiter for safety
+				n = 10
+			}
+			return rand.Perm(n)
+		}),
+	).Tags("rand").Extra("style", registry.M{"color": "#486"})
 
-	registry.Add(testErrorPanic, testErrorDelayed).
+	// Test functions
+	registry.Add(testErrorPanic, testErrorDelayed, testRandomError).
 		Tags("testing-errors")
 
-	registry.Batch{
+	registry.Batch(
 		registry.Register("wait", wait),
-	}.Tags("testing-time").Extra("style", map[string]string{"color": "#8a5"})
+		registry.Register("waitRandom", waitRandom),
+	).Tags("testing-time").Extra("style", map[string]string{"color": "#8a5"})
 
 	addr := ":2015"
 	log.Println("Starting server  at:", addr)
@@ -53,37 +64,15 @@ func wait(data flow.Data, n int) flow.Data {
 	time.Sleep(time.Duration(n) * time.Second) // Simulate
 	return data
 }
-
-func stream(w io.Writer, val interface{}) interface{} {
-	fmt.Fprint(w, val)
-	return val
+func waitRandom(data flow.Data) flow.Data {
+	time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
+	return data
 }
 
 ////////////////////
 // Testing custom struct operations
 /////
 
-// CustomStruct testing custom struct passing
-type CustomStruct struct {
-	Name string
-	URL  string
-}
-
-func customNew(name string, url string) CustomStruct {
-	return CustomStruct{name, url}
-}
-func customEmpty() CustomStruct {
-	return CustomStruct{}
-}
-func customSetName(a CustomStruct, name string) CustomStruct {
-	a.Name = name
-	return a
-}
-func customSetURL(a CustomStruct, url string) CustomStruct {
-	a.URL = url
-	return a
-}
-
 ////////////////////////
 // testOps
 ////////////////
@@ -98,3 +87,10 @@ func testErrorDelayed(n int) (flow.Data, error) {
 	time.Sleep(dur * time.Second)
 	return nil, fmt.Errorf("I got an error %v", dur)
 }
+func testRandomError(d flow.Data) (flow.Data, error) {
+	r := rand.Intn(10)
+	if r > 5 {
+		return nil, errors.New("I failed on purpose")
+	}
+	return d, nil
+}

+ 45 - 27
go/src/flow/flow.go

@@ -34,7 +34,7 @@ type Flow struct {
 	data       map[string]Data // Should be named, to fetch later
 	operations sync.Map
 	//map[string]*opEntry
-	err   error
+	//err   error // should be a list of errors/report
 	runID int
 
 	// Experimental run Event
@@ -56,7 +56,7 @@ func New() *Flow {
 }
 
 // Err Set or get current error
-func (f *Flow) Err(p ...interface{}) error {
+/*func (f *Flow) Err(p ...interface{}) error {
 	f.Lock()
 	defer f.Unlock()
 
@@ -68,7 +68,7 @@ func (f *Flow) Err(p ...interface{}) error {
 		f.err = err
 	}
 	return f.err
-}
+}*/
 
 //SetRegistry use the registry specified
 func (f *Flow) SetRegistry(r *registry.R) *Flow {
@@ -84,7 +84,7 @@ func (f *Flow) SetIDGen(idGen func() string) {
 }
 
 // DefOp Manual tag an Operation
-func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
+func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
 	inputs := make([]*operation, len(params))
 	for i, p := range params {
 		switch v := p.(type) {
@@ -92,17 +92,20 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
 			inputs[i] = v
 		default:
 			//log.Println("WARNING defining const with value", v)
-			inputs[i] = f.Const(v).(*operation)
+			c, err := f.Const(v)
+			if err != nil {
+				return nil, err
+			}
+			inputs[i], _ = c.(*operation)
 		}
 	}
 	// Grab executor here
 	executor, err := f.registry.Get(name)
 	if err != nil {
-		f.err = err
-		return opNil(f)
+		return nil, err
 	}
 	f.operations.Store(id, &opEntry{sync.Mutex{}, name, inputs, executor})
-	return opFunc(f, id)
+	return opFunc(f, id), nil
 }
 
 // Res returns a deferred operation result
@@ -115,7 +118,7 @@ func (f *Flow) Res(id string) Operation {
 // Op return an function operator
 //  name - a previous registered function
 //  params - the function inputs
-func (f *Flow) Op(name string, params ...interface{}) Operation {
+func (f *Flow) Op(name string, params ...interface{}) (Operation, error) {
 	// Use this on Set?
 	inputs := make([]*operation, len(params))
 	for i, p := range params {
@@ -123,17 +126,21 @@ func (f *Flow) Op(name string, params ...interface{}) Operation {
 		case *operation:
 			inputs[i] = v
 		default:
+
+			c, err := f.Const(v)
+			if err != nil {
+				return nil, err
+			}
 			// fail here
 			//log.Println("WARNING defining const with value", v)
-			inputs[i] = f.Const(v).(*operation)
+			inputs[i] = c.(*operation)
 		}
 	}
 
 	// Grab executor here
 	executor, err := f.registry.Get(name)
 	if err != nil {
-		f.err = err
-		return opNil(f)
+		return nil, err
 	}
 	// generate ID
 	for i := 0; i < 10; i++ {
@@ -142,15 +149,22 @@ func (f *Flow) Op(name string, params ...interface{}) Operation {
 			continue
 		}
 		f.operations.Store(id, &opEntry{sync.Mutex{}, name, inputs, executor})
-		return opFunc(f, id)
+		return opFunc(f, id), nil
 	}
-	f.err = errors.New("ID exausted")
-	return opNil(f)
+	return nil, errors.New("ID exausted")
 	// Initialize opfunc maybe
 }
 
+// Must it will panic on error
+func (f *Flow) Must(op Operation, err error) Operation {
+	if err != nil {
+		panic(err)
+	}
+	return op
+}
+
 // Const returns a const operation
-func (f *Flow) Const(value Data) Operation {
+func (f *Flow) Const(value Data) (Operation, error) {
 	// generate ID
 	for i := 0; i < 10; i++ {
 		id := f.idGen()
@@ -158,10 +172,10 @@ func (f *Flow) Const(value Data) Operation {
 			continue
 		}
 		f.consts[id] = value
-		return opConst(f, id)
+		return opConst(f, id), nil
 	}
-	f.err = errors.New("ID exausted")
-	return opNil(f)
+
+	return nil, errors.New("ID exausted")
 }
 
 // Var operation
@@ -184,9 +198,6 @@ func (f *Flow) In(paramID int) Operation {
 
 // Run a batch of operation?
 func (f *Flow) Run(op Operation, params ...Data) (Data, error) {
-	if f.err != nil {
-		return nil, f.err
-	}
 	cache := map[*operation]Data{}
 	return f.run(cache, op, params...)
 }
@@ -210,7 +221,11 @@ func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Dat
 		}
 		r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
 	} else {
-		r = o.process(nil, params...)
+		var err error
+		r, err = o.process(nil, params...)
+		if err != nil {
+			return nil, err
+		}
 	}
 	cache[o] = r
 	return r, nil
@@ -232,9 +247,9 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 			if j != 0 {
 				fmt.Fprintf(fw, ", ")
 			}
-			ires := in.Process(params...)
-			if f.err != nil {
-				fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error())
+			ires, err := in.Process(params...)
+			if err != nil {
+				fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, err)
 				return false
 			}
 			fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
@@ -243,7 +258,10 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 		// Create OpProcessor and execute
 		//
 		opfn := opFunc(f, k)
-		res := opfn.Process(params...)
+		res, err := opfn.Process(params...)
+		if err != nil {
+			fmt.Fprintf(fw, "ERR\n")
+		}
 		fmt.Fprintf(fw, "%v\n", res)
 
 		fmt.Fprintf(w, "%s", fw.String())

+ 66 - 67
go/src/flow/flow_test.go

@@ -3,7 +3,6 @@ package flow_test
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"testing"
 	"time"
 
@@ -18,35 +17,27 @@ import (
 func init() {
 	assert.Quiet = true
 }
-func TestError(t *testing.T) {
-	a := assert.A(t)
-
-	f := flow.New()
-	err := errors.New("test")
-	f.Err(err)
-	a.Eq(f.Err(), err, "error should be the same")
-}
 func TestDefOp(t *testing.T) {
 	a := assert.A(t)
 	f := flow.New()
 
-	f.DefOp("2", "vecadd", []float32{1, 1, 1}, []float32{2, 2, 2}) // r:3 3 3
-	a.Eq(f.Err(), nil, "doing DefOp")
+	var err error
+	_, err = f.DefOp("2", "vecadd", []float32{1, 1, 1}, []float32{2, 2, 2}) // r:3 3 3
+	a.Eq(err, nil, "doing DefOp")
 
-	f.DefOp("1", "vecadd", []float32{1, 2, 3}, f.Res("2")) // r: 4 5 6
-	a.Eq(f.Err(), nil, "doing DefOp")
+	_, err = f.DefOp("1", "vecadd", []float32{1, 2, 3}, f.Res("2")) // r: 4 5 6
+	a.Eq(err, nil, "doing DefOp")
 
-	op := f.Op("vecmul", f.Res("1"), []float32{2, 2, 2}) //r:8 10 12
-	a.Eq(f.Err(), nil, "mul operation")
+	op, err := f.Op("vecmul", f.Res("1"), []float32{2, 2, 2}) //r:8 10 12
+	a.Eq(err, nil, "mul operation")
 	a.NotEq(op, nil, "operation not nil")
 
 	desired := []float32{8, 10, 12}
-	res := op.Process()
+	res, _ := op.Process()
 	a.Eq(desired, res, "vector result should match")
 
-	op = f.DefOp("123", "none")
-	op.Process()
-	a.NotEq(f.Err(), nil, "Error should not be nil")
+	op, err = f.DefOp("123", "none")
+	a.NotEq(err, nil, "Error should not be nil")
 }
 
 func TestIDGen(t *testing.T) {
@@ -63,18 +54,18 @@ func TestIDGen(t *testing.T) {
 		return newID
 	})
 
-	o := f.Op("vecadd", f.In(0), f.In(1))
-	a.Eq(f.Err(), nil, "Should not nil")
+	o, err := f.Op("vecadd", f.In(0), f.In(1))
+	a.Eq(err, nil, "Should not nil")
 	a.Eq(o.ID(), "1", "id should be 1")
 
-	o = f.Op("vecadd", f.In(0), f.In(1))
-	a.Eq(f.Err(), nil, "Should not nil")
+	o, err = f.Op("vecadd", f.In(0), f.In(1))
+	a.Eq(err, nil, "Should not nil")
 	a.Eq(o.ID(), "2", "id should be 2")
 
-	c := f.Const(1)
-	f.Const(1)
-	o = f.Op("vecadd", f.In(0), c)
-	a.NotEq(f.Err(), nil, "Should be nil,id generation exausted")
+	c, _ := f.Const(1)
+	_, err = f.Const(1)
+	a.NotEq(err, nil, "Should not be nil,id generation exausted")
+	o, err = f.Op("vecadd", f.In(0), c)
 
 }
 
@@ -83,25 +74,25 @@ func TestSerialize(t *testing.T) {
 	f := flow.New()
 	var1 := f.Var("var1", []float32{4, 4, 4})
 
-	c1 := f.Const([]float32{1, 2, 3})
-	c2 := f.Const([]float32{2, 2, 2})
+	c1, _ := f.Const([]float32{1, 2, 3})
+	c2, _ := f.Const([]float32{2, 2, 2})
 
-	op1 := f.Op("vecmul", // op:0 - expected: [12,16,20,24]
+	op1, _ := f.Op("vecmul", // op:0 - expected: [12,16,20,24]
 		f.Var("vec1", []float32{4, 4, 4, 4}),
-		f.Op("vecadd", // op:1 - expected: [3,4,5,6]
-			f.Const([]float32{1, 2, 3, 4}),
-			f.Const([]float32{2, 2, 2, 2}),
-		),
+		f.Must(f.Op("vecadd", // op:1 - expected: [3,4,5,6]
+			f.Must(f.Const([]float32{1, 2, 3, 4})),
+			f.Must(f.Const([]float32{2, 2, 2, 2})),
+		)),
 	)
-	mul1 := f.Op("vecmul", c1, op1)       // op:2 - expected 12, 32, 60, 0
-	mul2 := f.Op("vecmul", mul1, var1)    // op:3 - expected 48, 128, 240, 0
-	mul3 := f.Op("vecmul", c2, mul2)      // op:4 - expected 96, 256, 480, 0
-	mul4 := f.Op("vecmul", mul3, f.In(0)) // op:5 - expected 96, 512, 1440,0
+	mul1, _ := f.Op("vecmul", c1, op1)       // op:2 - expected 12, 32, 60, 0
+	mul2, _ := f.Op("vecmul", mul1, var1)    // op:3 - expected 48, 128, 240, 0
+	mul3, _ := f.Op("vecmul", c2, mul2)      // op:4 - expected 96, 256, 480, 0
+	mul4, _ := f.Op("vecmul", mul3, f.In(0)) // op:5 - expected 96, 512, 1440,0
 
 	s := bytes.NewBuffer(nil)
 	f.Analyse(s, []float32{1, 2, 3, 4})
 	t.Log(s)
-	res := mul4.Process([]float32{1, 2, 3, 4})
+	res, _ := mul4.Process([]float32{1, 2, 3, 4})
 
 	t.Log("Res:", res)
 	t.Log("Flow:\n", f)
@@ -117,20 +108,20 @@ func TestConst(t *testing.T) {
 	a := assert.A(t)
 	f := flow.New()
 
-	c := f.Const(1)
-	res := c.Process()
-	//Expect(t).Eq(res, 1)
+	c, _ := f.Const(1)
+	res, err := c.Process()
 	a.Eq(res, 1, "It should be one")
+	a.Eq(err, nil, "const should not error")
 }
 func TestOp(t *testing.T) {
 	a := assert.A(t)
 	f := flow.New()
 
-	add := f.Op("vecadd",
-		f.Op("vecmul",
+	add, err := f.Op("vecadd",
+		f.Must(f.Op("vecmul",
 			[]float32{1, 2, 3},
 			[]float32{2, 2, 2},
-		),
+		)),
 		[]float32{1, 2, 3},
 	)
 	res, err := f.Run(add)
@@ -141,34 +132,42 @@ func TestOp(t *testing.T) {
 }
 
 func TestVariable(t *testing.T) {
+	a := assert.A(t)
 	f := flow.New()
 	v := f.Var("v1", 1)
 
-	res := v.Process()
-	assert.A(t).Eq(res, 1)
+	res, err := v.Process()
+	a.Eq(err, nil)
+	a.Eq(res, 1)
 
 	v.Set(2)
-	res = v.Process()
-	assert.Eq(t, res, 2)
+	res, err = v.Process()
+	a.Eq(err, nil)
+	a.Eq(res, 2)
 }
 
 func TestCache(t *testing.T) {
+	a := assert.A(t)
 	f := flow.New()
 	{
-		r := f.Op("inc")
+		r, err := f.Op("inc")
+		a.Eq(err, nil, "should not error giving operation")
+
 		var res interface{}
 		for i := 1; i < 5; i++ {
-			res = r.Process()
-			assert.Eq(t, res, i)
+			res, err = r.Process()
+			a.Eq(err, nil)
+			a.Eq(res, i)
 		}
 	}
 	{
 		var res flow.Data
-		inc := f.Op("inc")
-		add := f.Op("add", inc, inc)
-		res = add.Process() // 1+1
+		inc, _ := f.Op("inc")
+
+		add, _ := f.Op("add", inc, inc)
+		res, _ = add.Process() // 1+1
 		assert.Eq(t, res, 2)
-		res = add.Process() // 2+2
+		res, _ = add.Process() // 2+2
 		assert.Eq(t, res, 4)
 	}
 }
@@ -185,19 +184,19 @@ func TestHandler(t *testing.T) {
 }
 
 func TestLocalRegistry(t *testing.T) {
-	assert := assert.A(t)
+	a := assert.A(t)
 
 	r := registry.New()
 	e := r.Register("test", func() string { return "" })
-	assert.NotEq(e, nil, "registered in a local register")
+	a.NotEq(e, nil, "registered in a local register")
 
 	f := flow.New()
 	f.SetRegistry(r)
-	op := f.Op("test")
-	assert.NotEq(op, nil, "operation should be valid")
+	op, _ := f.Op("test")
+	a.NotEq(op, nil, "operation should be valid")
 
-	op = f.Op("none")
-	assert.NotEq(f.Err(), nil, "flow should contain an error")
+	op, err := f.Op("none")
+	a.NotEq(err, nil, "flow should contain an error")
 }
 
 func BenchmarkComplex(b *testing.B) {
@@ -235,11 +234,11 @@ func prepareComplex() (*flow.Flow, flow.Operation) {
 	f1 := f.Var("f1", v1)
 	f2 := f.Var("f2", v2)
 
-	mul := f.Op("vecmul", f1, f2)      // Doubles 2,4,6,8...
-	add := f.Op("vecadd", mul, f2)     // Sum 4,8,10,12...
-	mul2 := f.Op("vecmul", mul, add)   // mul again
-	mul3 := f.Op("vecmul", mul2, f1)   // mul with f1
-	div1 := f.Op("vecdiv", mul3, mul2) // div
+	mul, _ := f.Op("vecmul", f1, f2)      // Doubles 2,4,6,8...
+	add, _ := f.Op("vecadd", mul, f2)     // Sum 4,8,10,12...
+	mul2, _ := f.Op("vecmul", mul, add)   // mul again
+	mul3, _ := f.Op("vecmul", mul2, f1)   // mul with f1
+	div1, _ := f.Op("vecdiv", mul3, mul2) // div
 
 	return f, div1
 }

+ 1 - 1
go/src/flow/flowserver/flowbuilder.go

@@ -93,7 +93,7 @@ func FlowBuild(rawData []byte, r *registry.R) (*flow.Flow, error) {
 					//param[l.In] = nil
 					//continue
 				}
-				param[l.In] = f.Const(newVal.Elem().Interface())
+				param[l.In], _ = f.Const(newVal.Elem().Interface())
 			default:
 				param[l.In] = f.Res(from.ID)
 			}

+ 11 - 14
go/src/flow/flowserver/session.go

@@ -189,7 +189,10 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 
 		localr := registry.Global.Clone()
 		//Add our log func that is not in global registry
-		localr.Register("Log", s.Log)
+		localr.Register("Notify", func(v flow.Data, msg string) flow.Data {
+			s.Notify(msg)
+			return v
+		})
 
 		s.flow, err = FlowBuild(s.RawDoc, localr)
 
@@ -236,13 +239,11 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 		})
 
 		op := s.flow.Res(ID)
-		if s.flow.Err() != nil {
-			log.Println("error fetching document", s.flow.Err())
-		}
-		op.Process()
-		if s.flow.Err() != nil {
-			log.Println("error processing node", s.flow.Err())
+		_, err := op.Process()
+		if err != nil {
+			log.Println("error processing node", err)
 		}
+		s.flow = nil
 		/*func() {
 			s.Lock()
 			defer s.Unlock()
@@ -256,13 +257,9 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 	return nil
 }
 
-// Log broadcast a log event
-func (s *FlowSession) Log(v interface{}) (interface{}, error) {
-	err := s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionLog", Data: v})
-	if err != nil {
-		return nil, err
-	}
-	return v, nil
+// Notify broadcast a notification to clients
+func (s *FlowSession) Notify(v interface{}) error {
+	return s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
 }
 
 // Broadcast broadcast a message in session besides C

+ 44 - 47
go/src/flow/operation.go

@@ -8,7 +8,6 @@ package flow
 import (
 	"errors"
 	"fmt"
-	"log"
 	"reflect"
 	"sync"
 )
@@ -28,7 +27,7 @@ func dumbSet(params ...Data) {}
 type Operation interface { // Id perhaps?
 	ID() string
 	Set(inputs ...Data) // Special var method
-	Process(params ...Data) Data
+	Process(params ...Data) (Data, error)
 }
 
 // Run Context actually not OpCTX
@@ -39,7 +38,7 @@ type operation struct {
 	id      interface{} // Interface key
 	kind    string
 	set     func(params ...Data)
-	process func(ctx OpCtx, params ...Data) Data
+	process func(ctx OpCtx, params ...Data) (Data, error)
 }
 
 // Id returns string Id of the operaton
@@ -48,12 +47,12 @@ func (o *operation) ID() string {
 }
 
 // Process operation process wrapper
-func (o *operation) Process(params ...Data) Data {
+func (o *operation) Process(params ...Data) (Data, error) {
 	return o.processWithCtx(newOpCtx(), params...)
 }
 
 // Every single one is run with this internally
-func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
+func (o *operation) processWithCtx(ctx OpCtx, params ...Data) (Data, error) {
 	return o.process(ctx, params...)
 	/*entry, _ := o.flow.getOp(fmt.Sprint(o.id))
 	if entry == nil {
@@ -91,12 +90,11 @@ func opIn(f *Flow, id int) *operation {
 		id:   id,
 		kind: "in",
 		set:  dumbSet,
-		process: func(ctx OpCtx, params ...Data) Data {
+		process: func(ctx OpCtx, params ...Data) (Data, error) {
 			if id >= len(params) || id < 0 {
-				f.err = errors.New("invalid input")
-				return nil
+				return nil, errors.New("invalid input")
 			}
-			return params[id]
+			return params[id], nil
 		},
 	}
 }
@@ -106,9 +104,9 @@ func opConst(f *Flow, id string) *operation {
 		id:   id,
 		kind: "const",
 		set:  dumbSet,
-		process: func(ctx OpCtx, params ...Data) Data {
+		process: func(ctx OpCtx, params ...Data) (Data, error) {
 			ret := f.consts[id]
-			return ret
+			return ret, nil
 		},
 	}
 }
@@ -120,41 +118,39 @@ func opFunc(f *Flow, id string) *operation {
 		id:   id,
 		kind: "func",
 		set:  dumbSet,
-		process: func(ctx OpCtx, params ...Data) Data {
+		process: func(ctx OpCtx, params ...Data) (ret Data, err error) {
 
 			defer func() {
 				if r := recover(); r != nil {
-					f.Err(fmt.Errorf("Panic: %v", r))
-					f.hooks.error(id, f.Err())
+					err = fmt.Errorf("Panic: %v", r)
+					f.hooks.error(id, err)
 				}
 			}()
 
 			op, ok := f.getOp(id)
 			if !ok {
-				f.err = fmt.Errorf("invalid operation '%s'", id)
-				log.Println("Operation not ok", f.err)
-				f.hooks.error(id, f.err)
-				return nil
+				err = fmt.Errorf("invalid operation '%s'", id)
+				f.hooks.error(id, err)
+				return nil, err
 			}
 
-			if f.err != nil {
+			/*if f.err != nil {
 				return nil
-			}
+			}*/
 			op.Lock()
 			defer op.Unlock()
 			if ctx != nil {
-				if v, ok := ctx.Load(id); ok {
-					return v
+				if v, ok := ctx.Load(id); ok { // Cache
+					return v, nil
 				}
 			}
 
 			// Check inputs
 			fnval := reflect.ValueOf(op.executor)
 			if fnval.Type().NumIn() != len(op.inputs) {
-				err := fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
-				f.hooks.error(id, f.err)
-				f.Err(err)
-				return nil
+				err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
+				f.hooks.error(id, err)
+				return nil, err
 			}
 			/////////////////////////////
 			// NEW PARALLEL PROCESSING
@@ -169,48 +165,49 @@ func opFunc(f *Flow, id string) *operation {
 			for i, in := range op.inputs {
 				go func(i int, in *operation) {
 					defer wg.Done()
-					fr := in.processWithCtx(ctx, params...)
-					p := reflect.ValueOf(fr)
-
-					// Error checking
-					if !p.IsValid() {
-						f.Err(fmt.Errorf("Input %d is not valid", i))
+					fr, err := in.processWithCtx(ctx, params...)
+					if err != nil {
 						return
 					}
-					if !p.Type().AssignableTo(fnval.Type().In(i)) {
-						f.Err(fmt.Errorf("Input %d not assignable to %v", i, p.Type()))
-					}
-
 					callParam[i] = reflect.ValueOf(fr)
 				}(i, in)
 			}
 			wg.Wait()
+			// Return type checking
+			errMsg := ""
+			for i, p := range callParam {
+				// TypeChecking checking
+				if !p.IsValid() {
+					errMsg += fmt.Sprintf("Input %d invalid\n", i)
+				} else if !p.Type().AssignableTo(fnval.Type().In(i)) {
+					errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
+				}
+			}
+			if len(errMsg) > 0 {
+				err := errors.New(errMsg)
+				f.hooks.error(id, err)
+				return nil, err
+			}
 
 			f.hooks.start(id)
 
-			if f.Err() != nil {
-				f.hooks.error(id, f.err)
-				return nil
-			}
-
 			fnret := fnval.Call(callParam)
 			if len(fnret) > 1 && (fnret[1].Interface() != nil) {
 				err, ok := fnret[1].Interface().(error)
 				if !ok {
 					err = errors.New("unknown error")
 				}
-				f.Err(err)
 				f.hooks.error(id, err)
-				return nil
+				return nil, err
 			}
 
 			// THE RESULT
-			ret := fnret[0].Interface()
+			ret = fnret[0].Interface()
 			if ctx != nil {
 				ctx.Store(id, ret)
 			}
 			f.hooks.finish(id, ret)
-			return ret
+			return ret, nil
 		},
 	}
 }
@@ -221,7 +218,7 @@ func opVar(f *Flow, id string) *operation {
 		id:      id,
 		kind:    "var",
 		set:     func(params ...Data) { f.data[id] = params[0] },
-		process: func(ctx OpCtx, params ...Data) Data { return f.data[id] },
+		process: func(ctx OpCtx, params ...Data) (Data, error) { return f.data[id], nil },
 	}
 }
 
@@ -229,6 +226,6 @@ func opNil(f *Flow) *operation {
 	return &operation{
 		flow:    f,
 		kind:    "nil",
-		process: func(ctx OpCtx, params ...Data) Data { f.err = errors.New("Nil operation"); return nil },
+		process: func(ctx OpCtx, params ...Data) (Data, error) { return nil, errors.New("Nil operation") },
 	}
 }

+ 10 - 10
go/src/flow/registry/batch.go

@@ -1,14 +1,14 @@
 package registry
 
-//Batch helper to batch set properties
-type Batch []*Entry
+//EntryBatch helper to batch set properties
+type EntryBatch []*Entry
 
-// MakeBatch creates a batch
-func MakeBatch(params ...interface{}) Batch {
-	ret := Batch{}
+// Batch returns a batch of entries for easy manipulation
+func Batch(params ...interface{}) EntryBatch {
+	ret := EntryBatch{}
 	for _, el := range params {
 		switch v := el.(type) {
-		case Batch:
+		case EntryBatch:
 			ret = append(ret, v...)
 		case *Entry:
 			ret = append(ret, v)
@@ -18,7 +18,7 @@ func MakeBatch(params ...interface{}) Batch {
 }
 
 //Tags set categories of the group
-func (eg Batch) Tags(cat ...string) Batch {
+func (eg EntryBatch) Tags(cat ...string) EntryBatch {
 	for _, e := range eg {
 		e.Tags(cat...)
 	}
@@ -26,7 +26,7 @@ func (eg Batch) Tags(cat ...string) Batch {
 }
 
 // DescInputs describe inputs
-func (eg Batch) DescInputs(input ...string) Batch {
+func (eg EntryBatch) DescInputs(input ...string) EntryBatch {
 	for _, e := range eg {
 		e.DescInputs(input...)
 	}
@@ -35,7 +35,7 @@ func (eg Batch) DescInputs(input ...string) Batch {
 }
 
 // DescOutput describe inputs
-func (eg Batch) DescOutput(v string) Batch {
+func (eg EntryBatch) DescOutput(v string) EntryBatch {
 	for _, e := range eg {
 		e.DescOutput(v)
 	}
@@ -43,7 +43,7 @@ func (eg Batch) DescOutput(v string) Batch {
 }
 
 // Extra set extras of the group
-func (eg Batch) Extra(name string, value interface{}) Batch {
+func (eg EntryBatch) Extra(name string, value interface{}) EntryBatch {
 	for _, e := range eg {
 		e.Extra(name, value)
 	}

+ 1 - 1
go/src/flow/registry/batch_test.go

@@ -11,7 +11,7 @@ func TestMakeBatch(t *testing.T) {
 	a := assert.A(t)
 	r := registry.New()
 
-	b := registry.MakeBatch(
+	b := registry.Batch(
 		r.Add(strings.Split, strings.Join),
 		r.Add(strings.Compare),
 		r.Register("named", strings.Compare),

+ 2 - 2
go/src/flow/registry/entry_test.go

@@ -68,11 +68,11 @@ func TestEntryBatch(t *testing.T) {
 	a := assert.A(t)
 	r := registry.New()
 
-	b := registry.Batch{
+	b := registry.Batch(
 		registry.NewEntry(r, func() int { return 0 }),
 		registry.NewEntry(r, func() int { return 0 }),
 		registry.NewEntry(r, func() int { return 0 }),
-	}.Tags("test").Extra("name", 1)
+	).Tags("test").Extra("name", 1)
 
 	a.Eq(len(b), 3, "should have 3 items")
 	for _, e := range b {

+ 2 - 2
go/src/flow/registry/registry.go

@@ -38,9 +38,9 @@ func (r *R) Clone() *R {
 }
 
 // Add unnamed function
-func (r *R) Add(fns ...interface{}) Batch {
+func (r *R) Add(fns ...interface{}) EntryBatch {
 
-	b := Batch{}
+	b := EntryBatch{}
 	for _, fn := range fns {
 		if reflect.TypeOf(fn).Kind() != reflect.Func {
 			return nil