Преглед изворни кода

flow registry helpers, preparing demo

luis пре 7 година
родитељ
комит
c53640cc0b

+ 8 - 0
browser/vue-flow/src/assets/style.css

@@ -82,6 +82,14 @@ input {
   padding: 10px;
   padding: 10px;
 }
 }
 
 
+textarea {
+  outline: none;
+  background: #fff;
+  width: 100%;
+  border: none;
+  padding: 10px;
+}
+
 /* Let's get this party started */
 /* Let's get this party started */
 ::-webkit-scrollbar {
 ::-webkit-scrollbar {
   padding-top: 10px;
   padding-top: 10px;

+ 9 - 3
browser/vue-flow/src/components/flow/manager.vue

@@ -76,7 +76,7 @@
 import FlowNode from './node'
 import FlowNode from './node'
 import FlowLink from './link'
 import FlowLink from './link'
 import FlowPanZoom from './panzoom'
 import FlowPanZoom from './panzoom'
-import SvgDefs from './svgdefwrapper.vue'
+import SvgDefs from './svgdefswrapper'
 import utils from '@/utils/utils'
 import utils from '@/utils/utils'
 
 
 export default {
 export default {
@@ -117,6 +117,10 @@ export default {
           }
           }
         }
         }
         const nodeClass = this.registry[node.src]
         const nodeClass = this.registry[node.src]
+        if (!nodeClass) {
+          this.nodeRemove(node)
+          return
+        }
         return {
         return {
           transform: `translate(${node.x} ${node.y})`,
           transform: `translate(${node.x} ${node.y})`,
           id: node.id,
           id: node.id,
@@ -139,6 +143,9 @@ export default {
 
 
         const refFrom = this.$refs.nodes.find(n => n.id === link.from)
         const refFrom = this.$refs.nodes.find(n => n.id === link.from)
         const refTo = this.$refs.nodes.find(n => n.id === link.to)
         const refTo = this.$refs.nodes.find(n => n.id === link.to)
+        if (!refFrom) { // delete link
+          return {}
+        }
 
 
         const fromOutput = refFrom.outputPos(0) // only 1 output
         const fromOutput = refFrom.outputPos(0) // only 1 output
         const toInput = refTo.inputPos(link.in)
         const toInput = refTo.inputPos(link.in)
@@ -324,7 +331,7 @@ export default {
           const output = this.registry[nodeFrom.src].output
           const output = this.registry[nodeFrom.src].output
           const input = this.registry[nodeTo.src].inputs[link.in]
           const input = this.registry[nodeTo.src].inputs[link.in]
           // Type checking
           // Type checking
-          if (!(output === 'any' || output === input || input === 'any')) {
+          if (!(output === 'interface {}' || output === input || input === 'interface {}')) {
             console.error('LINK: Invalid type')
             console.error('LINK: Invalid type')
             return
             return
           }
           }
@@ -414,7 +421,6 @@ export default {
       this.sendDocumentUpdate()
       this.sendDocumentUpdate()
     },
     },
     linkPointerClick (ev, link) {
     linkPointerClick (ev, link) {
-      console.log('Link click')
       ev.preventDefault()
       ev.preventDefault()
       this.linkRemove(link)
       this.linkRemove(link)
     },
     },

+ 0 - 1
browser/vue-flow/src/components/flow/node-activity.vue

@@ -67,7 +67,6 @@ export default {
     }
     }
   },
   },
   mounted () {
   mounted () {
-    console.log('Activity', this.activity)
     this._timeOut = setTimeout(this.updateTime, 999)
     this._timeOut = setTimeout(this.updateTime, 999)
   },
   },
   beforeDestroy () {
   beforeDestroy () {

+ 2 - 2
browser/vue-flow/src/components/flow/node.vue

@@ -72,7 +72,7 @@
       v-bind="inputProps(i)"
       v-bind="inputProps(i)"
       :data-nodeid="id"
       :data-nodeid="id"
       :data-in="i"
       :data-in="i"
-      :class="{'flow-node__socket--match': match.in && (inp == match.in || match.in == 'any' || inp=='any')}"
+      :class="{'flow-node__socket--match': match.in && (inp == match.in || match.in == 'interface {}' || inp=='interface {}')}"
       :key="'in'+i"
       :key="'in'+i"
       @mousedown.stop.prevent="socketPointerDown($event, {in:i})"
       @mousedown.stop.prevent="socketPointerDown($event, {in:i})"
     />
     />
@@ -83,7 +83,7 @@
       :data-nodeid="id"
       :data-nodeid="id"
       :data-out="0"
       :data-out="0"
       :key="'out'+0"
       :key="'out'+0"
-      :class="{ 'flow-node__socket--match': match.out && (output == match.out || match.out == 'any' || output == 'any'), }"
+      :class="{ 'flow-node__socket--match': match.out && (output == match.out || match.out == 'interface {}' || output == 'interface {}'), }"
       v-bind="outputProps(0)"
       v-bind="outputProps(0)"
       @mousedown.stop.prevent="socketPointerDown($event, {out:0})"
       @mousedown.stop.prevent="socketPointerDown($event, {out:0})"
     />
     />

+ 1 - 2
browser/vue-flow/src/components/flow/svgdefs.svg

@@ -1,4 +1,4 @@
-  <defs>
+<defs>
     <marker id="head" class="flow-link__head" orient="auto" markerWidth="4" markerHeight="8" refX="0.1" refY="4">
     <marker id="head" class="flow-link__head" orient="auto" markerWidth="4" markerHeight="8" refX="0.1" refY="4">
       <path d="M0,0 V8 L4,4 Z"/>
       <path d="M0,0 V8 L4,4 Z"/>
     </marker>
     </marker>
@@ -34,5 +34,4 @@
     <filter id="drag-shadow" x="-100%" y="-100%" width="300%" height="300%">
     <filter id="drag-shadow" x="-100%" y="-100%" width="300%" height="300%">
       <feDropShadow dx="0" dy="2" stdDeviation="3"/>
       <feDropShadow dx="0" dy="2" stdDeviation="3"/>
     </filter>
     </filter>
-
   </defs>
   </defs>

+ 0 - 1
browser/vue-flow/src/components/flow/svgdefwrapper.vue

@@ -1,2 +1 @@
-<!-- Simple because -->
 <template src="./svgdefs.svg"/>
 <template src="./svgdefs.svg"/>

+ 4 - 5
browser/vue-flow/src/components/main.vue

@@ -155,23 +155,22 @@ import '@/assets/style.css'
 const defRegistry = {
 const defRegistry = {
   'Input': {
   'Input': {
     categories: ['core'],
     categories: ['core'],
-    output: 'any',
+    output: 'interface {}',
     style: { color: '#686', textColor: '#fff', shape: 'circle' },
     style: { color: '#686', textColor: '#fff', shape: 'circle' },
     props: {} // should be sent in the node
     props: {} // should be sent in the node
   },
   },
   'Variable': {
   'Variable': {
     categories: ['core'],
     categories: ['core'],
-    output: 'any',
+    output: 'interface {}',
     style: { color: '#88a', textColor: '#000' },
     style: { color: '#88a', textColor: '#000' },
     props: {init: ''}
     props: {init: ''}
   },
   },
   'Const': {
   'Const': {
     categories: ['core'],
     categories: ['core'],
-    output: 'any',
+    output: 'interface {}',
     style: {
     style: {
       color: '#555',
       color: '#555',
-      textColor: '#333',
-      shape: 'circle'
+      textColor: '#333'
     },
     },
     props: {value: ''}
     props: {value: ''}
   }
   }

+ 1 - 1
browser/vue-flow/src/components/panel.vue

@@ -78,7 +78,7 @@ export default {
       return (g) => {
       return (g) => {
         const ret = Object.keys(this.registry).filter(v => this.registry[v].categories.includes(g))
         const ret = Object.keys(this.registry).filter(v => this.registry[v].categories.includes(g))
           .map(v => {
           .map(v => {
-            return { src: v, label: v }
+            return { src: v, label: v.split('.').join('<br/>') }
           })
           })
         if (!this.search) {
         if (!this.search) {
           return ret
           return ret

+ 3 - 10
go/src/flow/flow.go

@@ -11,13 +11,6 @@ import (
 	"sync"
 	"sync"
 )
 )
 
 
-// Global
-var (
-	Global       = registry.New()
-	Register     = Global.Register
-	Descriptions = Global.Descriptions
-)
-
 // Data interface
 // Data interface
 type Data = interface{}
 type Data = interface{}
 
 
@@ -32,7 +25,7 @@ type opEntry struct {
 // We could Create a single array of operations
 // We could Create a single array of operations
 // refs would only mean id, types would be embed in operation
 // refs would only mean id, types would be embed in operation
 type Flow struct {
 type Flow struct {
-	registry   *registry.Registry
+	registry   *registry.R
 	consts     map[string]Data
 	consts     map[string]Data
 	data       map[string]Data // Should be named, to fetch later
 	data       map[string]Data // Should be named, to fetch later
 	operations sync.Map
 	operations sync.Map
@@ -48,7 +41,7 @@ type Flow struct {
 // New create a new flow
 // New create a new flow
 func New() *Flow {
 func New() *Flow {
 	return &Flow{
 	return &Flow{
-		registry: Global,
+		registry: registry.Global,
 		// Data
 		// Data
 		consts:     map[string]Data{},
 		consts:     map[string]Data{},
 		data:       map[string]Data{},
 		data:       map[string]Data{},
@@ -63,7 +56,7 @@ func (f *Flow) Err() error {
 }
 }
 
 
 //SetRegistry use the registry specified
 //SetRegistry use the registry specified
-func (f *Flow) SetRegistry(r *registry.Registry) *Flow {
+func (f *Flow) SetRegistry(r *registry.R) *Flow {
 	f.registry = r
 	f.registry = r
 	// chain
 	// chain
 	return f
 	return f

+ 48 - 17
go/src/flow/operation.go

@@ -54,7 +54,12 @@ func (o *operation) Process(params ...Data) Data {
 
 
 // Every single one is run with this internally
 // Every single one is run with this internally
 func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
 func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
-	entry, _ := o.flow.getOp(fmt.Sprint(o.id))
+	return o.process(ctx, params...)
+	/*entry, _ := o.flow.getOp(fmt.Sprint(o.id))
+	if entry == nil {
+		log.Println("Entry is nil for id:", o.id, ", why??")
+		return nil
+	}
 	entry.Lock()
 	entry.Lock()
 	defer entry.Unlock()
 	defer entry.Unlock()
 
 
@@ -62,14 +67,13 @@ func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
 		return nil
 		return nil
 	}
 	}
 	if ctx == nil { // No cache/Context
 	if ctx == nil { // No cache/Context
-		return o.process(ctx, params...)
 	}
 	}
 	if v, ok := ctx.Load(o.id); ok {
 	if v, ok := ctx.Load(o.id); ok {
 		return v
 		return v
 	}
 	}
 	res := o.process(ctx, params...)
 	res := o.process(ctx, params...)
 	ctx.Store(o.id, res)
 	ctx.Store(o.id, res)
-	return res
+	return res*/
 }
 }
 
 
 // Set setter for certain operations (Var)
 // Set setter for certain operations (Var)
@@ -108,6 +112,8 @@ func opConst(f *Flow, id string) *operation {
 		},
 		},
 	}
 	}
 }
 }
+
+// Debug type
 func opFunc(f *Flow, id string) *operation {
 func opFunc(f *Flow, id string) *operation {
 	return &operation{
 	return &operation{
 		flow: f,
 		flow: f,
@@ -123,6 +129,18 @@ func opFunc(f *Flow, id string) *operation {
 				return nil
 				return nil
 			}
 			}
 
 
+			if f.err != nil {
+				return nil
+			}
+			op.Lock()
+			defer op.Unlock()
+			if ctx != nil {
+				if v, ok := ctx.Load(id); ok {
+					return v
+				}
+			}
+
+			// Check inputs
 			fnval := reflect.ValueOf(op.executor)
 			fnval := reflect.ValueOf(op.executor)
 			if fnval.Type().NumIn() != len(op.inputs) {
 			if fnval.Type().NumIn() != len(op.inputs) {
 				f.err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
 				f.err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
@@ -137,6 +155,7 @@ func opFunc(f *Flow, id string) *operation {
 
 
 			callParam := make([]reflect.Value, len(op.inputs))
 			callParam := make([]reflect.Value, len(op.inputs))
 
 
+			// Parallel processing if inputs
 			wg := sync.WaitGroup{}
 			wg := sync.WaitGroup{}
 			wg.Add(len(op.inputs))
 			wg.Add(len(op.inputs))
 			for i, in := range op.inputs {
 			for i, in := range op.inputs {
@@ -147,27 +166,39 @@ func opFunc(f *Flow, id string) *operation {
 				}(i, in)
 				}(i, in)
 			}
 			}
 			wg.Wait()
 			wg.Wait()
-			// Check params
-			for _, p := range callParam {
-				if !p.IsValid() {
-					f.err = fmt.Errorf("Input failed", p)
-					log.Println("Flow err:", f.err)
+
+			f.hooks.start(id)
+
+			// Check params results
+			for i, p := range callParam {
+				f.err = func() error {
+					if !p.IsValid() {
+						return fmt.Errorf("Input is not valid %v", p)
+					}
+					if !p.Type().AssignableTo(fnval.Type().In(i)) {
+						return fmt.Errorf("Input not assignable to %v", p)
+					}
+					return nil
+				}()
+				if f.err != nil {
 					f.hooks.error(id, f.err)
 					f.hooks.error(id, f.err)
 					return nil
 					return nil
 				}
 				}
 			}
 			}
-			f.hooks.start(id)
+
 			fnret := fnval.Call(callParam)
 			fnret := fnval.Call(callParam)
-			if len(fnret) > 1 {
-				if err := fnret[1].Interface().(error); err != nil {
-					f.err = err
-					log.Println("Flow err:", f.err)
-					f.hooks.error(id, f.err)
-					return nil
-				}
+			if len(fnret) > 1 && (fnret[1].Interface().(error) != nil) {
+				f.err = fnret[1].Interface().(error)
+				log.Println("Flow err:", f.err)
+				f.hooks.error(id, f.err)
+				return nil
 			}
 			}
-			// check fnret[1] for error
+
+			// THE RESULT
 			ret := fnret[0].Interface()
 			ret := fnret[0].Interface()
+			if ctx != nil {
+				ctx.Store(id, ret)
+			}
 			f.hooks.finish(id, ret)
 			f.hooks.finish(id, ret)
 			return ret
 			return ret
 		},
 		},

+ 10 - 6
go/src/flow/registry/entry.go

@@ -20,14 +20,16 @@ type Description struct {
 
 
 // Entry contains a function description params etc
 // Entry contains a function description params etc
 type Entry struct {
 type Entry struct {
-	registry    *Registry
+	registry    *R
 	fn          interface{}
 	fn          interface{}
+	Inputs      []reflect.Type
+	Output      reflect.Type
 	Description *Description
 	Description *Description
 	err         error
 	err         error
 }
 }
 
 
 // NewEntry creates and describes a New Entry
 // NewEntry creates and describes a New Entry
-func NewEntry(r *Registry, fn interface{}) *Entry {
+func NewEntry(r *R, fn interface{}) *Entry {
 	e := &Entry{registry: r, fn: fn}
 	e := &Entry{registry: r, fn: fn}
 
 
 	fntyp := reflect.TypeOf(e.fn)
 	fntyp := reflect.TypeOf(e.fn)
@@ -44,9 +46,11 @@ func NewEntry(r *Registry, fn interface{}) *Entry {
 	Inputs := []string{}
 	Inputs := []string{}
 	nInputs := fnTyp.NumIn()
 	nInputs := fnTyp.NumIn()
 	for i := 0; i < nInputs; i++ {
 	for i := 0; i < nInputs; i++ {
+		e.Inputs = append(e.Inputs, fnTyp.In(i))
 		Inputs = append(Inputs, fmt.Sprint(fnTyp.In(i)))
 		Inputs = append(Inputs, fmt.Sprint(fnTyp.In(i)))
 	}
 	}
 	Output := fmt.Sprint(fnTyp.Out(0))
 	Output := fmt.Sprint(fnTyp.Out(0))
+	e.Output = fnTyp.Out(0)
 
 
 	e.Description = &Description{
 	e.Description = &Description{
 		Categories: []string{"generic"},
 		Categories: []string{"generic"},
@@ -72,8 +76,8 @@ func (e *Entry) Categories(cat ...string) *Entry {
 	return e
 	return e
 }
 }
 
 
-// Input description for Input
-func (e *Entry) Input(i int, desc string) *Entry {
+// DescInput description for Input
+func (e *Entry) DescInput(i int, desc string) *Entry {
 	if e.err != nil {
 	if e.err != nil {
 		return e
 		return e
 	}
 	}
@@ -81,8 +85,8 @@ func (e *Entry) Input(i int, desc string) *Entry {
 	return e
 	return e
 }
 }
 
 
-// Output description for Input
-func (e *Entry) Output(desc string) *Entry {
+// DescOutput description for Input
+func (e *Entry) DescOutput(desc string) *Entry {
 	if e.err != nil {
 	if e.err != nil {
 		return e
 		return e
 	}
 	}

+ 4 - 4
go/src/flow/registry/entry_test.go

@@ -28,11 +28,11 @@ func TestDescription(t *testing.T) {
 	a.Eq(e.Err(), nil, "should not fail setting categories")
 	a.Eq(e.Err(), nil, "should not fail setting categories")
 	a.Eq(len(e.Description.Categories), 2, "should have 2 categories")
 	a.Eq(len(e.Description.Categories), 2, "should have 2 categories")
 
 
-	e.Input(0, "input doc")
+	e.DescInput(0, "input doc")
 	a.Eq(e.Err(), nil, "should not fail setting input description")
 	a.Eq(e.Err(), nil, "should not fail setting input description")
 	a.Eq(len(e.Description.InputDesc), 1, "should have 1 input description")
 	a.Eq(len(e.Description.InputDesc), 1, "should have 1 input description")
 
 
-	e.Output("output desc")
+	e.DescOutput("output desc")
 	a.Eq(e.Err(), nil, "should not fail setting input description")
 	a.Eq(e.Err(), nil, "should not fail setting input description")
 	a.Eq(e.Description.OutputDesc, "output desc", "output description should be the same")
 	a.Eq(e.Description.OutputDesc, "output desc", "output description should be the same")
 
 
@@ -47,9 +47,9 @@ func TestDescriptionError(t *testing.T) {
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	e.Categories("a", "b")
 	e.Categories("a", "b")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
-	e.Input(0, "input doc")
+	e.DescInput(0, "input doc")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
-	e.Output("output desc")
+	e.DescOutput("output desc")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	e.Extra("test", 123)
 	e.Extra("test", 123)
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 	a.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")

+ 32 - 9
go/src/flow/registry/registry.go

@@ -1,22 +1,45 @@
 package registry
 package registry
 
 
 import (
 import (
+	"path"
 	"reflect"
 	"reflect"
+	"runtime"
 )
 )
 
 
-// Registry function registry
-type Registry struct {
+// Global
+var (
+	Global       = New()
+	Register     = Global.Register
+	Descriptions = Global.Descriptions
+	GetEntry     = Global.Entry
+	Add          = Global.Add
+)
+
+// R the function registry
+type R struct {
 	data map[string]*Entry
 	data map[string]*Entry
 }
 }
 
 
 // New creates a new registry
 // New creates a new registry
-func New() *Registry {
-	return &Registry{map[string]*Entry{}}
+func New() *R {
+	return &R{map[string]*Entry{}}
 }
 }
 
 
-//Register should be a func only register
-func (r *Registry) Register(name string, v interface{}) *Entry {
+// Add unnamed function
+func (r *R) Add(fns ...interface{}) Batch {
+
+	b := Batch{}
+	for _, fn := range fns {
+		name := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
+		name = path.Base(name)
+		b = append(b, r.Register(name, fn))
+	}
+
+	return b
+}
 
 
+//Register should be a func only register
+func (r *R) Register(name string, v interface{}) *Entry {
 	e := NewEntry(r, v)
 	e := NewEntry(r, v)
 	if e.err != nil {
 	if e.err != nil {
 		return e
 		return e
@@ -26,7 +49,7 @@ func (r *Registry) Register(name string, v interface{}) *Entry {
 }
 }
 
 
 // Get an entry
 // Get an entry
-func (r *Registry) Get(name string, params ...interface{}) (interface{}, error) {
+func (r *R) Get(name string, params ...interface{}) (interface{}, error) {
 	e, ok := r.data[name]
 	e, ok := r.data[name]
 	if !ok {
 	if !ok {
 		return nil, ErrNotFound
 		return nil, ErrNotFound
@@ -51,7 +74,7 @@ func (r *Registry) Get(name string, params ...interface{}) (interface{}, error)
 }
 }
 
 
 // Entry fetches entries from the register
 // Entry fetches entries from the register
-func (r *Registry) Entry(name string) (*Entry, error) {
+func (r *R) Entry(name string) (*Entry, error) {
 	e, ok := r.data[name]
 	e, ok := r.data[name]
 	if !ok {
 	if !ok {
 		return nil, ErrNotFound
 		return nil, ErrNotFound
@@ -62,7 +85,7 @@ func (r *Registry) Entry(name string) (*Entry, error) {
 //Describe named fn
 //Describe named fn
 
 
 // Descriptions Description list
 // Descriptions Description list
-func (r *Registry) Descriptions() (map[string]Description, error) {
+func (r *R) Descriptions() (map[string]Description, error) {
 	ret := map[string]Description{}
 	ret := map[string]Description{}
 	for k, e := range r.data {
 	for k, e := range r.data {
 		ret[k] = *e.Description
 		ret[k] = *e.Description

+ 29 - 0
go/src/flowserver/cmd/flowserver/floatops/floatops.go

@@ -0,0 +1,29 @@
+package floatops
+
+import (
+	"errors"
+	"flow/registry"
+	"math"
+)
+
+// Register into registry
+func Register(r *registry.R) {
+
+	r.Add(sum, sub, div, mul).Categories("floatops")
+}
+func sum(a, b float32) float32 {
+	return a + b
+}
+func sub(a, b float32) float32 {
+	return a - b
+}
+func div(a, b float32) (float32, error) {
+	if b == 0 {
+		return float32(math.NaN()), errors.New("Division by 0")
+	}
+	return a / b, nil
+}
+
+func mul(a, b float32) float32 {
+	return a * b
+}

+ 26 - 89
go/src/flowserver/cmd/flowserver/main.go

@@ -1,14 +1,13 @@
 package main
 package main
 
 
 import (
 import (
-	"errors"
 	"flow"
 	"flow"
 	"flow/registry"
 	"flow/registry"
 	"flowserver"
 	"flowserver"
+	"flowserver/cmd/flowserver/floatops"
 	"fmt"
 	"fmt"
+	"io"
 	"log"
 	"log"
-	"math/rand"
-	"strings"
 	"time"
 	"time"
 
 
 	"github.com/gohxs/prettylog"
 	"github.com/gohxs/prettylog"
@@ -16,46 +15,19 @@ import (
 
 
 func main() {
 func main() {
 	prettylog.Global()
 	prettylog.Global()
-
 	log.Println("Running version:", flowserver.Version)
 	log.Println("Running version:", flowserver.Version)
 
 
-	registry.Batch{
-		flow.Register("delay.cat", delayCat),
-		flow.Register("delay.reverse", delayReverse),
-		flow.Register("delay.split", delaySplit),
-		flow.Register("delay.join", delayJoin),
-		flow.Register("addduration", duration),
-	}.Categories("string", "test").
-		Extra("style", map[string]string{
-			"color": "#a55",
-		})
+	floatops.Register(registry.Global)
 
 
-	registry.Batch{
-		flow.Register("longduration", duration(10*time.Second)),
-		flow.Register("mediumduration", duration(5*time.Second)),
-		flow.Register("randomduration", randduration(10*time.Second)),
-	}.Categories("time simulation")
+	registry.Add(customStruct, setName, setURL).Categories("test-struct")
 
 
-	registry.Batch{
-		flow.Register("multiplex4", multiplex4),
-		flow.Register("multiplex6", multiplex6),
-		flow.Register("error", func(s string) (string, error) {
-			time.Sleep(4 * time.Second)
-			return "", errors.New("Some error")
-		}),
-	}.Categories("test").
-		Extra("style", map[string]string{
-			"color": "#959",
-		})
+	registry.Batch{registry.Register("stream", stream)}.Categories("streamer")
 
 
 	registry.Batch{
 	registry.Batch{
-		flow.Register("str.cat", strCat),
-		flow.Register("str.reverse", strReverse),
-		flow.Register("str.split", strings.Split),
-		flow.Register("str.join", strings.Join),
-	}.Categories("string").
+		registry.Register("wait", wait),
+	}.Categories("time-test").
 		Extra("style", map[string]string{
 		Extra("style", map[string]string{
-			"color": "#aa5",
+			"color": "#8a5",
 		})
 		})
 
 
 	addr := ":2015"
 	addr := ":2015"
@@ -65,65 +37,30 @@ func main() {
 	f.ListenAndServe(addr)
 	f.ListenAndServe(addr)
 }
 }
 
 
-func multiplex4(a1, a2, a3, a4 string) string {
-	return strings.Join([]string{a1, a2, a3, a4}, " ")
-}
-func multiplex6(a1, a2, a3, a4, a5, a6 string) string {
-	return strings.Join([]string{a1, a2, a3, a4, a5, a6}, " ")
+func wait(data flow.Data, n int) flow.Data {
+	time.Sleep(time.Duration(n) * time.Second) // Simulate
+	return data
 }
 }
 
 
-/* Sample funcs */
-func strCat(a1, a2 string) string {
-	return a1 + " " + a2
+func stream(w io.Writer, val interface{}) interface{} {
+	fmt.Fprint(w, val)
+	return val
 }
 }
 
 
-func strReverse(s string) string {
-	n := len(s)
-	runes := make([]rune, n)
-	for _, r := range s {
-		n--
-		runes[n] = r
-	}
-	return string(runes[n:])
+// CustomStruct testing custom struct passing
+type CustomStruct struct {
+	Name string
+	Url  string
 }
 }
 
 
-func delayCat(a1, a2 string) string {
-	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
-	return strCat(a1, a2)
+func customStruct() CustomStruct {
+	return CustomStruct{}
 }
 }
-
-func delayReverse(s string) string {
-	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
-	return strReverse(s)
-
-}
-func delaySplit(s, sep string) []string {
-	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
-	return strings.Split(s, sep)
-}
-func delayJoin(s []string, join string) string {
-	time.Sleep(time.Duration(2+rand.Intn(10)) * time.Second) // Simulate
-	return strings.Join(s, join)
-}
-
-func longduration() string {
-	mark := time.Now()
-	time.Sleep(25 * time.Second) // Simulate
-	now := time.Now()
-
-	return " took: " + fmt.Sprint(now.Sub(mark))
-
-}
-
-func duration(n time.Duration) func() string {
-	return func() string {
-		time.Sleep(n)
-		return fmt.Sprint("I waited:", n)
-	}
+func setName(a CustomStruct, name string) CustomStruct {
+	a.Name = name
+	return a
 }
 }
-func randduration(n time.Duration) func() string {
-	return func() string {
-		time.Sleep(time.Duration(2 + rand.Intn(int(n)))) // Simulate
-		return fmt.Sprint("I waited:", n)
-	}
+func setURL(a CustomStruct, url string) CustomStruct {
+	a.Url = url
+	return a
 }
 }

+ 27 - 10
go/src/flowserver/flowbuilder.go

@@ -3,6 +3,9 @@ package flowserver
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"flow"
 	"flow"
+	"flow/registry"
+	"log"
+	"reflect"
 )
 )
 
 
 // Node that will contain registry src
 // Node that will contain registry src
@@ -42,16 +45,22 @@ func FlowBuild(rawData []byte) (*flow.Flow, error) {
 
 
 	ninput := 0
 	ninput := 0
 	for _, n := range doc.Nodes {
 	for _, n := range doc.Nodes {
+		if n.Src == "Input" || n.Src == "Variable" || n.Src == "Const" {
+			continue
+		}
+
 		// Find link refered as To
 		// Find link refered as To
-		param := make([]interface{}, 10) // 10 is temporary to test out operations
-		lastParamID := -1
+		entry, err := registry.GetEntry(n.Src)
+		if err != nil {
+			return nil, err
+		}
+
+		param := make([]flow.Data, len(entry.Inputs))
+		// Find links
 		for _, l := range doc.Links {
 		for _, l := range doc.Links {
 			if l.To != n.ID {
 			if l.To != n.ID {
 				continue
 				continue
 			}
 			}
-			if l.In > lastParamID {
-				lastParamID = l.In
-			}
 			// Define operators here
 			// Define operators here
 			from := nodeMap[l.From]
 			from := nodeMap[l.From]
 			switch from.Src {
 			switch from.Src {
@@ -66,15 +75,23 @@ func FlowBuild(rawData []byte) (*flow.Flow, error) {
 			case "Variable":
 			case "Variable":
 				param[l.In] = f.Var(from.ID, from.Prop["init"])
 				param[l.In] = f.Var(from.ID, from.Prop["init"])
 			case "Const":
 			case "Const":
-				param[l.In] = f.Const(from.Prop["value"])
+				// XXX: Automate this in a func
+				newVal := reflect.New(entry.Inputs[l.In])
+				raw := from.Prop["value"]
+				if _, ok := newVal.Interface().(*string); ok {
+					raw = "\"" + from.Prop["value"] + "\""
+				}
+				err := json.Unmarshal([]byte(raw), newVal.Interface())
+				if err != nil {
+					log.Println("Error", err)
+					param[l.In] = nil
+					continue
+				}
+				param[l.In] = f.Const(newVal.Elem().Interface())
 			default:
 			default:
 				param[l.In] = f.Res(from.ID)
 				param[l.In] = f.Res(from.ID)
 			}
 			}
 		}
 		}
-		param = param[:lastParamID+1]
-		if n.Src == "Input" || n.Src == "Variable" || n.Src == "Const" {
-			continue
-		}
 		f.DefOp(n.ID, n.Src, param...)
 		f.DefOp(n.ID, n.Src, param...)
 	}
 	}
 	return f, nil
 	return f, nil

+ 7 - 3
go/src/flowserver/session.go

@@ -4,7 +4,9 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"flow"
 	"flow"
+	"flow/registry"
 	"flowserver/flowmsg"
 	"flowserver/flowmsg"
+	"fmt"
 	"io/ioutil"
 	"io/ioutil"
 	"log"
 	"log"
 	"os"
 	"os"
@@ -25,7 +27,7 @@ type NodeActivity struct {
 	StartTime time.Time `json:"startTime"`
 	StartTime time.Time `json:"startTime"`
 	EndTime   time.Time `json:"endTime"`
 	EndTime   time.Time `json:"endTime"`
 	Data      flow.Data `json:"data"`
 	Data      flow.Data `json:"data"`
-	Error     error     `json:"error"`
+	Error     string    `json:"error"`
 }
 }
 
 
 // FlowSession Create a session and link clients
 // FlowSession Create a session and link clients
@@ -84,7 +86,7 @@ func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	desc, err := flow.Descriptions()
+	desc, err := registry.Descriptions()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -102,6 +104,7 @@ func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
 		return err
 		return err
 	}
 	}
 
 
+	// Sending activity
 	return c.WriteJSON(flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
 	return c.WriteJSON(flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
 
 
 	// Send registry
 	// Send registry
@@ -186,6 +189,7 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 		s.flow, err = FlowBuild(s.RawDoc)
 		s.flow, err = FlowBuild(s.RawDoc)
 		if err != nil {
 		if err != nil {
 			log.Println("Flow error:", err)
 			log.Println("Flow error:", err)
+			return
 		}
 		}
 		defer func() { // After routing gone
 		defer func() { // After routing gone
 			s.flow = nil
 			s.flow = nil
@@ -214,7 +218,7 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 				case "Error":
 				case "Error":
 					status = "error"
 					status = "error"
 					act.EndTime = triggerTime
 					act.EndTime = triggerTime
-					act.Error = extra[0].(error)
+					act.Error = fmt.Sprint(extra[0])
 				}
 				}
 				if act.Status == status {
 				if act.Status == status {
 					return
 					return