Explorar o código

Refactored flow package

luis %!s(int64=7) %!d(string=hai) anos
pai
achega
4a9fd810b7

+ 2 - 3
browser/vue-flow/src/components/chat.vue

@@ -9,7 +9,7 @@
           type="text"
           :value="handle"
           @blur="HANDLE_UPDATE($event.target.value)"
-          @keyup.enter="HANDLE_UPDATE($event.target.value)">
+          @keyup.enter="HANDLE_UPDATE($event.target.value);$event.target.blur()">
         <div ref="messages" class="flow-chat__messages">
           <div v-for="m in events" class="message">
             <div class="handle">
@@ -87,7 +87,7 @@ export default {
   created () {
     this.subscription = this.$store.subscribe(mut => {
       if (mut.type === 'chat/EVENT_ADD' && mut.payload.type === 'msg') {
-        // if (this.active) { return }
+        if (this.active) { return }
         this.NOTIFICATION_ADD(`<b>${mut.payload.handle}:</b> ${mut.payload.message}`)
       }
     })
@@ -112,7 +112,6 @@ export default {
 </script>
 <style>
 .flow-chat {
-  z-index:3000;
   height:100%;
   box-sizing:border-box;
   position:relative;

+ 1 - 20
browser/vue-flow/src/components/flow/editor.js

@@ -154,7 +154,7 @@ export default {
   methods: {
     ...mapActions('flow', [
       'DOCUMENT_SYNC',
-      'NODE_RAISE', 'NODE_UPDATE', 'NODE_ADD', 'NODE_REMOVE', 'NODE_INSPECT',
+      'NODE_RAISE', 'NODE_UPDATE', 'NODE_ADD', 'NODE_REMOVE', 'NODE_INSPECT', 'NODE_PROCESS',
       'LINK_ADD', 'LINK_REMOVE',
       'TRIGGER_ADD', 'TRIGGER_REMOVE' ]),
 
@@ -439,10 +439,8 @@ export default {
               y: Math.round((n.y + dragP.y - curP.y) / 10) * 10
             })
           }
-
           // Updating nodes
           this.NODE_UPDATE(nodeUpdate)
-
           this.DOCUMENT_SYNC()
         }
       })
@@ -469,23 +467,6 @@ export default {
       }
       this.NODE_ADD(newNode)
     },
-    nodeProcess (node) {
-      // this.DOCUMENT_DYNC()
-      // console.log('Node process demand')
-      this.$emit('nodeProcess', node)
-    },
-    linkPointerClick (ev, link) {
-      ev.preventDefault()
-      this.LINK_REMOVE(link)
-    },
-
-    /* triggerRemove (trigger) {
-      const i = this.nodeData.triggers.findIndex(l => l === trigger)
-      if (i === -1) return
-      this.nodeData.triggers.splice(i, 1)
-      // this.sendFlowEvent('triggerRemove', trigger)
-      this.DOCUMENT_DYNC()
-    }, */
 
     managerDrop (ev) {
       ev.preventDefault()

+ 4 - 4
browser/vue-flow/src/components/flow/editor.vue

@@ -22,7 +22,7 @@
           v-for="(link,i) in nodeData.links"
           :key="'link' + i"
           v-bind="linkProps(link)"
-          @click="linkPointerClick($event,link)"
+          @mousedown.middle="LINK_REMOVE(link)"
         />
         <!-- trigger links -->
         <flow-trigger-link
@@ -30,7 +30,7 @@
           :key="'trigger'+i"
           label="WIP"
           v-bind="triggerProps(trigger)"
-          @click="TRIGGER_REMOVE(trigger)"
+          @mousedown.middle="TRIGGER_REMOVE(trigger)"
         />
         <!-- nodes -->
         <flow-node
@@ -82,8 +82,8 @@
     <hx-context-menu ref="menu">
       <template slot-scope="d" >
         <div class="flow-node__context-menu">
-          <div class="hover" @click="nodeProcess(d.userData)">Run</div>
-          <div class="hover" tabindex="0" @click="nodeRemove(d.userData)">Delete</div>
+          <div class="hover" @click="NODE_PROCESS(d.userData.id)">Run</div>
+          <div class="hover" @click="NODE_REMOVE([d.userData])">Delete</div>
           <hr>
           <div class="hover" @click="nodeInspect(d.userData,true)">Inspect</div>
         </div>

+ 1 - 1
browser/vue-flow/src/components/flow/link-trigger.vue

@@ -2,7 +2,7 @@
   <g
     class="flow-trigger-link"
     :class="{'flow-trigger-link--pointer':pointer}"
-    @click="$emit('click',$event)">
+    @mousedown="$emit('mousedown',$event)">
     <path
       class="flow-trigger-link__area"
       :d="path"

+ 1 - 1
browser/vue-flow/src/components/flow/link.vue

@@ -1,5 +1,5 @@
 <template>
-  <g class="flow-link" :class="{'flow-link--pointer':pointer}" :status="status" @click="$emit('click',$event)">
+  <g class="flow-link" :class="{'flow-link--pointer':pointer}" :status="status" @mousedown="$emit('mousedown',$event)">
     <path class="flow-link__area" :d="path" />
     <path class="flow-link__visible" :d="path" />
     <path v-if="status" class="flow-link__status" :d="path" />

+ 5 - 4
browser/vue-flow/src/components/flow/node-activity.vue

@@ -62,7 +62,7 @@ export default {
         intervalms = 0
       }
       const min = Math.floor(intervalms / 60000)
-      const sec = (intervalms / 1000) % 60
+      const sec = ((intervalms / 1000) % 60)
 
       return utils.padStart(min.toFixed(0), 2, '0') + ':' + utils.padStart(sec.toFixed(0), 2, '0')
     }
