Browse Source

Registry, tests, coverage

luis 7 years ago
parent
commit
202bede5f0

+ 2 - 1
.drone.yml

@@ -10,7 +10,8 @@ pipeline:
     commands: 
       - go get -d ./go/src/...
       - go get -d -t ./go/src/...
-      - go test ./go/src/...
+      - go test -coverprofile=cover.out ./go/src/...
+      - go tool cover -func=cover.out
       - CGO_ENABLED=0 go build -o DIST/flowserver flowserver/cmd/flowserver  
   frontend:
     image: node:6

+ 10 - 0
browser/vue-flow/src/assets/default-theme.css

@@ -9,6 +9,7 @@
 
   /*--primary: #aaa;*/
   --primary: #57b;
+  --primary-inverse: #fff;
   --node-label: #fff;
   --node-socket: #444;
   --link-hover: #f00;
@@ -16,6 +17,15 @@
   --selector-color: var(--primary);
 }
 
+.vertical_sep {
+  width: 1px;
+  border-right: var(--primary);
+}
+
+.primary {
+  color: var(--primary);
+}
+
 button {
   color: inherit;
 }

+ 16 - 1
browser/vue-flow/src/components/flow/manager.vue

@@ -58,6 +58,8 @@
       <button @click="$emit('funcsPanelToggle')">Panel</button>
       <button @click="stickySockets=!stickySockets"> {{ stickySockets? 'Hide':'Show' }} sockets </button>
       <button @click="detailed=!detailed"> {{ detailed? 'Hide':'Show' }} detail </button>
+      <div class="vertical_sep"/>
+      <!--<button @click="sendDocumentRun">Run</button>-->
       <button v-if="panzoom.x!=0 || panzoom.y!=0 || panzoom.zoom!=1" @click="panzoomReset">
         Reset view
       </button>
@@ -97,7 +99,9 @@ export default {
       detailed: false,
       pointerLink: {active: false, props: {}, src: {}},
       selector: null,
-      nodeSelection: {}
+
+      nodeSelection: {},
+      nodeStatus: {}
     }
   },
   computed: {
@@ -191,6 +195,12 @@ export default {
         // }
       }
     })
+    this.$flowService.on('nodeStart', (v) => {
+      this.nodeStatus[v.id] = 'started'
+    })
+    this.$flowService.on('nodeFinish', (v) => {
+      this.nodeStatus[v.id] = ''
+    })
 
     this.$nextTick(() => {
       this.$forceUpdate()
@@ -426,6 +436,7 @@ export default {
           this.selector = null
         }})
     },