@@ -74,7 +74,8 @@ export default {
     }
   },
   mounted () {
-    this._timeOut = setTimeout(this.updateTime, 999)
+    this.updateTime()
+    // this._timeOut = setTimeout(this.updateTime, 999)
   },
   beforeDestroy () {
     clearTimeout(this._timeOut)
@@ -86,8 +87,8 @@ export default {
         this.finishTime = finish
         return
       }
-      this.finishTime = new Date(new Date().getTime() + 1000)
-      this._timeOut = setTimeout(this.updateTime, 999)
+      this.finishTime = new Date(new Date().getTime() + 2000) // time correction why?
+      this._timeOut = setTimeout(this.updateTime, 400)
     }
 
   }

+ 0 - 31
browser/vue-flow/src/components/main.vue

@@ -64,7 +64,6 @@
               <flow-inspector
                 ref="inspector"
                 v-show="panel=='inspector'"
-                @nodeProcess="nodeProcess($event)"
               />
             </transition>
           </div>
@@ -127,21 +126,6 @@ export default {
     ...mapGetters(['registry', 'activity'])
   },
 
-  created () {
-    /* let ctx = this.$route.params.context
-    let urlPath = [
-      window.location.host,
-      ctx,
-      'conn'
-    ]
-    let targetws = 'ws://' + urlPath.join('/')
-    if (window.location.protocol === 'https:') {
-      targetws = 'wss://' + urlPath.join('/')
-    }
-    this.$flowService.connect(targetws) */
-    // Vue.use(FlowService, {location: targetws})
-  },
-
   mounted () {
     // Handle incoming things
     this.$flowService.on('sessionJoin', (v) => {
@@ -152,18 +136,6 @@ export default {
     this.$flowService.on('sessionLog', (v) => {
       console.log(v.data) // Temporary
     })
-
-    // Connected
-    /* this.$flowService.connected(() => {
-      this.NOTIFICATION_ADD('Connected')
-      // Make this in a service
-      if (this.$route.params.sessId === undefined) {
-        console.log('Creating new session')
-        this.$flowService.sessionNew()
-        return
-      }
-      this.$flowService.sessionLoad(undefined, this.$route.params.sessId)
-    }) */
   },
   methods: {
     ...mapActions('flow', ['NODE_INSPECT', 'NOTIFICATION_ADD']),
@@ -195,9 +167,6 @@ export default {
         targetInput.focus()
       })
     },
-    nodeProcess (node) {
-      this.$flowService.nodeProcess(node.id)
-    },
     funcsSizeUpdate (ev, size) {
       this.funcsSize = size
     },

+ 2 - 2
browser/vue-flow/src/components/panel-inspector.vue

@@ -89,7 +89,7 @@
       <div class="flow-inspector__area flow-inspector--control">
         <button
           class="primary-inverse"
-          @click="$emit('nodeProcess',nodeInspect)">Run</button>
+          @click="NODE_PROCESS(nodeInspect.id)">Run</button>
       </div>
     </template>
     <template v-else>
@@ -120,7 +120,7 @@ export default {
     }
   },
   methods: {
-    ...mapActions('flow', ['NODE_UPDATE', 'DOCUMENT_SYNC']),
+    ...mapActions('flow', ['NODE_UPDATE', 'DOCUMENT_SYNC', 'NODE_PROCESS']),
     localChange () {
       this.NODE_UPDATE([JSON.parse(JSON.stringify(this.nodeInspect))])
       this.DOCUMENT_SYNC()

+ 4 - 1
browser/vue-flow/src/store/flow/actions.js

@@ -7,7 +7,7 @@ export default {
   },
   // Node update full document state somehow
   [m.DOCUMENT_UPDATE] ({commit}, nodeData) {
-    flowService.documentUpdate(nodeData)
+    // flowService.documentUpdate(nodeData)
     // Map the nodes
     // WEBSOCKET
     commit(m.DOCUMENT_UPDATE, nodeData)
@@ -34,6 +34,9 @@ export default {
   [m.NODE_INSPECT] ({commit}, nodeId) {
     commit(m.NODE_INSPECT, nodeId)
   },
+  [m.NODE_PROCESS] (ctx, nodeId) {
+    flowService.nodeProcess(nodeId)
+  },
   [m.LINK_ADD] (ctx, link) {
     ctx.commit(m.LINK_ADD, link)
     ctx.dispatch(m.DOCUMENT_SYNC)

+ 1 - 1
browser/vue-flow/src/store/flow/mutation-types.js

@@ -2,7 +2,7 @@ var actions = [
   'REGISTRY_UPDATE',
   'DOCUMENT_UPDATE', 'DOCUMENT_SYNC',
   'ACTIVITY_UPDATE',
-  'NODE_RAISE', 'NODE_UPDATE', 'NODE_ADD', 'NODE_REMOVE', 'NODE_INSPECT',
+  'NODE_RAISE', 'NODE_UPDATE', 'NODE_ADD', 'NODE_REMOVE', 'NODE_INSPECT', 'NODE_PROCESS',
   'LINK_ADD', 'LINK_REMOVE',
   'TRIGGER_ADD', 'TRIGGER_REMOVE',
   'NOTIFICATION_ADD', 'NOTIFICATION_CLEAR'

+ 136 - 52
go/src/flow/flow.go

@@ -13,13 +13,7 @@ import (
 
 // Data interface
 type Data = interface{}
-
-type opEntry struct {
-	sync.Mutex
-	name     string
-	inputs   []*operation // still figuring, might be operation
-	executor interface{}
-}
+type executorFunc func(...Data) Data
 
 // Flow structure
 // We could Create a single array of operations
@@ -74,9 +68,9 @@ func (f *Flow) Must(op Operation, err error) Operation {
 
 // Res returns a deferred operation result
 // passing the Id
-func (f *Flow) Res(id string) Operation {
-	return opFunc(f, id)
-}
+/*func (f *Flow) Res(id string) Operation {
+	return deferred(f, id)
+}*/
 
 // DefOp Manual tag an Operation
 func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
@@ -98,17 +92,115 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation,
 	if err != nil {
 		return nil, err
 	}
-	f.operations.Store(id, &opEntry{sync.Mutex{}, name, inputs, executor})
-	return opFunc(f, id), nil
+	opEntry := &operation{
+		Mutex:    sync.Mutex{},
+		id:       id,
+		flow:     f,
+		name:     name,
+		kind:     "func",
+		inputs:   inputs,
+		setter:   nil, // No set
+		executor: executor,
+	}
+	f.operations.Store(id, opEntry)
+
+	return opEntry, nil
+}
+
+// DefErrOp define a nil operation that will return error
+// Usefull for builders
+func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
+	executor := func() error { return err }
+	opEntry := &operation{
+		Mutex:    sync.Mutex{},
+		id:       id,
+		flow:     f,
+		name:     fmt.Sprintf("(error)<%v>", err),
+		kind:     "error",
+		inputs:   nil,
+		setter:   nil,
+		executor: executor,
+	}
+	f.operations.Store(id, opEntry)
+	return opEntry, nil
+}
+
+// DefConst define a const by defined ID
+func (f *Flow) DefConst(id string, value Data) (Operation, error) {
+	// Optimize this definition
+	f.consts[id] = value
+	executor := func() Data { return f.consts[id] }
+	opEntry := &operation{
+		id:       id,
+		Mutex:    sync.Mutex{},
+		flow:     f,
+		name:     fmt.Sprintf("(const)<%s>", id),
+		kind:     "const",
+		inputs:   nil,
+		setter:   nil,
+		executor: executor,
+	}
+	f.operations.Store(id, opEntry)
+
+	return opEntry, nil
+}
+
+// DefIn flow input operator
+//  paramID - index of the parameter
+func (f *Flow) DefIn(id string, paramID int) Operation {
+	executor := func(params ...Data) Data {
+		return params[paramID]
+	}
+	opEntry := &operation{
+		id:       id,
+		Mutex:    sync.Mutex{},
+		flow:     f,
+		name:     fmt.Sprintf("(in)<%s>", id),
+		kind:     "in",
+		inputs:   nil,
+		setter:   nil,
+		executor: executor,
+	}
+	f.operations.Store(id, opEntry)
+	//return opIn(f, paramID)
+	return opEntry
+}
+
+// DefVar define var operation with optional initial
+func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
+	// Unique
+	if _, ok := f.data[name]; !ok {
+		var v interface{}
+		if len(initial) > 0 {
+			v = initial[0]
+		}
+		f.data[name] = v
+	}
+	setter := func(v Data) { f.data[name] = v }
+	executor := func() Data { return f.data[name] }
+
+	opEntry := &operation{
+		Mutex:    sync.Mutex{},
+		id:       id,
+		flow:     f,
+		name:     fmt.Sprintf("(var)<%s>", name),
+		kind:     "var",
+		inputs:   nil,
+		setter:   setter,
+		executor: executor,
+	}
+	f.operations.Store(id, opEntry)
+	return opEntry
 }
 
+// Auto ID generation
+
 // Op return an function operator
 //  name - a previous registered function
 //  params - the function inputs
 func (f *Flow) Op(name string, params ...interface{}) (Operation, error) {
 	var op Operation
 	var err error
-
 	allocErr := f.allocID(func(id string) error {
 		op, err = f.DefOp(id, name, params...)
 		return err
@@ -119,14 +211,6 @@ func (f *Flow) Op(name string, params ...interface{}) (Operation, error) {
 	return op, err
 }
 
-// DefErrOp define a nil operation that will return error
-// Usefull for builders
-func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
-	executor := func() error { return err }
-	f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("(error)<%v>", err), nil, executor})
-	return opFunc(f, id), nil
-}
-
 // ErrOp error operation with generated ID
 func (f *Flow) ErrOp(operr error) (Operation, error) {
 	var op Operation
@@ -142,14 +226,6 @@ func (f *Flow) ErrOp(operr error) (Operation, error) {
 	return op, err
 }
 
-// DefConst define a const by defined ID
-func (f *Flow) DefConst(id string, value Data) (Operation, error) {
-	f.consts[id] = value
-	executor := func() Data { return f.consts[id] }
-	f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("(const)<%s>", id), nil, executor})
-	return opFunc(f, id), nil
-}
-
 // Const returns a const operation with generated ID
 func (f *Flow) Const(value Data) (Operation, error) {
 	var op Operation
@@ -166,20 +242,29 @@ func (f *Flow) Const(value Data) (Operation, error) {
 
 // Var operation
 func (f *Flow) Var(name string, initial ...Data) Operation {
-	if _, ok := f.data[name]; !ok {
-		var v interface{}
-		if len(initial) > 0 {
-			v = initial[0]
-		}
-		f.data[name] = v
+	var op Operation
+	err := f.allocID(func(id string) error {
+		op = f.DefVar(id, name, initial...)
+		return nil
+	})
+	if err != nil {
+		return nil
 	}
-	return opVar(f, name)
+	return op
 }
 
-// In flow input operator
-//  paramID - index of the parameter
-func (f *Flow) In(paramID int) Operation {
-	return opIn(f, paramID)
+// In input operation
+func (f *Flow) In(paramID int) (Operation, error) {
+	var op Operation
+	err := f.allocID(func(id string) error {
+		op = f.DefIn(id, paramID)
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return op, nil
+
 }
 
 // Analyse every operations
@@ -189,7 +274,7 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 	}
 	fmt.Fprintf(w, "Ops analysis:\n")
 	f.operations.Range(func(pk, po interface{}) bool {
-		k, op := pk.(string), po.(*opEntry)
+		k, op := pk.(string), po.(*operation)
 		fw := bytes.NewBuffer(nil)
 		//fmt.Fprintf(w, "  [%s] (%v)", k, op.name)
 		fmt.Fprintf(fw, "  [%s] %s(", pk, op.name)
@@ -203,12 +288,12 @@ func (f *Flow) Analyse(w io.Writer, params ...Data) {
 				fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, err)
 				return false
 			}
-			fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)
+			fmt.Fprintf(fw, " %s[%v](%v)", op.kind, op.id, ires)
 		}
 		fmt.Fprintf(fw, ") - ")
 		// Create OpProcessor and execute
 		//
-		opfn := opFunc(f, k)
+		opfn := f.GetOp(k)
 		res, err := opfn.Process(params...)
 		if err != nil {
 			fmt.Fprintf(fw, "ERR\n")
@@ -239,14 +324,14 @@ func (f *Flow) String() string {
 
 	fmt.Fprintf(ret, "funcs:\n")
 	f.operations.Range(func(pk, pv interface{}) bool {
-		k, v := pk.(string), pv.(*opEntry)
+		k, v := pk.(string), pv.(*operation)
 
 		fmt.Fprintf(ret, "  [%s] %s(", k, v.name)
 		for j, in := range v.inputs {
 			if j != 0 {
 				fmt.Fprintf(ret, ", ")
 			}
-			fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)
+			fmt.Fprintf(ret, "%s[%v]", "func", in.ID())
 		}
 		fmt.Fprintf(ret, ")\n")
 		return true
@@ -264,7 +349,7 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
 	}
 	operations := map[string]opMarshal{}
 	f.operations.Range(func(pk, po interface{}) bool {
-		k, o := pk.(string), po.(*opEntry)
+		k, o := pk.(string), po.(*operation)
 		refs := []map[string]interface{}{}
 		for _, in := range o.inputs { // Switch type?
 			refs = append(refs, map[string]interface{}{
@@ -315,7 +400,7 @@ func (f *Flow) allocID(fn func(id string) error) error {
 
 }
 
-func (f *Flow) addEntry(entry *opEntry) (string, error) {
+func (f *Flow) addEntry(entry *operation) (string, error) {
 	f.Lock()
 	defer f.Unlock()
 
@@ -335,23 +420,22 @@ func (f *Flow) addEntry(entry *opEntry) (string, error) {
 /////////////////
 // Async data
 /////
-func (f *Flow) getOp(id string) (*opEntry, bool) {
+func (f *Flow) getOp(id string) (*operation, bool) {
 
 	o, ok := f.operations.Load(id)
 	if !ok {
 		return nil, false
 	}
-	return o.(*opEntry), true
+	return o.(*operation), true
 }
 
 // GetOp Return an existing operation or return notfound error
 func (f *Flow) GetOp(id string) Operation {
-	_, ok := f.operations.Load(id)
+	op, ok := f.operations.Load(id)
 	if !ok {
 		return nil
 	}
-	return opFunc(f, id)
-
+	return op.(Operation)
 }
 
 //////////////////////////////////////////////

+ 27 - 8
go/src/flow/flow_test.go

@@ -3,6 +3,7 @@ package flow_test
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"testing"
 	"time"
 
@@ -17,24 +18,39 @@ import (
 func init() {
 	assert.Quiet = true
 }
+
+func TestInput(t *testing.T) {
+	a := assert.A(t)
+	f := flow.New()
+
+	op, err := f.Op("vecadd", []float32{1, 1, 1}, f.Must(f.In(0)))
+	a.Eq(err, nil, "result should not error")
+	a.NotEq(op, nil, "operation should not be nil")
+
+	d, err := op.Process([]float32{2, 2, 2})
+	a.Eq(err, nil, "should not error passing an input")
+
+	a.Eq(d, []float32{3, 3, 3}, "array should be equal")
+
+}
 func TestDefOp(t *testing.T) {
 	a := assert.A(t)
 	f := flow.New()
 
 	var err error
 	_, err = f.DefOp("2", "vecadd", []float32{1, 1, 1}, []float32{2, 2, 2}) // r:3 3 3
-	a.Eq(err, nil, "doing DefOp")
+	a.Eq(err, nil, fmt.Sprintf("doing DefOp\n%v", f))
 
-	_, err = f.DefOp("1", "vecadd", []float32{1, 2, 3}, f.Res("2")) // r: 4 5 6
+	_, err = f.DefOp("1", "vecadd", []float32{1, 2, 3}, f.GetOp("2")) // r: 4 5 6
 	a.Eq(err, nil, "doing DefOp")
 
-	op, err := f.Op("vecmul", f.Res("1"), []float32{2, 2, 2}) //r:8 10 12
+	op, err := f.Op("vecmul", f.GetOp("1"), []float32{2, 2, 2}) //r:8 10 12
 	a.Eq(err, nil, "mul operation")
 	a.NotEq(op, nil, "operation not nil")
 
 	desired := []float32{8, 10, 12}
 	res, _ := op.Process()
-	a.Eq(desired, res, "vector result should match")
+	a.Eq(res, desired, fmt.Sprintf("vector result should match:\n%v", f))
 
 	op, err = f.DefOp("123", "none")
 	a.NotEq(err, nil, "Error should not be nil")
@@ -84,10 +100,10 @@ func TestSerialize(t *testing.T) {
 			f.Must(f.Const([]float32{2, 2, 2, 2})),
 		)),
 	)
-	mul1, _ := f.Op("vecmul", c1, op1)       // op:2 - expected 12, 32, 60, 0
-	mul2, _ := f.Op("vecmul", mul1, var1)    // op:3 - expected 48, 128, 240, 0
-	mul3, _ := f.Op("vecmul", c2, mul2)      // op:4 - expected 96, 256, 480, 0
-	mul4, _ := f.Op("vecmul", mul3, f.In(0)) // op:5 - expected 96, 512, 1440,0
+	mul1, _ := f.Op("vecmul", c1, op1)               // op:2 - expected 12, 32, 60, 0
+	mul2, _ := f.Op("vecmul", mul1, var1)            // op:3 - expected 48, 128, 240, 0
+	mul3, _ := f.Op("vecmul", c2, mul2)              // op:4 - expected 96, 256, 480, 0
+	mul4, _ := f.Op("vecmul", mul3, f.Must(f.In(0))) // op:5 - expected 96, 512, 1440,0
 
 	s := bytes.NewBuffer(nil)
 	f.Analyse(s, []float32{1, 2, 3, 4})
@@ -99,8 +115,11 @@ func TestSerialize(t *testing.T) {
 
 	ret := bytes.NewBuffer(nil)
 	e := json.NewEncoder(ret)
+	e.SetIndent(" ", " ")
 	e.Encode(f)
 
+	// Deserialize
+
 	t.Log("Flow:", ret)
 
 }

+ 128 - 176
go/src/flow/operation.go

@@ -12,208 +12,160 @@ import (
 	"sync"
 )
 
-// OpCtx operation Context
-type OpCtx = *sync.Map
-
-// NewOpCtx creates a running context
-func newOpCtx() OpCtx {
-	return &sync.Map{}
-}
-
-// dumbSet
-func dumbSet(params ...Data) {}
-
 // Operation interface
 type Operation interface { // Id perhaps?
 	ID() string
-	Set(inputs ...Data) // Special var method
+	Set(input Data) // Special var method
 	Process(params ...Data) (Data, error)
 }
 
-// Run Context actually not OpCTX
-
-//local operation information
+// Will be named operation
 type operation struct {
-	flow    *Flow
-	id      interface{} // Interface key
-	kind    string
-	set     func(params ...Data)
-	process func(ctx OpCtx, params ...Data) (Data, error)
+	sync.Mutex
+	flow     *Flow
+	id       string
+	name     string
+	kind     string
+	inputs   []*operation // still figuring, might be Operation
+	setter   func(Data)
+	executor interface{}
 }
 
-// Id returns string Id of the operaton
-func (o *operation) ID() string {
-	return fmt.Sprint(o.id)
-}
-
-// Process operation process wrapper
-func (o *operation) Process(params ...Data) (Data, error) {
-	return o.processWithCtx(newOpCtx(), params...)
-}
+// OpCtx operation Context
+type OpCtx = *sync.Map
 
-// Every single one is run with this internally
-func (o *operation) processWithCtx(ctx OpCtx, params ...Data) (Data, error) {
-	return o.process(ctx, params...)
+// NewOpCtx creates a running context
+func newOpCtx() OpCtx {
+	return &sync.Map{}
 }
 
-// Set setter for certain operations (Var)
-func (o *operation) Set(params ...Data) {
-	o.set(params...)
-}
+func (o *operation) ID() string { return o.id }
 
-//////////////////////////////////////
-// Operators definition
-///////////////
-
-func opIn(f *Flow, id int) *operation {
-	return &operation{
-		flow: f,
-		id:   id,
-		kind: "in",
-		set:  dumbSet,
-		process: func(ctx OpCtx, params ...Data) (Data, error) {
-			if id >= len(params) || id < 0 {
-				return nil, errors.New("invalid input")
-			}
-			return params[id], nil
-		},
-	}
+func (o *operation) Process(params ...Data) (Data, error) {
+	// Create CTX
+	ctx := newOpCtx()
+	return executeOP(o.flow, o.id, ctx, params...)
 }
-func opConst(f *Flow, id string) *operation {
-	return &operation{
-		flow: f,
-		id:   id,
-		kind: "const",
-		set:  dumbSet,
-		process: func(ctx OpCtx, params ...Data) (Data, error) {
-			ret := f.consts[id]
-			return ret, nil
-		},
+func (o *operation) Set(data Data) {
+	if o.setter != nil {
+		o.setter(data)
 	}
 }
 
-// Debug type
-func opFunc(f *Flow, id string) *operation {
-	return &operation{
-		flow: f,
-		id:   id,
-		kind: "func",
-		set:  dumbSet,
-		process: func(ctx OpCtx, params ...Data) (ret Data, err error) {
-
-			defer func() {
-				if r := recover(); r != nil {
-					err = fmt.Errorf("Panic: %v", r)
-					f.hooks.error(id, err)
-				}
-			}()
-
-			op, ok := f.getOp(id)
-			if !ok {
-				err = fmt.Errorf("invalid operation '%s'", id)
-				f.hooks.error(id, err)
-				return nil, err
-			}
+// make Executor for func
+
+// The function that executions an operation
+func executeOP(f *Flow, id string, ctx OpCtx, params ...Data) (ret Data, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("%v", r)
+			f.hooks.error(id, err)
+		}
+	}()
+
+	op, ok := f.getOp(id)
+	if !ok {
+		err = fmt.Errorf("invalid operation '%s'", id)
+		f.hooks.error(id, err)
+		return nil, err
+	}
+	// Simple data returner
+	switch fn := op.executor.(type) {
+	case func() Data:
+		f.hooks.start(id)
+		res := fn()
+		f.hooks.finish(id, res)
+		return res, nil
+	case func(...Data) Data:
+		f.hooks.start(id)
+		res := fn(params...)
+		f.hooks.finish(id, res)
+		return res, nil
 
-			/*if f.err != nil {
-				return nil
-			}*/
-			op.Lock()
-			defer op.Unlock()
-			if ctx != nil {
-				if v, ok := ctx.Load(id); ok { // Cache
-					return v, nil
-				}
-			}
+	}
+	// Input param
+
+	/*if f.err != nil {
+		return nil
+	}*/
+	op.Lock()
+	defer op.Unlock()
+	if ctx != nil {
+		if v, ok := ctx.Load(id); ok { // Cache
+			return v, nil
+		}
+	}
 
-			// Check inputs
-			fnval := reflect.ValueOf(op.executor)
-			if fnval.Type().NumIn() != len(op.inputs) {
-				err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
-				f.hooks.error(id, err)
-				return nil, err
-			}
-			/////////////////////////////
-			// NEW PARALLEL PROCESSING
-			///////////
-			f.hooks.wait(id)
-
-			callParam := make([]reflect.Value, len(op.inputs))
-
-			// Parallel processing if inputs
-			wg := sync.WaitGroup{}
-			wg.Add(len(op.inputs))
-			for i, in := range op.inputs {
-				go func(i int, in *operation) {
-					defer wg.Done()
-					fr, err := in.processWithCtx(ctx, params...)
-					if err != nil {
-						callParam[i] = reflect.Value{}
-						return
-					}
-
-					if fr == nil {
-						callParam[i] = reflect.Zero(fnval.Type().In(i))
-						return
-					}
-
-					callParam[i] = reflect.ValueOf(fr)
-				}(i, in)
-			}
-			wg.Wait()
-			// Return type checking
-			errMsg := ""
-			for i, p := range callParam {
-				if !p.IsValid() {
-					//callParam[i] = reflect.Zero(fnval.Type().In(i))
-					errMsg += fmt.Sprintf("Input %d invalid\n", i)
-				} else if !p.Type().AssignableTo(fnval.Type().In(i)) {
-					errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
-				}
-			}
-			if len(errMsg) > 0 {
-				err := errors.New(errMsg)
-				f.hooks.error(id, err)
-				return nil, err
+	// Check inputs
+	fnval := reflect.ValueOf(op.executor)
+	if fnval.Type().NumIn() != len(op.inputs) {
+		err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
+		f.hooks.error(id, err)
+		return nil, err
+	}
+	/////////////////////////////
+	// NEW PARALLEL PROCESSING
+	///////////
+	f.hooks.wait(id)
+
+	callParam := make([]reflect.Value, len(op.inputs))
+
+	// Parallel processing if inputs
+	wg := sync.WaitGroup{}
+	wg.Add(len(op.inputs))
+	for i, in := range op.inputs {
+		go func(i int, in *operation) {
+			defer wg.Done()
+			fr, err := executeOP(f, in.id, ctx, params...)
+			if err != nil {
+				callParam[i] = reflect.Value{}
+				return
 			}
 
-			f.hooks.start(id)
-
-			fnret := fnval.Call(callParam)
-			if len(fnret) > 1 && (fnret[1].Interface() != nil) {
-				err, ok := fnret[1].Interface().(error)
-				if !ok {
-					err = errors.New("unknown error")
-				}
-				f.hooks.error(id, err)
-				return nil, err
+			if fr == nil {
+				callParam[i] = reflect.Zero(fnval.Type().In(i))
+				return
 			}
 
-			// THE RESULT
-			ret = fnret[0].Interface()
-			if ctx != nil {
-				ctx.Store(id, ret)
-			}
-			f.hooks.finish(id, ret)
-			return ret, nil
-		},
+			callParam[i] = reflect.ValueOf(fr)
+		}(i, in)
+	}
+	wg.Wait()
+	// Return type error checking
+	//
+	errMsg := ""
+	for i, p := range callParam {
+		if !p.IsValid() {
+			//callParam[i] = reflect.Zero(fnval.Type().In(i))
+			errMsg += fmt.Sprintf("Input %d invalid\n", i)
+		} else if !p.Type().AssignableTo(fnval.Type().In(i)) {
+			errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
+		}
+	}
+	if len(errMsg) > 0 {
+		err := errors.New(errMsg)
+		f.hooks.error(id, err)
+		return nil, err
 	}
-}
 
-func opVar(f *Flow, id string) *operation {
-	return &operation{
-		flow:    f,
-		id:      id,
-		kind:    "var",
-		set:     func(params ...Data) { f.data[id] = params[0] },
-		process: func(ctx OpCtx, params ...Data) (Data, error) { return f.data[id], nil },
+	// The actual operation process
+	f.hooks.start(id)
+
+	fnret := fnval.Call(callParam)
+	if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
+		err, ok := fnret[1].Interface().(error)
+		if !ok {
+			err = errors.New("unknown error")
+		}
+		f.hooks.error(id, err)
+		return nil, err
 	}
-}
 
-func opNil(f *Flow) *operation {
-	return &operation{
-		flow:    f,
-		kind:    "nil",
-		process: func(ctx OpCtx, params ...Data) (Data, error) { return nil, errors.New("Nil operation") },
+	// THE RESULT
+	ret = fnret[0].Interface()
+	if ctx != nil {
+		ctx.Store(id, ret)
 	}
+	f.hooks.finish(id, ret)
+	return ret, nil
 }

+ 4 - 3
go/src/flowserver/flowbuilder.go

@@ -83,7 +83,8 @@ func (fd *FlowDocument) fetchLinkTo(ID string, n int) *Link {
 	return nil
 }
 
-var ErrLoop = errors.New("Looping through is disabled for now")
+//ErrLoop loop error
+var ErrLoop = errors.New("Looping is disabled for now")
 
 // FlowBuild build the graph based on an starting node
 func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, error) {
@@ -129,7 +130,7 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
 			}
 			op = f.In(inputID) // By id perhaps
 		case "Variable":
-			op = f.Var(node.ID, node.Prop["init"])
+			op = f.DefVar(node.ID, node.Label, node.Prop["init"])
 		case "Const":
 			raw := node.Label
 			val, err := parseValue(nil, raw)
@@ -198,7 +199,7 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
 						log.Println("Mismatching trigger, but its a test")
 					}
 
-					op := f.Res(t.To)   // Repeating
+					op := f.GetOp(t.To) // Repeating
 					go op.Process(name) // Background
 				},
 			})

+ 7 - 2
go/src/flowserver/session.go

@@ -243,13 +243,18 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 			},
 		})
 
-		op := s.flow.Res(ID)
+		op := s.flow.GetOp(ID)
+		if op == nil {
+			err = fmt.Errorf("Operation not found %v", ID)
+			return
+		}
 		_, err := op.Process()
 		if err != nil {
 			log.Println("error processing node", err)
 		}
 		s.flow = nil
-	}()
+	}() // End routine
+
 	return nil
 }