+
     // service events
     sendFlowEvent (type, param) {
       this.$flowService[type](param)
@@ -433,6 +444,10 @@ export default {
     sendDocumentUpdate (nodeData) {
       this.$flowService.documentUpdate(this.nodeData, this.$route.params.sessId)
     },
+    sendDocumentRun () {
+      console.log('Document running')
+      this.$flowService.documentRun(this.nodeData, this.$route.params.sessId)
+    },
     // HELPERS depending on svg ref
     createSVGPoint (x, y) {
       const p = this.$refs.svg.createSVGPoint()

+ 15 - 19
browser/vue-flow/src/components/flow/node.vue

@@ -25,7 +25,7 @@
         class="flow-node__selection"
         />-->
     <svg
-      v-if="nodeStyle.shape == 'thing'"
+      v-if="style.shape == 'thing'"
       ref="body"
       viewBox="0 0 100 100"
       preserveAspectRatio="XMinYMin"
@@ -33,19 +33,11 @@
       :class="{'flow-node__body--dragging':dragging}"
       v-bind="bodyProps"
     >
-      <path
-        d="
-      M 0 0
-      l 90 0
-      l 10 50
-      l -10 50
-      l -90 0
-      c 10 -20, 10 -80, 0 -100
-      Z
-      "
-    /></svg>
+      <path d=" M 0 0 l 90 0 l 10 50 l -10 50 l -90 0 c 10 -20, 10 -80, 0 -100 Z " />
+    </svg>
+
     <circle
-      v-else-if="nodeStyle.shape == 'circle'"
+      v-else-if="style.shape == 'circle'"
       ref="body"
       class="flow-node__body"
       :class="{'flow-node__body--dragging':dragging}"
@@ -77,7 +69,7 @@
       v-bind="inputProps(i)"
       :data-nodeid="id"
       :data-in="i"
-      :class="{'flow-node__socket--match': match.in && (inp == match.in || match.in == 'any' || input=='any')}"
+      :class="{'flow-node__socket--match': match.in && (inp == match.in || match.in == 'any' || inp=='any')}"
       :key="'in'+i"
       @mousedown.stop.prevent="socketPointerDown($event, {in:i})"
     />
@@ -131,6 +123,7 @@ export default {
     'match': {type: Object, default: () => {}},
     'dragging': {type: Boolean, default: false},
     'selected': {type: Boolean, default: false},
+    'status': {type: String, default: ''},
 
     'nodeStyle': {type: Object, default: () => {}}
   },
@@ -142,9 +135,12 @@ export default {
     }
   },
   computed: {
+    style () {
+      return this.nodeStyle || {}
+    },
     labelWrap () {
       let wrapThreshold = 10 // initial wrap threshold
-      const opt = shapeOpts[this.nodeStyle.shape] || shapeOpts.default
+      const opt = shapeOpts[this.style.shape] || shapeOpts.default
       return utils.textWrap(this.label, wrapThreshold, opt.textWrap)
     },
     labelProps () {
@@ -159,7 +155,7 @@ export default {
     bodyProps () {
       let width = this.labelRect.width + 46
       let height = Math.max(this.labelRect.height + 20, 60)
-      if (this.nodeStyle.shape === 'circle') {
+      if (this.style.shape === 'circle') {
         width = height = Math.max(width, height)
       }
 
@@ -168,10 +164,10 @@ export default {
         y: -height / 2,
         width: width,
         height: height,
-        stroke: this.nodeStyle.color || '#777',
-        fill: this.nodeStyle.color || '#777'
+        stroke: this.style.color || '#777',
+        fill: this.style.color || '#777'
       }
-      if (this.nodeStyle.shape === 'circle') {
+      if (this.style.shape === 'circle') {
         rect.r = Math.max(width / 2, height / 2)
       }
       return rect

+ 40 - 26
browser/vue-flow/src/components/main.vue

@@ -68,6 +68,7 @@
       </div>
       <!-- Node inspector -->
       <!-- Move this to a different place -->
+      <!-- And rename it to inspector -->
       <hx-modal class="flow-modal" v-if="nodeInspectTarget" @close="nodeInspectTarget=null">
         <div slot="header">Node inspector:</div>
         <div slot="body" class="flow-modal__body">
@@ -108,7 +109,10 @@
             @keydown.enter="nodeInspectTarget=null"
           >
         </div>
-        <div slot="footer"><button @click="nodeInspectTarget=false">OK</button></div>
+        <div slot="footer">
+          <button class="primary-invert" @click="nodeProcess(nodeInspectTarget)">Run</button>
+          <button @click="nodeInspectTarget=false">OK</button>
+        </div>
       </hx-modal>
 
     </div>
@@ -128,34 +132,39 @@ import '@/assets/dark-theme.css'
 import '@/assets/style.css'
 // import nodeData from './nodedata'
 
+const defRegistry = {
+  'Input': {
+    categories: ['core'],
+    output: 'any',
+    style: { color: '#686', textColor: '#fff', shape: 'circle' },
+    props: {} // should be sent in the node
+  },
+  'Variable': {
+    categories: ['core'],
+    output: 'any',
+    style: { color: '#88a', textColor: '#000' },
+    props: {init: ''}
+  },
+  'Const': {
+    categories: ['core'],
+    output: 'any',
+    style: {
+      color: '#555',
+      textColor: '#333',
+      shape: 'circle'
+    },
+    props: {value: ''}
+  }
+}
+
 export default {
   components: {FlowManager, FlowPanel, FlowNode, FlowPanzoom, HxSplit, HxModal, AppChat},
   data () {
     return {
-      registry: {
+      registry: JSON.parse(JSON.stringify(defRegistry)),
+
+      /* {
         // Fixed default stuff
-        'Input': {
-          group: 'Generic',
-          output: 'any',
-          style: { color: '#686', textColor: '#fff', shape: 'circle' },
-          props: {} // should be sent in the node
-        },
-        'Variable': {
-          group: 'Generic',
-          output: 'any',
-          style: { color: '#88a', textColor: '#000' },
-          props: {init: '', size: 0}
-        },
-        'Const': {
-          group: 'Generic',
-          output: 'any',
-          style: {
-            color: '#777',
-            textColor: '#333',
-            shape: 'circle'
-          },
-          props: {value: ''}
-        },
         'Test': {
           group: 'Generic',
           output: 'any',
@@ -175,7 +184,7 @@ export default {
 
         'string': { group: 'Visualization', inputs: ['string'], style: {'color': '#9a9'} },
         'lineGraph': { group: 'Visualization', inputs: ['[]float32', '[]float32'], style: {'color': '#9a9'} }
-      },
+      }, */
 
       nodeInspectTarget: false,
 
@@ -184,7 +193,6 @@ export default {
       funcsResizeable: false,
 
       dark: false
-
     }
   },
   watch: {
@@ -208,6 +216,9 @@ export default {
         this.$router.push('/' + v.id) // Swap to ID
       }
     })
+    this.$flowService.on('registry', (v) => {
+      this.registry = Object.assign({}, defRegistry, v.data)
+    })
     // Connected
     this.$flowService.connected(() => {
       // Make this in a service
@@ -227,6 +238,9 @@ export default {
         this.$refs.modalInput.focus()
       })
     },
+    nodeProcess (node) {
+      this.$flowService.nodeProcess(node.id)
+    },
     funcsSizeUpdate (ev, size) {
       this.funcsSize = size
     }

+ 18 - 13
browser/vue-flow/src/components/panel.vue

@@ -58,15 +58,18 @@ export default {
   },
   computed: {
     funcsGroups () {
-      let group = {}
+      // Set
+      let group = new Set()
       for (let r in this.registry) {
-        group[this.registry[r].group] = true
+        for (let c of this.registry[r].categories) {
+          group.add(c)
+        }
       }
-      return Object.keys(group)
+      return [...group]
     },
     funcsGroup () {
       return (g) => {
-        const ret = Object.keys(this.registry).filter(v => this.registry[v].group === g)
+        const ret = Object.keys(this.registry).filter(v => this.registry[v].categories.includes(g))
         return ret
       }
     }
@@ -81,20 +84,23 @@ export default {
 <style>
 .flow-funcs__container {
   white-space: nowrap;
-  width:0px;
-  transition: all .3s;
+  width:0;
+  transition: all 0.3s;
   height:100%;
   overflow-x:hidden;
   overflow-y:auto;
 }
+
 .flow-funcs__container.active {
   width:300px;
 }
+
 .flow-funcs__control {
   display:flex;
   flex-flow:row;
 
 }
+
 .flow-funcs__control .item{
   font-size:14px;
   padding:13px;
@@ -105,7 +111,6 @@ export default {
   text-overflow: ellipsis;
   min-width:50px;
   height:50px;
-
   overflow:hidden;
 }
 
@@ -117,7 +122,7 @@ export default {
 .flow-funcs__inner .hx-collapsible__header {
   font-size:14px;
   padding:5px 10px;
-  transition: all .3s;
+  transition: all 0.3s;
 }
 
 .flow-funcs__group{
@@ -125,32 +130,31 @@ export default {
   flex-flow:column;
   padding:10px;
 }
+
 .flow-funcs__src {
   font-size:12px;
   padding:11px 5px;
-
   margin-top:1px;
   text-align:center;
-  transition: all .3s;
+  transition: all 0.3s;
   position:relative;
-
   display:flex;
   justify-content: center;
   align-items: center;
-
   cursor: move;
   cursor: grab;
   cursor: -moz-grab;
   cursor: -webkit-grab;
 
 }
+
 .flow-funcs__group.blocks {
   flex-flow: row;
   flex-wrap: wrap;
-
   justify-content: flex-start;
   align-content: center;
 }
+
 .flow-funcs__group.blocks .flow-funcs__src {
   text-overflow: ellipsis;
   margin:1px;
@@ -158,6 +162,7 @@ export default {
   height:60px;
   overflow:hidden;
 }
+
 .flow-funcs__toggle {
   margin:0;
   position:relative;

+ 3 - 3
browser/vue-flow/src/services/flowservice.js

@@ -1,6 +1,6 @@
 // FlowWSService
 
-const debug = 0
+const debug = 1
 let log = () => {}
 if (debug) {
   log = console.log.bind(console.log, '%cSVC:', 'color:#0a0')
@@ -50,10 +50,10 @@ export default {
     //
     [
       'sessionNew', 'sessionLoad', // SESSION
-      'documentUpdate',
+      'documentUpdate', 'documentRun',
       'chatEvent', 'chatJoin', 'chatRename', // CHAT
       'linkAdd', 'linkUpdate', 'linkRemove', // LINK
-      'nodeUpdate', 'nodeAdd', 'nodeRemove' // NODE
+      'nodeUpdate', 'nodeAdd', 'nodeRemove', 'nodeProcess' // NODE
     ].forEach(ftyp => {
       service[ftyp] = (param, id) => {
         log('sending:', ftyp, ' -- ', param)

+ 103 - 0
go/src/flow/cover.out

@@ -0,0 +1,103 @@
+mode: set
+flow/flow.go:48.18,56.2 1 1
+flow/flow.go:59.28,61.2 1 1
+flow/flow.go:64.56,68.2 2 0
+flow/flow.go:71.79,73.27 2 1
+flow/flow.go:83.2,84.16 2 1
+flow/flow.go:88.2,89.22 2 1
+flow/flow.go:73.27,74.24 1 1
+flow/flow.go:75.19,76.17 1 0
+flow/flow.go:77.11,79.39 2 1
+flow/flow.go:84.16,87.3 2 0
+flow/flow.go:94.41,97.2 1 1
+flow/flow.go:102.65,105.27 2 1
+flow/flow.go:117.2,118.16 2 1
+flow/flow.go:123.2,123.6 1 1
+flow/flow.go:105.27,106.24 1 1
+flow/flow.go:107.19,108.17 1 1
+flow/flow.go:109.11,112.39 2 1
+flow/flow.go:118.16,121.3 2 1
+flow/flow.go:123.6,125.38 2 1
+flow/flow.go:128.3,129.25 2 1
+flow/flow.go:125.38,126.12 1 0
+flow/flow.go:135.44,139.2 3 1
+flow/flow.go:142.60,143.32 1 1
+flow/flow.go:150.2,150.23 1 1
+flow/flow.go:143.32,145.23 2 1
+flow/flow.go:148.3,148.19 1 1
+flow/flow.go:145.23,147.4 1 1
+flow/flow.go:155.42,157.2 1 1
+flow/flow.go:160.64,161.18 1 1
+flow/flow.go:164.2,165.36 2 1
+flow/flow.go:161.18,163.3 1 0
+flow/flow.go:167.91,169.27 2 1
+flow/flow.go:173.2,175.22 2 1
+flow/flow.go:186.2,187.15 2 1
+flow/flow.go:169.27,171.3 1 0
+flow/flow.go:175.22,178.32 3 1
+flow/flow.go:182.3,182.66 1 1
+flow/flow.go:178.32,181.4 2 1
+flow/flow.go:183.3,185.3 1 1
+flow/flow.go:191.53,192.14 1 1
+flow/flow.go:195.2,196.34 2 1
+flow/flow.go:192.14,194.3 1 0
+flow/flow.go:196.34,200.32 3 1
+flow/flow.go:212.3,219.36 5 1
+flow/flow.go:200.32,202.14 1 1
+flow/flow.go:205.4,206.20 2 1
+flow/flow.go:210.4,210.56 1 1
+flow/flow.go:202.14,204.5 1 1
+flow/flow.go:206.20,209.5 2 0
+flow/flow.go:227.32,231.29 3 1
+flow/flow.go:234.2,235.27 2 1
+flow/flow.go:239.2,240.33 2 1
+flow/flow.go:251.2,251.21 1 1
+flow/flow.go:231.29,233.3 1 1
+flow/flow.go:235.27,237.3 1 1
+flow/flow.go:240.33,242.31 2 1
+flow/flow.go:248.3,248.26 1 1
+flow/flow.go:242.31,243.14 1 1
+flow/flow.go:246.4,246.46 1 1
+flow/flow.go:243.14,245.5 1 1
+flow/flow.go:255.46,262.33 4 1
+flow/flow.go:272.2,276.27 4 1
+flow/flow.go:262.33,264.31 2 1
+flow/flow.go:270.3,270.42 1 1
+flow/flow.go:264.31,269.4 1 1
+flow/flow.go:284.75,286.2 1 1
+flow/flow.go:288.62,289.31 1 1
+flow/flow.go:289.31,291.3 1 1
+flow/operation.go:18.23,20.2 1 1
+flow/operation.go:23.30,23.32 0 0
+flow/operation.go:44.33,46.2 1 0
+flow/operation.go:49.50,51.2 1 1
+flow/operation.go:54.68,55.23 1 1
+flow/operation.go:58.2,58.16 1 1
+flow/operation.go:61.2,61.25 1 1
+flow/operation.go:64.2,67.12 3 1
+flow/operation.go:55.23,57.3 1 1
+flow/operation.go:58.16,60.3 1 0
+flow/operation.go:61.25,63.3 1 1
+flow/operation.go:71.41,73.2 1 1
+flow/operation.go:79.39,85.49 1 1
+flow/operation.go:85.49,86.35 1 1
+flow/operation.go:90.4,90.21 1 1
+flow/operation.go:86.35,89.5 2 0
+flow/operation.go:94.42,100.49 1 1
+flow/operation.go:100.49,103.4 2 1
+flow/operation.go:106.44,112.49 1 1
+flow/operation.go:112.49,114.11 2 1
+flow/operation.go:118.4,119.33 2 1
+flow/operation.go:129.4,138.14 4 1
+flow/operation.go:114.11,117.5 2 0
+flow/operation.go:119.33,121.18 2 1
+flow/operation.go:125.5,125.39 1 1
+flow/operation.go:121.18,124.6 2 0
+flow/operation.go:143.43,148.33 1 1
+flow/operation.go:148.33,148.59 1 1
+flow/operation.go:149.49,149.70 1 1
+flow/operation.go:153.32,157.49 1 1
+flow/operation.go:157.49,157.100 2 0
+flow/utils.go:8.28,12.16 3 1
+flow/utils.go:16.2,18.8 2 1
+flow/utils.go:12.16,15.3 2 0

+ 7 - 6
go/src/flow/flow.go

@@ -11,10 +11,11 @@ import (
 	"reflect"
 )
 
+// Global
 var (
-	globalRegistry = registry.New()
-	// Register a func globally
-	Register = globalRegistry.Register
+	Global       = registry.New()
+	Register     = Global.Register
+	Descriptions = Global.Descriptions
 )
 
 // Data interface
@@ -46,7 +47,7 @@ type Flow struct {
 // New create a new flow
 func New() *Flow {
 	return &Flow{
-		registry: globalRegistry,
+		registry: Global,
 		// Data
 		consts:     []Data{},
 		data:       map[string]Data{},
@@ -82,7 +83,7 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {
 	executor, err := f.registry.Get(name)
 	if err != nil {
 		f.err = err
-		return nil
+		return opNil(f)
 	}
 	f.operations[id] = opEntry{name, inputs, executor}
 	return opFunc(f, id)
@@ -116,7 +117,7 @@ func (f *Flow) Op(name string, params ...interface{}) Operation {
 	executor, err := f.registry.Get(name)
 	if err != nil {
 		f.err = err
-		return nil
+		return opNil(f)
 	}
 	// generate ID
 	for {

+ 69 - 11
go/src/flow/flow_test.go

@@ -7,6 +7,8 @@ import (
 
 	"flow"
 
+	vecasm "github.com/gohxs/vec-benchmark/asm"
+
 	. "flow/internal/check"
 )
 
@@ -50,7 +52,7 @@ func TestConst(t *testing.T) {
 
 	c := f.Const(1)
 	res := c.Process()
-	Check(t).Eq(res, 1)
+	Expect(t).Eq(res, 1)
 }
 func TestOp(t *testing.T) {
 	f := flow.New()
@@ -63,10 +65,10 @@ func TestOp(t *testing.T) {
 		[]float32{1, 2, 3},
 	)
 	res, err := f.Run(add)
-	Check(t).Eq(err, nil)
+	Expect(t).Eq(err, nil)
 
 	test := []float32{3, 6, 9}
-	Check(t).Eq(test, res)
+	Expect(t).Eq(test, res)
 }
 
 func TestVariable(t *testing.T) {
@@ -74,11 +76,11 @@ func TestVariable(t *testing.T) {
 	v := f.Var("v1", 1)
 
 	res := v.Process()
-	Check(t).Eq(res, 1)
+	Expect(t).Eq(res, 1)
 
 	v.Set(2)
 	res = v.Process()
-	Check(t).Eq(res, 2)
+	Expect(t).Eq(res, 2)
 }
 
 func TestCache(t *testing.T) {
@@ -88,7 +90,7 @@ func TestCache(t *testing.T) {
 		var res interface{}
 		for i := 1; i < 5; i++ {
 			res = r.Process()
-			Check(t).Eq(res, i)
+			Expect(t).Eq(res, i)
 		}
 	}
 	{
@@ -96,9 +98,9 @@ func TestCache(t *testing.T) {
 		inc := f.Op("inc")
 		add := f.Op("add", inc, inc)
 		res = add.Process() // 1+1
-		Check(t).Eq(res, 2)
+		Expect(t).Eq(res, 2)
 		res = add.Process() // 2+2
-		Check(t).Eq(res, 4)
+		Expect(t).Eq(res, 4)
 	}
 }
 func TestReference(t *testing.T) {
@@ -107,14 +109,14 @@ func TestReference(t *testing.T) {
 	f.DefOp("1", "vecadd", []float32{1, 2, 3}, []float32{1, 2, 3}) // result 2 4 6
 
 	op := f.Op("vecmul", f.Res("1"), []float32{2, 2, 2}) // 4 8 12
-	Check(t).Err(f.Err())
+	Expect(t).NotErr(f.Err())
 
-	Check(t).Diff(nil, op)
+	Expect(t).Diff(nil, op)
 
 	desired := []float32{4, 8, 12}
 
 	res := op.Process()
-	Check(t).Eq(res, desired)
+	Expect(t).Eq(res, desired)
 }
 
 func TestHandler(t *testing.T) {
@@ -123,7 +125,14 @@ func TestHandler(t *testing.T) {
 		t.Log("Something happened:", name, payLoad)
 	})
 	op.Process()
+}
+
+func TestRegistry(t *testing.T) {
+	f := flow.New()
+	op := f.Op("unknown", 1, 2)
+	op.Process()
 
+	Expect(t).Err(f.Err())
 }
 
 func BenchmarkComplex(b *testing.B) {
@@ -169,3 +178,52 @@ func prepareComplex() (*flow.Flow, flow.Operation) {
 
 	return f, div1
 }
+
+func VecMul(a, b []float32) []float32 {
+
+	sz := Min(len(a), len(b))
+
+	out := make([]float32, sz)
+	vecasm.VecMulf32x8(a, b, out)
+	return out
+}
+func VecAdd(a, b []float32) []float32 {
+	sz := Min(len(a), len(b))
+	out := make([]float32, sz)
+	for i := 0; i < sz; i++ {
+		out[i] = a[i] + b[i]
+	}
+	return out
+}
+func VecDiv(a, b []float32) []float32 {
+	sz := Min(len(a), len(b))
+	out := make([]float32, sz)
+	for i := 0; i < sz; i++ {
+		out[i] = a[i] / b[i]
+	}
+	return out
+}
+
+// ScalarInt
+// Every time this operator is called we increase 1
+func Inc() func() int {
+	i := 0
+	return func() int {
+		i++
+		return i
+	}
+}
+func Add(a, b int) int {
+	return a + b
+}
+
+// Utils
+func Min(p ...int) int {
+	min := p[0]
+	for _, v := range p[1:] {
+		if min < v {
+			min = v
+		}
+	}
+	return min
+}

+ 0 - 52
go/src/flow/flowcommon_test.go

@@ -1,52 +0,0 @@
-package flow_test
-
-import vecasm "github.com/gohxs/vec-benchmark/asm"
-
-func VecMul(a, b []float32) []float32 {
-
-	sz := Min(len(a), len(b))
-
-	out := make([]float32, sz)
-	vecasm.VecMulf32x8(a, b, out)
-	return out
-}
-func VecAdd(a, b []float32) []float32 {
-	sz := Min(len(a), len(b))
-	out := make([]float32, sz)
-	for i := 0; i < sz; i++ {
-		out[i] = a[i] + b[i]
-	}
-	return out
-}
-func VecDiv(a, b []float32) []float32 {
-	sz := Min(len(a), len(b))
-	out := make([]float32, sz)
-	for i := 0; i < sz; i++ {
-		out[i] = a[i] / b[i]
-	}
-	return out
-}
-
-// ScalarInt
-// Every time this operator is called we increase 1
-func Inc() func() int {
-	i := 0
-	return func() int {
-		i++
-		return i
-	}
-}
-func Add(a, b int) int {
-	return a + b
-}
-
-// Utils
-func Min(p ...int) int {
-	min := p[0]
-	for _, v := range p[1:] {
-		if min < v {
-			min = v
-		}
-	}
-	return min
-}

+ 24 - 11
go/src/flow/internal/check/check.go

@@ -6,8 +6,8 @@ import (
 	"testing"
 )
 
-//Check return a checker
-func Check(t *testing.T) Checker {
+//Expect return a checker
+func Expect(t *testing.T) Checker {
 	return Checker{T: t}
 }
 
@@ -16,29 +16,42 @@ type Checker struct {
 	*testing.T
 }
 
-//Err check for error
-func (t Checker) Err(err error) {
+//NotErr check for error
+func (t Checker) NotErr(err error) {
 	msg := fmt.Sprintf("Checking for error: %v", err)
 	if err != nil {
-		t.Fatalf("\033[31m[FAIL] %s\033[0m", msg)
+		t.fail(msg)
 	}
-	t.Logf("\033[32m[PASS]\033[m %s", msg)
+	t.pass(msg)
+}
+func (t Checker) Err(err error) {
+	msg := fmt.Sprintf("Checking for error: %v", err)
+	if err == nil {
+		t.fail(msg)
+	}
+	t.pass(msg)
 }
 
 //Eq check if equal
 func (t Checker) Eq(a, b interface{}) {
-	msg := fmt.Sprintf("Expect %v got %v", a, b)
+	msg := fmt.Sprintf("Expect '%v' got '%v'", a, b)
 	if !reflect.DeepEqual(a, b) {
-		t.Fatalf("\033[31m[FAIL] %s\033[0m", msg)
+		t.fail(msg)
 	}
-	t.Logf("\033[32m[PASS]\033[m %s", msg)
+	t.pass(msg)
 }
 
 //Diff check if different
 func (t Checker) Diff(a, b interface{}) {
-	msg := fmt.Sprintf("Expect NOT %v got %v", a, b)
+	msg := fmt.Sprintf("Expect NOT '%v' got '%v'", a, b)
 	if reflect.DeepEqual(a, b) {
-		t.Fatalf("\033[31m[FAIL] %s\033[0m", msg)
+		t.fail(msg)
 	}
+	t.pass(msg)
+}
+func (t Checker) fail(msg string) {
+	t.Fatalf("\033[31m[FAIL] %s\033[0m", msg)
+}
+func (t Checker) pass(msg string) {
 	t.Logf("\033[32m[PASS]\033[m %s", msg)
 }

+ 18 - 5
go/src/flow/operation.go

@@ -24,7 +24,7 @@ func dumbSet(params ...Data) {}
 
 // Operation interface
 type Operation interface { // Id perhaps?
-	Id() string
+	ID() string
 	Set(inputs ...Data) // Special var method
 	Process(params ...Data) Data
 }
@@ -41,7 +41,7 @@ type operation struct {
 }
 
 // Id returns string Id of the operaton
-func (o *operation) Id() string {
+func (o *operation) ID() string {
 	return o.id.(string)
 }
 
@@ -72,6 +72,10 @@ func (o *operation) Set(params ...Data) {
 	o.set(params...)
 }
 
+//////////////////////////////////////
+// Operators definition
+///////////////
+
 func opIn(f *Flow, id int) *operation {
 	return &operation{
 		flow: f,
@@ -108,7 +112,7 @@ func opFunc(f *Flow, id string) *operation {
 		process: func(ctx OpCtx, params ...Data) Data {
 			op, ok := f.operations[id]
 			if !ok {
-				f.err = fmt.Errorf("invalid operation %s", id)
+				f.err = fmt.Errorf("invalid operation '%s'", id)
 				return nil
 			}
 			callParam := make([]reflect.Value, len(op.inputs))
@@ -121,12 +125,13 @@ func opFunc(f *Flow, id string) *operation {
 				callParam[i] = reflect.ValueOf(fr)
 
 			}
+
 			f.trigger("nodeStart", map[string]Data{
-				"op": op,
+				"id": id,
 			})
 			ret := reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
 			f.trigger("nodeFinish", map[string]Data{
-				"op":     op,
+				"id":     id,
 				"result": ret,
 			})
 
@@ -144,3 +149,11 @@ func opVar(f *Flow, id string) *operation {
 		process: func(ctx OpCtx, params ...Data) Data { return f.data[id] },
 	}
 }
+
+func opNil(f *Flow) *operation {
+	return &operation{
+		flow:    f,
+		kind:    "nil",
+		process: func(ctx OpCtx, params ...Data) Data { f.err = errors.New("Nil operation"); return nil },
+	}
+}

+ 58 - 0
go/src/flow/registry/entry.go

@@ -0,0 +1,58 @@
+package registry
+
+import (
+	"fmt"
+	"reflect"
+)
+
+// Entry contains a function description params etc
+type Entry struct {
+	fn          interface{}
+	Description *Description
+	err         error
+}
+
+// NewEntry creates and describes a New Entry
+func NewEntry(fn interface{}) *Entry {
+	e := &Entry{fn: fn}
+
+	fntyp := reflect.TypeOf(e.fn)
+	if fntyp.Kind() != reflect.Func {
+		e.err = ErrNotAFunc
+		return e
+	}
+	if fntyp.NumOut() == 0 {
+		e.err = ErrOutput
+		return e
+	}
+
+	fnTyp := reflect.TypeOf(e.fn)
+	Inputs := []string{}
+	nInputs := fnTyp.NumIn()
+	for i := 0; i < nInputs; i++ {
+		Inputs = append(Inputs, fmt.Sprint(fnTyp.In(i)))
+	}
+	Output := fmt.Sprint(fnTyp.Out(0))
+
+	e.Description = &Description{
+		Categories: []string{"generic"},
+		Inputs:     Inputs,
+		Output:     Output,
+	}
+	return e
+}
+
+// Err returns error of the entry if any
+func (e *Entry) Err() error {
+	return e.err
+}
+
+//Categories of the entry
+func (e *Entry) Categories(cat ...string) *Entry {
+	if e.err != nil {
+		return e
+	}
+	e.Description.Categories = cat
+
+	return e
+}

+ 25 - 0
go/src/flow/registry/entry_test.go

@@ -0,0 +1,25 @@
+package registry_test
+
+import (
+	. "flow/internal/check"
+	"flow/registry"
+	"testing"
+)
+
+func TestRegister(t *testing.T) {
+	exp := Expect(t)
+	r := registry.New()
+	{
+		e := r.Register("func", func(a, b int) int { return 0 })
+		exp.NotErr(e.Err())
+	}
+	{
+		e := r.Register("func", func(a int) int { return 0 })
+		exp.NotErr(e.Err())
+	}
+	{
+		e := r.Register("func", func(a int) {})
+		exp.Err(e.Err())
+	}
+
+}

+ 22 - 49
go/src/flow/registry/registry.go

@@ -5,19 +5,12 @@ import (
 	"reflect"
 )
 
-// Global
-
 // Description of an entry
 type Description struct {
-	Name     string
-	Category []string
-	Inputs   []string
-	Output   string
-}
-
-// Entry contains a function description params etc
-type Entry struct {
-	fn interface{}
+	Name       string   `json:"name"`
+	Categories []string `json:"categories"`
+	Inputs     []string `json:"inputs"`
+	Output     string   `json:"output"`
 }
 
 // Registry function registry
@@ -31,17 +24,16 @@ func New() *Registry {
 }
 
 //Register should be a func only register
-func (r *Registry) Register(name string, v interface{}) error {
-	fntyp := reflect.TypeOf(v)
-	if fntyp.Kind() != reflect.Func {
-		return ErrNotAFunc
-	}
-	if fntyp.NumOut() == 0 {
-		return ErrOutput
+func (r *Registry) Register(name string, v interface{}) *Entry {
+
+	e := NewEntry(v)
+	if e.err != nil {
+		return e
 	}
+	// build description here
 
-	r.data[name] = &Entry{fn: v}
-	return nil
+	r.data[name] = e
+	return e
 }
 
 // Get an entry
@@ -66,41 +58,22 @@ func (r *Registry) Get(name string, params ...interface{}) (interface{}, error)
 	return v, nil
 }
 
-//Describe named fn
-func (r *Registry) Describe(name string) (*Description, error) {
+// Entry fetches entries from the register
+func (r *Registry) Entry(name string) (*Entry, error) {
 	e, ok := r.data[name]
 	if !ok {
-		return nil, fmt.Errorf("Entry '%s' not found", name)
+		return nil, ErrNotFound
 	}
-	fnTyp := reflect.TypeOf(e.fn)
-	Inputs := []string{}
-
-	nInputs := fnTyp.NumIn()
-	for i := 0; i < nInputs; i++ {
-		Inputs = append(Inputs, fmt.Sprint(fnTyp.In(i)))
-	}
-	Output := fmt.Sprint(fnTyp.Out(0))
-	desc := &Description{
-		Name:     name,
-		Category: []string{"generic"},
-		Inputs:   Inputs,
-		Output:   Output,
-	}
-
-	return desc, nil
+	return e, nil
 }
 
+//Describe named fn
+
 // Descriptions Description list
-func (r *Registry) Descriptions() ([]Description, error) {
-	ret := make([]Description, len(r.data))
-	i := 0
-	for k := range r.data {
-		d, err := r.Describe(k)
-		if err != nil {
-			return nil, err
-		}
-		ret[i] = *d
-		i++
+func (r *Registry) Descriptions() (map[string]Description, error) {
+	ret := map[string]Description{}
+	for k, e := range r.data {
+		ret[k] = *e.Description
 	}
 	return ret, nil
 }

+ 14 - 8
go/src/flow/registry/registry_test.go

@@ -8,39 +8,45 @@ import (
 )
 
 func TestRegistryGet(t *testing.T) {
-	c := Check(t)
+	expect := Expect(t)
 
 	r := registry.New()
 	r.Register("vecadd", dummy1)
 
 	fn, err := r.Get("vecadd")
 
-	c.Err(err)
-	c.Diff(nil, fn)
+	expect.NotErr(err)
+	expect.Diff(nil, fn)
 
 }
 
 func TestRegistry(t *testing.T) {
-	c := Check(t)
+	expect := Expect(t)
 
 	r := registry.New()
 	r.Register("vecadd", dummy1)
 
-	d, err := r.Describe("vecadd")
-	c.Err(err)
+	e, err := r.Entry("vecadd")
+	expect.NotErr(err)
+
+	d := e.Description
+
+	expect.Eq(2, len(d.Inputs))
+	expect.Eq(d.Output, "[]float32")
 
 	t.Log(d)
 
 }
 func TestDescriptions(t *testing.T) {
-	c := Check(t)
+	expect := Expect(t)
 
 	r := registry.New()
 	r.Register("vecadd", dummy1)
 	r.Register("vecstr", dummy2)
 
 	d, err := r.Descriptions()
-	c.Err(err)
+	expect.NotErr(err)
+	expect.Eq(2, len(d))
 
 	t.Log(d)
 

+ 56 - 0
go/src/flowserver/cmd/flowserver/main.go

@@ -1,7 +1,12 @@
 package main
 
 import (
+	"flow"
 	"flowserver"
+	"fmt"
+	"math/rand"
+	"strings"
+	"time"
 
 	"github.com/gohxs/prettylog"
 )
@@ -9,7 +14,58 @@ import (
 func main() {
 	prettylog.Global()
 
+	flow.Register("str.cat", strCat)
+	flow.Register("str.reverse", strReverse)
+	flow.Register("str.split", strings.Split)
+	flow.Register("str.join", strings.Join)
+
+	flow.Register("delay.reverse", delayReverse)
+	flow.Register("delay.split", delaySplit)
+	flow.Register("delay.join", delayJoin)
+	flow.Register("duration", duration)
+
 	f := flowserver.FlowServer{}
 	f.ListenAndServe()
+}
+
+/* Sample funcs */
+func strCat(a1, a2 string) string {
+	return a1 + a2
+}
+
+func strReverse(s string) string {
+	n := len(s)
+	runes := make([]rune, n)
+	for _, r := range s {
+		n--
+		runes[n] = r
+	}
+	return string(runes[n:])
+}
+
+func delayCat(a1, a2 string) string {
+	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
+	return strCat(a1, a2)
+}
+
+func delayReverse(s string) string {
+	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
+	return strReverse(s)
+
+}
+func delaySplit(s, sep string) []string {
+	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
+	return strings.Split(s, sep)
+}
+func delayJoin(s []string, join string) string {
+	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
+	return strings.Join(s, join)
+}
+
+func duration(s string) string {
+	mark := time.Now()
+	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
+	now := time.Now()
 
+	return s + " took: " + fmt.Sprint(now.Sub(mark))
 }

+ 11 - 7
go/src/flowserver/noderunner.go

@@ -25,13 +25,15 @@ type FlowDocument struct {
 	Links []Link `json:"links"`
 }
 
-//NodeRun
-func FlowBuild(rawData []byte) *flow.Flow {
+// FlowBuild build a flowGraph
+func FlowBuild(rawData []byte) (*flow.Flow, error) {
 	doc := FlowDocument{[]Node{}, []Link{}}
-	json.Unmarshal(rawData, &doc)
+	err := json.Unmarshal(rawData, &doc)
+	if err != nil {
+		return nil, err
+	}
 
 	f := flow.New()
-
 	nodeMap := map[string]Node{}
 	for _, n := range doc.Nodes {
 		nodeMap[n.ID] = n
@@ -62,16 +64,18 @@ func FlowBuild(rawData []byte) *flow.Flow {
 				}
 				param[l.In] = inOp // By id perhaps
 			case "Variable":
-				param[l.In] = f.Var(from.ID, "Temporary")
+				param[l.In] = f.Var(from.ID, from.Prop["init"])
+			case "Const":
+				param[l.In] = f.Const(from.Prop["value"])
 			default:
 				param[l.In] = f.Res(from.ID)
 			}
 		}
 		param = param[:lastParamID+1]
-		if n.Src == "Input" || n.Src == "Variable" {
+		if n.Src == "Input" || n.Src == "Variable" || n.Src == "Const" {
 			continue
 		}
 		f.DefOp(n.ID, n.Src, param...)
 	}
-	return f
+	return f, nil
 }

+ 1 - 2
go/src/flowserver/flowserver.go

@@ -19,8 +19,7 @@ import (
 //
 
 // FlowServer structure
-type FlowServer struct {
-}
+type FlowServer struct{}
 
 // ListenAndServe starts the httpserver
 // It will listen on default port 2015 and increase if port is in use

+ 86 - 44
go/src/flowserver/session.go

@@ -2,6 +2,8 @@ package flowserver
 
 import (
 	"encoding/json"
+	"errors"
+	"flow"
 	"flowserver/flowmsg"
 	"log"
 	"sync"
@@ -18,93 +20,133 @@ type FlowSession struct {
 	Chat    ChatRoom
 
 	RawDoc []byte // Just share data
+
+	flow *flow.Flow
 }
 
 //NewSession creates and initializes a NewSession
 func NewSession(ID string) *FlowSession {
-	s := &FlowSession{sync.Mutex{}, ID, []*websocket.Conn{}, ChatRoom{}, []byte{}}
+	s := &FlowSession{sync.Mutex{}, ID, []*websocket.Conn{}, ChatRoom{}, []byte{}, nil}
 	return s
 }
 
 // ClientAdd add a client to session
-func (f *FlowSession) ClientAdd(c *websocket.Conn) error {
-	f.Lock()
-	defer f.Unlock()
-
-	f.clients = append(f.clients, c)
-	dataMap := map[string]interface{}{}
-	json.Unmarshal(f.RawDoc, &dataMap)
+func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
+	s.Lock()
+	defer s.Unlock()
+	err := c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: s.ID})
+	if err != nil {
+		return err
+	}
+	desc, err := flow.Descriptions()
+	if err != nil {
+		return err
+	}
+	err = c.WriteJSON(flowmsg.SendMessage{OP: "registry", Data: desc})
+	if err != nil {
+		return err
+	}
+	s.clients = append(s.clients, c)
 
-	return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: dataMap})
+	if len(s.RawDoc) > 0 {
+		return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
+	}
+	return nil
 
+	// Send registry
 }
 
 // ClientRemove remove client from Session
-func (f *FlowSession) ClientRemove(c *websocket.Conn) {
-	f.Lock()
-	defer f.Unlock()
-	for i, cl := range f.clients {
+func (s *FlowSession) ClientRemove(c *websocket.Conn) {
+	s.Lock()
+	defer s.Unlock()
+	for i, cl := range s.clients {
 		if cl == c {
-			f.clients = append(f.clients[:i], f.clients[i+1:]...)
+			s.clients = append(s.clients[:i], s.clients[i+1:]...)
 			break
 		}
 	}
-	f.Chat.ClientRemove(c)
+	s.Chat.ClientRemove(c)
 }
 
 // ChatJoin the chat room on this session
-func (f *FlowSession) ChatJoin(c *websocket.Conn, handle string) {
-	f.Chat.ClientAdd(c, handle)
+func (s *FlowSession) ChatJoin(c *websocket.Conn, handle string) {
+	s.Chat.ClientAdd(c, handle)
 }
 
 // DocumentUpdate client c Updates the session document
-func (f *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
-	f.Lock()
-	defer f.Unlock()
-	log.Println("Document updating")
+func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
+	s.Lock()
+	defer s.Unlock()
 
-	f.RawDoc = data // Update
-	dataMap := map[string]interface{}{}
+	s.RawDoc = data
 
-	// contextual lock
-	err := json.Unmarshal(f.RawDoc, &dataMap)
-	if err != nil {
-		return err
-	}
+	return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
 
-	return f.broadcast(c, flowmsg.SendMessage{OP: "document", Data: dataMap})
 }
 
 // Document send document to client c
-func (f *FlowSession) Document(c *websocket.Conn) error {
-	f.Lock()
-	defer f.Unlock()
+func (s *FlowSession) Document(c *websocket.Conn) error {
+	s.Lock()
+	defer s.Unlock()
 
-	dataMap := map[string]interface{}{}
-	json.Unmarshal(f.RawDoc, &dataMap)
-	return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: dataMap})
+	return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
 }
 
 // NodeRun a node triggering results
 // Build a flow and run
-func (f *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
-	FlowBuild(f.RawDoc)
+func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
+	var err error
+	ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
+	log.Println("Node will run", ID)
+	if s.flow != nil {
+		return errors.New("node already running")
+	}
+
+	log.Printf("Building flow from '%s'\n", string(s.RawDoc))
+	s.flow, err = FlowBuild(s.RawDoc)
+	if err != nil {
+		return err
+	}
+
+	log.Println("Flow--\n", s.flow)
+	defer func() {
+		s.flow = nil
+	}()
+
+	log.Println("Attaching hooks")
+	s.flow.Handle(func(name string, payload map[string]flow.Data) {
+		// Add to executing nodes
+		log.Println("Exec doc", name)
+		s.Broadcast(nil, flowmsg.SendMessage{OP: name, ID: payload["id"].(string), Data: payload})
+	})
+
+	log.Println("Fetch result")
+	op := s.flow.Res(ID)
+	if s.flow.Err() != nil {
+		return err
+	}
+
+	res := op.Process()
+	if s.flow.Err() != nil {
+		return err
+	}
 
-	//	f := flow.New()
+	log.Println("Node Result:", res)
 
 	return nil
 }
 
 // Broadcast broadcast a message in session besides C
-func (f *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error {
-	f.Lock()
-	defer f.Unlock()
-	return f.broadcast(c, v)
+func (s *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error {
+	s.Lock()
+	defer s.Unlock()
+	return s.broadcast(c, v)
 }
 
 //
-func (f *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
-	for _, sc := range f.clients {
+func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
+	for _, sc := range s.clients {
 		if sc == c { // ours
 			continue
 		}

+ 3 - 10
go/src/flowserver/sessionmgr.go

@@ -110,8 +110,7 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 			err = func() error {
 				log.Println("We want a new session so")
 				sess = fsm.CreateSession()
-				sess.ClientAdd(c)
-				return c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: sess.ID})
+				return sess.ClientAdd(c)
 			}()
 			//////////////////////////////////
 			// LOADSESSION request
@@ -125,14 +124,9 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 				}
 				sess, err = fsm.LoadSession(sessID) // Set our session
 				if err != nil {
-					log.Println("Err:", err)
 					return err
 				}
-				err = sess.ClientAdd(c)
-				if err != nil {
-					return err
-				}
-				return c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: m.ID})
+				return sess.ClientAdd(c)
 			}()
 
 		///////////////////////
@@ -153,7 +147,7 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 				}
 				return sess.Broadcast(c, flowmsg.SendMessage{OP: m.OP, Data: m.Data})
 			}()
-		case "nodeRun":
+		case "nodeProcess":
 			err = func() error {
 				if sess == nil {
 					return errors.New("nodeRun: invalid session")
@@ -194,7 +188,6 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 
 		if err != nil {
 			log.Println("Err Writing", err)
-			break
 		}
 	}
 	log.Println("ws Is disconnecting", r.RemoteAddr)