ソースを参照

backend improvements

luis 7 年 前
コミット
91ba20c097

+ 5 - 127
go/src/flow/flow.go

@@ -13,7 +13,6 @@ import (
 
 // Data interface
 type Data = interface{}
-type executorFunc func(...Data) Data
 
 // Flow structure
 // We could Create a single array of operations
@@ -72,127 +71,6 @@ func (f *Flow) Must(op Operation, err error) Operation {
 	return deferred(f, id)
 }*/
 
-// DefOp Manual tag an Operation
-func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
-	inputs := make([]*operation, len(params))
-	for i, p := range params {
-		switch v := p.(type) {
-		case *operation:
-			inputs[i] = v
-		default:
-			c, err := f.Const(v)
-			if err != nil {
-				return nil, err
-			}
-			inputs[i], _ = c.(*operation)
-		}
-	}
-	// Grab executor here
-	executor, err := f.registry.Get(name)
-	if err != nil {
-		return nil, err
-	}
-	opEntry := &operation{
-		Mutex:    sync.Mutex{},
-		id:       id,
-		flow:     f,
-		name:     name,
-		kind:     "func",
-		inputs:   inputs,
-		setter:   nil, // No set
-		executor: executor,
-	}
-	f.operations.Store(id, opEntry)
-
-	return opEntry, nil
-}
-
-// DefErrOp define a nil operation that will return error
-// Usefull for builders
-func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
-	executor := func() error { return err }
-	opEntry := &operation{
-		Mutex:    sync.Mutex{},
-		id:       id,
-		flow:     f,
-		name:     fmt.Sprintf("(error)<%v>", err),
-		kind:     "error",
-		inputs:   nil,
-		setter:   nil,
-		executor: executor,
-	}
-	f.operations.Store(id, opEntry)
-	return opEntry, nil
-}
-
-// DefConst define a const by defined ID
-func (f *Flow) DefConst(id string, value Data) (Operation, error) {
-	// Optimize this definition
-	f.consts[id] = value
-	executor := func() Data { return f.consts[id] }
-	opEntry := &operation{
-		id:       id,
-		Mutex:    sync.Mutex{},
-		flow:     f,
-		name:     fmt.Sprintf("(const)<%s>", id),
-		kind:     "const",
-		inputs:   nil,
-		setter:   nil,
-		executor: executor,
-	}
-	f.operations.Store(id, opEntry)
-
-	return opEntry, nil
-}
-
-// DefIn flow input operator
-//  paramID - index of the parameter
-func (f *Flow) DefIn(id string, paramID int) Operation {
-	executor := func(params ...Data) Data {
-		return params[paramID]
-	}
-	opEntry := &operation{
-		id:       id,
-		Mutex:    sync.Mutex{},
-		flow:     f,
-		name:     fmt.Sprintf("(in)<%s>", id),
-		kind:     "in",
-		inputs:   nil,
-		setter:   nil,
-		executor: executor,
-	}
-	f.operations.Store(id, opEntry)
-	//return opIn(f, paramID)
-	return opEntry
-}
-
-// DefVar define var operation with optional initial
-func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
-	// Unique
-	if _, ok := f.data[name]; !ok {
-		var v interface{}
-		if len(initial) > 0 {
-			v = initial[0]
-		}
-		f.data[name] = v
-	}
-	setter := func(v Data) { f.data[name] = v }
-	executor := func() Data { return f.data[name] }
-
-	opEntry := &operation{
-		Mutex:    sync.Mutex{},
-		id:       id,
-		flow:     f,
-		name:     fmt.Sprintf("(var)<%s>", name),
-		kind:     "var",
-		inputs:   nil,
-		setter:   setter,
-		executor: executor,
-	}
-	f.operations.Store(id, opEntry)
-	return opEntry
-}
-
 // Auto ID generation
 
 // Op return an function operator
@@ -322,7 +200,7 @@ func (f *Flow) String() string {
 		fmt.Fprintf(ret, "  [%v] %v\n", k, v)
 	}
 
-	fmt.Fprintf(ret, "funcs:\n")
+	fmt.Fprintf(ret, "operations:\n")
 	f.operations.Range(func(pk, pv interface{}) bool {
 		k, v := pk.(string), pv.(*operation)
 
@@ -420,22 +298,22 @@ func (f *Flow) addEntry(entry *operation) (string, error) {
 /////////////////
 // Async data
 /////
-func (f *Flow) getOp(id string) (*operation, bool) {
+/*func (f *Flow) getOp(id string) (*operation, bool) {
 
 	o, ok := f.operations.Load(id)
 	if !ok {
 		return nil, false
 	}
 	return o.(*operation), true
-}
+}*/
 
 // GetOp Return an existing operation or return notfound error
-func (f *Flow) GetOp(id string) Operation {
+func (f *Flow) GetOp(id string) *operation {
 	op, ok := f.operations.Load(id)
 	if !ok {
 		return nil
 	}
-	return op.(Operation)
+	return op.(*operation)
 }
 
 //////////////////////////////////////////////

+ 248 - 21
go/src/flow/operation.go

@@ -12,6 +12,8 @@ import (
 	"sync"
 )
 
+type executorFunc func(OpCtx, ...Data) (Data, error)
+
 // Operation interface
 type Operation interface { // Id perhaps?
 	ID() string
@@ -28,7 +30,7 @@ type operation struct {
 	kind     string
 	inputs   []*operation // still figuring, might be Operation
 	setter   func(Data)
-	executor interface{}
+	executor executorFunc
 }
 
 // OpCtx operation Context
@@ -44,7 +46,9 @@ func (o *operation) ID() string { return o.id }
 func (o *operation) Process(params ...Data) (Data, error) {
 	// Create CTX
 	ctx := newOpCtx()
-	return executeOP(o.flow, o.id, ctx, params...)
+	return o.executor(ctx, params...)
+
+	//return executeOP(o.flow, o.id, ctx, params...)
 }
 func (o *operation) Set(data Data) {
 	if o.setter != nil {
@@ -55,7 +59,7 @@ func (o *operation) Set(data Data) {
 // make Executor for func
 
 // The function that executions an operation
-func executeOP(f *Flow, id string, ctx OpCtx, params ...Data) (ret Data, err error) {
+/*func executeOP(f *Flow, id string, ctx OpCtx, params ...Data) (ret Data, err error) {
 	defer func() {
 		if r := recover(); r != nil {
 			err = fmt.Errorf("%v", r)
@@ -69,25 +73,8 @@ func executeOP(f *Flow, id string, ctx OpCtx, params ...Data) (ret Data, err err
 		f.hooks.error(id, err)
 		return nil, err
 	}
-	// Simple data returner
-	switch fn := op.executor.(type) {
-	case func() Data:
-		f.hooks.start(id)
-		res := fn()
-		f.hooks.finish(id, res)
-		return res, nil
-	case func(...Data) Data:
-		f.hooks.start(id)
-		res := fn(params...)
-		f.hooks.finish(id, res)
-		return res, nil
-
-	}
-	// Input param
+		// Input param
 
-	/*if f.err != nil {
-		return nil
-	}*/
 	op.Lock()
 	defer op.Unlock()
 	if ctx != nil {
@@ -168,4 +155,244 @@ func executeOP(f *Flow, id string, ctx OpCtx, params ...Data) (ret Data, err err
 	}
 	f.hooks.finish(id, ret)
 	return ret, nil
+}*/
+func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
+	return func(ctx OpCtx, params ...Data) (Data, error) {
+		f.hooks.start(id)
+		d, err := fn(ctx, params)
+		if err != nil {
+			f.hooks.error(id, err)
+			return d, err
+		}
+		f.hooks.finish(id, d)
+		return d, err
+	}
+}
+
+func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
+	return func(ctx OpCtx, params ...Data) (Data, error) {
+		var err error
+		defer func() {
+			if r := recover(); r != nil {
+				err = fmt.Errorf("%v", r)
+				f.hooks.error(id, err)
+			}
+		}()
+
+		// Should not need this
+		op := f.GetOp(id)
+		if op == nil {
+			err = fmt.Errorf("invalid operation '%s'", id)
+			f.hooks.error(id, err)
+			return nil, err
+		}
+		op.Lock()
+		defer op.Unlock()
+		if ctx != nil {
+			if v, ok := ctx.Load(id); ok { // Cache
+				return v, nil
+			}
+		}
+
+		// Check inputs
+		fnval := reflect.ValueOf(fn)
+		if fnval.Type().NumIn() != len(op.inputs) {
+			err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
+			f.hooks.error(id, err)
+			return nil, err
+		}
+		/////////////////////////////
+		// NEW PARALLEL PROCESSING
+		///////////
+		//f.hooks.wait(id)
+		f.hooks.wait(id)
+
+		callParam := make([]reflect.Value, len(op.inputs))
+		callErrors := ""
+		paramMutex := sync.Mutex{}
+		// Parallel processing if inputs
+		wg := sync.WaitGroup{}
+		wg.Add(len(op.inputs))
+		for i, in := range op.inputs {
+			go func(i int, in *operation) {
+				defer wg.Done()
+				// Sub function with type checking
+				//Check the ctx
+				fr, err := in.executor(ctx, params...)
+				if err != nil {
+					paramMutex.Lock()
+					callErrors += err.Error() + "\n"
+					paramMutex.Unlock()
+					return
+				}
+				if fr == nil {
+					callParam[i] = reflect.Zero(fnval.Type().In(i))
+				}
+				res := reflect.ValueOf(fr)
+				if !res.IsValid() {
+					paramMutex.Lock()
+					callErrors += fmt.Sprintf("Input %d invalid\n", i)
+					paramMutex.Unlock()
+					return
+				} else if !res.Type().ConvertibleTo(fnval.Type().In(i)) {
+
+					if fnval.Type().In(i).Kind() == reflect.String {
+						callParam[i] = reflect.ValueOf(fmt.Sprint(res.Interface()))
+						return
+					}
+					paramMutex.Lock()
+					callErrors += fmt.Sprintf("Input %d type: %v cannot be converted to %v\n", i, res.Type(), fnval.Type().In(i))
+					paramMutex.Unlock()
+					return
+				}
+				// can convert too?
+
+				// CheckError and safelly append
+				callParam[i] = res.Convert(fnval.Type().In(i))
+			}(i, in)
+		}
+		wg.Wait()
+		// Return type error checking
+		if len(callErrors) > 0 {
+			err := errors.New(callErrors)
+			f.hooks.error(id, err)
+			return nil, err
+		}
+
+		// The actual operation process
+		f.hooks.start(id)
+
+		fnret := fnval.Call(callParam)
+		if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
+			err, ok := fnret[1].Interface().(error)
+			if !ok {
+				err = errors.New("unknown error")
+			}
+			f.hooks.error(id, err)
+			return nil, err
+		}
+
+		// THE RESULT
+		ret := fnret[0].Interface()
+		if ctx != nil {
+			ctx.Store(id, ret)
+		}
+		f.hooks.finish(id, ret)
+		return ret, nil
+	}
+}
+
+// DefVar define var operation with optional initial
+func (f *Flow) DefVar(id string, name string, initial ...Data) Operation {
+	// Unique
+	if _, ok := f.data[name]; !ok {
+		var v interface{}
+		if len(initial) > 0 {
+			v = initial[0]
+		}
+		f.data[name] = v
+	}
+	setter := func(v Data) { f.data[name] = v }
+
+	opEntry := &operation{
+		Mutex:    sync.Mutex{},
+		id:       id,
+		flow:     f,
+		name:     fmt.Sprintf("(var)<%s>", name),
+		kind:     "var",
+		inputs:   nil,
+		setter:   setter,
+		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.data[name], nil }),
+	}
+	f.operations.Store(id, opEntry)
+	return opEntry
+}
+
+// DefOp Manual tag an Operation
+func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
+	inputs := make([]*operation, len(params))
+	for i, p := range params {
+		switch v := p.(type) {
+		case *operation:
+			inputs[i] = v
+		default:
+			c, err := f.Const(v)
+			if err != nil {
+				return nil, err
+			}
+			inputs[i], _ = c.(*operation)
+		}
+	}
+	// Grab executor here
+	registryFn, err := f.registry.Get(name)
+	if err != nil {
+		return nil, err
+	}
+	executor := f.makeExecutor(id, registryFn)
+	op := &operation{
+		Mutex:    sync.Mutex{},
+		id:       id,
+		flow:     f,
+		name:     name,
+		kind:     "func",
+		inputs:   inputs,
+		setter:   nil, // No set
+		executor: executor,
+	}
+	f.operations.Store(id, op)
+
+	return op, nil
+}
+
+// DefErrOp define a nil operation that will return error
+// Usefull for builders
+func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
+	op := &operation{
+		Mutex:    sync.Mutex{},
+		id:       id,
+		flow:     f,
+		name:     fmt.Sprintf("(error)<%v>", err),
+		kind:     "error",
+		inputs:   nil,
+		setter:   nil,
+		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return nil, err }),
+	}
+	f.operations.Store(id, op)
+	return op, nil
+}
+
+// DefConst define a const by defined ID
+func (f *Flow) DefConst(id string, value Data) (Operation, error) {
+	// Optimize this definition
+	f.consts[id] = value
+
+	op := &operation{
+		id:       id,
+		Mutex:    sync.Mutex{},
+		flow:     f,
+		name:     fmt.Sprintf("(const)<%s>", id),
+		kind:     "const",
+		inputs:   nil,
+		setter:   nil,
+		executor: f.makeTrigger(id, func(OpCtx, ...Data) (Data, error) { return f.consts[id], nil }),
+	}
+	f.operations.Store(id, op)
+
+	return op, nil
+}
+
+// DefIn define input operation
+func (f *Flow) DefIn(id string, paramID int) Operation {
+	op := &operation{
+		id:       id,
+		Mutex:    sync.Mutex{},
+		flow:     f,
+		name:     fmt.Sprintf("(in)<%s>", id),
+		kind:     "in",
+		inputs:   nil,
+		setter:   nil,
+		executor: f.makeTrigger(id, func(ctx OpCtx, params ...Data) (Data, error) { return params[paramID], nil }),
+	}
+	f.operations.Store(id, op)
+	return op
 }

+ 0 - 257
go/src/flowserver/flowbuilder.go

@@ -1,257 +0,0 @@
-package flowserver
-
-import (
-	"encoding/json"
-	"errors"
-	"flow"
-	"flow/registry"
-	"fmt"
-	"log"
-	"reflect"
-	"strconv"
-	"time"
-)
-
-// Node that will contain registry src
-type Node struct {
-	ID            string            `json:"id"`
-	Src           string            `json:"src"`
-	Label         string            `json:"label"`
-	DefaultInputs map[int]string    `json:"defaultInputs"`
-	Prop          map[string]string `json:"prop"`
-}
-
-// Link that joins two nodes
-type Link struct {
-	From string `json:"from"`
-	To   string `json:"to"`
-	In   int    `json:"in"`
-}
-
-// Trigger that join two nodes on state change
-type Trigger struct {
-	From string   `json:"from"`
-	To   string   `json:"to"`
-	On   []string `json:"on"`
-}
-
-// FlowDocument flow document
-type FlowDocument struct {
-	Nodes    []Node    `json:"nodes"`
-	Links    []Link    `json:"links"`
-	Triggers []Trigger `json:"triggers"`
-}
-
-func (fd *FlowDocument) fetchNodeByID(ID string) *Node {
-	for _, n := range fd.Nodes {
-		if n.ID == ID {
-			return &n
-		}
-	}
-	return nil
-}
-
-func (fd *FlowDocument) fetchTriggerFrom(ID string) []Trigger {
-	ret := []Trigger{}
-	for _, t := range fd.Triggers {
-		if t.From != ID {
-			continue
-		}
-		log.Println("Trigger is:", ID, "adding")
-		ret = append(ret, t)
-	}
-	return ret
-}
-
-func (fd *FlowDocument) fetchLinksTo(ID string) []Link {
-	ret := []Link{}
-	for _, l := range fd.Links {
-		if l.To != ID {
-			continue
-		}
-		ret = append(ret, l)
-	}
-	return ret
-}
-func (fd *FlowDocument) fetchLinkTo(ID string, n int) *Link {
-	for _, l := range fd.Links {
-		if l.To != ID || l.In != n {
-			continue
-		}
-		return &l
-	}
-	return nil
-}
-
-//ErrLoop loop error
-var ErrLoop = errors.New("Looping is disabled for now")
-
-// FlowBuild build the graph based on an starting node
-func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, error) {
-
-	doc := FlowDocument{[]Node{}, []Link{}, []Trigger{}}
-	log.Println("Building flowDocument from:", string(rawData))
-	err := json.Unmarshal(rawData, &doc)
-	if err != nil {
-		return nil, err
-	}
-
-	f := flow.New()
-	f.SetRegistry(r)
-
-	nodeTrack := map[string]bool{}
-
-	var build func(ID string) (flow.Operation, error)
-	build = func(ID string) (flow.Operation, error) {
-		if _, ok := nodeTrack[ID]; ok {
-			return nil, ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
-		}
-		nodeTrack[ID] = true
-		defer delete(nodeTrack, ID)
-
-		// If flow already has ID just return
-		if op := f.GetOp(ID); op != nil {
-			log.Println("Return operation")
-			return op, nil
-		}
-
-		node := doc.fetchNodeByID(ID)
-		if node == nil {
-			return nil, fmt.Errorf("node not found [%v]", startingID)
-		}
-
-		var op flow.Operation
-
-		switch node.Src {
-		case "Input":
-			inputID, err := strconv.Atoi(node.Prop["input"])
-			if err != nil {
-				return nil, errors.New("Invalid inputID value, must be a number")
-			}
-			op = f.In(inputID) // By id perhaps
-		case "Variable":
-			op = f.DefVar(node.ID, node.Label, node.Prop["init"])
-		case "Const":
-			raw := node.Label
-			val, err := parseValue(nil, raw)
-			if err != nil {
-				op, _ = f.DefErrOp(node.ID, err)
-			} else {
-				op, _ = f.DefConst(node.ID, val)
-			}
-		default:
-			// Load entry
-			entry, err := r.Entry(node.Src)
-			if err != nil {
-				return nil, err
-			}
-			//// Process inputs ////
-			param := make([]flow.Data, len(entry.Inputs))
-			for i := range param {
-				l := doc.fetchLinkTo(node.ID, i)
-				if l == nil {
-					// Const value
-					v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
-					if err != nil {
-						param[i], _ = f.ErrOp(err)
-						continue
-					}
-					param[i] = v
-					continue
-				}
-				inOp, err := build(l.From)
-				if err != nil {
-					return nil, err
-				}
-				param[i] = inOp
-			}
-			op, err = f.DefOp(node.ID, node.Src, param...)
-			if err != nil {
-				return nil, err
-			}
-		}
-
-		// Process triggers for this node
-		triggers := doc.fetchTriggerFrom(node.ID)
-		for _, t := range triggers {
-			_, err := build(t.To)
-			if err != nil {
-				return nil, err
-			}
-			// Register the thing here
-			f.Hook(flow.Hook{
-				Any: func(name string, ID string, triggerTime time.Time, extra ...interface{}) {
-					if name != "Error" && name != "Finish" {
-						return
-					}
-					if ID != t.From {
-						log.Printf("ID[%v] triggered [%v], I'm t.From: %v", ID, name, t.From)
-						return
-					}
-					exec := false
-					for _, o := range t.On {
-						if name == o {
-							exec = true
-							break
-						}
-					}
-					if !exec {
-						log.Println("Mismatching trigger, but its a test")
-					}
-
-					op := f.GetOp(t.To) // Repeating
-					go op.Process(name) // Background
-				},
-			})
-
-		}
-		return op, nil
-	}
-
-	_, err = build(startingID)
-	if err != nil {
-		log.Println("Error building")
-		return nil, err
-	}
-
-	log.Println("Flow:", f)
-	return f, nil
-}
-
-// Or give a string
-func parseValue(typ reflect.Type, raw string) (flow.Data, error) {
-	if typ == nil {
-		var val flow.Data
-		err := json.Unmarshal([]byte(raw), &val)
-		if err != nil { // Try to unmarshal as a string?
-			val = string(raw)
-		}
-		return val, nil
-	}
-
-	var ret flow.Data
-	switch typ.Kind() {
-	case reflect.Int:
-		v, err := strconv.Atoi(raw)
-		if err != nil {
-			log.Println("Wrong int conversion", err)
-			return nil, err
-		}
-		ret = v
-	case reflect.String:
-		ret = raw
-	default:
-		if len(raw) == 0 {
-			ret = reflect.Zero(typ)
-		} else {
-			refVal := reflect.New(typ)
-			err := json.Unmarshal([]byte(raw), refVal.Interface())
-			if err != nil {
-				return nil, err
-			}
-			ret = refVal.Interface()
-		}
-	}
-	log.Printf("Returning %#v", ret)
-	return ret, nil
-}

+ 340 - 0
go/src/flowserver/flowbuilder/builder.go

@@ -0,0 +1,340 @@
+package flowbuilder
+
+import (
+	"encoding/json"
+	"errors"
+	"flow"
+	"flow/registry"
+	"fmt"
+	"log"
+	"reflect"
+	"strconv"
+	"time"
+)
+
+// ErrLoop loop error
+var ErrLoop = errors.New("Looping is disabled for now")
+
+// FlowBuilder builds a flow from flow-ui json data
+type FlowBuilder struct {
+	registry  *registry.R
+	doc       *FlowDocument
+	flow      *flow.Flow
+	nodeTrack map[string]bool
+	Err       error
+}
+
+// New creates a New builder
+func New(r *registry.R) *FlowBuilder {
+	return &FlowBuilder{
+		registry:  r,
+		nodeTrack: map[string]bool{},
+	}
+}
+
+// Load document from json into builder
+func (fb *FlowBuilder) Load(rawData []byte) *FlowBuilder {
+	doc := &FlowDocument{[]Node{}, []Link{}, []Trigger{}}
+	log.Println("Loading document from:", string(rawData))
+	err := json.Unmarshal(rawData, doc)
+	if err != nil {
+		fb.Err = err
+		return fb
+	}
+
+	fb.doc = doc
+	fb.flow = flow.New()
+	fb.flow.SetRegistry(fb.registry)
+
+	return fb
+}
+
+// Build a flow starting from node
+func (fb *FlowBuilder) Build(ID string) flow.Operation {
+	if fb.Err != nil {
+		op, _ := fb.flow.DefErrOp(ID, fb.Err)
+		return op
+	}
+	f := fb.flow
+	r := fb.registry
+	doc := fb.doc
+
+	if _, ok := fb.nodeTrack[ID]; ok {
+		fb.Err = ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
+		op, _ := fb.flow.DefErrOp(ID, fb.Err)
+		return op
+	}
+	fb.nodeTrack[ID] = true
+	defer delete(fb.nodeTrack, ID)
+
+	// If flow already has ID just return
+	if op := f.GetOp(ID); op != nil {
+		return op
+	}
+
+	node := fb.doc.fetchNodeByID(ID)
+	if node == nil {
+		op, _ := fb.flow.DefErrOp(ID, fmt.Errorf("node not found [%v]", ID))
+		return op
+	}
+
+	var op flow.Operation
+
+	switch node.Src {
+	case "Input":
+		inputID, err := strconv.Atoi(node.Prop["input"])
+		if err != nil {
+			op, _ = f.DefErrOp(node.ID, errors.New("Invalid inputID value, must be a number"))
+		} else {
+			op, _ = f.In(inputID) // By id perhaps
+		}
+	case "Variable":
+		raw := node.Prop["init"]
+		val, err := parseValue(nil, raw)
+		if err != nil {
+			op, _ = f.DefErrOp(node.ID, err)
+		} else {
+			op = f.DefVar(node.ID, node.Label, val)
+		}
+	case "Const":
+		raw := node.Label
+		val, err := parseValue(nil, raw)
+		if err != nil {
+			op, _ = f.DefErrOp(node.ID, err)
+		} else {
+			op, _ = f.DefConst(node.ID, val)
+		}
+	default:
+		// Load entry
+		entry, err := r.Entry(node.Src)
+		if err != nil {
+			op, _ = f.DefErrOp(node.ID, err)
+		}
+		//// Process inputs ////
+		param := make([]flow.Data, len(entry.Inputs))
+		for i := range param {
+			l := doc.fetchLinkTo(node.ID, i)
+			if l == nil {
+				// Const value
+				v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
+				if err != nil {
+					param[i], _ = f.ErrOp(err)
+					continue
+				}
+				param[i] = v
+				continue
+			}
+			param[i] = fb.Build(l.From)
+		}
+		op, err = f.DefOp(node.ID, node.Src, param...)
+		if err != nil {
+			op, _ := f.DefErrOp(node.ID, err)
+			return op
+		}
+	}
+
+	return op
+}
+
+func (fb *FlowBuilder) addTriggersTo(node Node) error {
+	// Process triggers for this node
+	triggers := fb.doc.fetchTriggerFrom(node.ID)
+	for _, t := range triggers {
+		op := fb.Build(t.To)
+		// Register the thing here
+		fb.flow.Hook(flow.Hook{
+			Any: func(name string, ID string, triggerTime time.Time, extra ...interface{}) {
+				if name != "Error" && name != "Finish" {
+					return
+				}
+				if ID != t.From {
+					log.Printf("ID[%v] triggered [%v], I'm t.From: %v", ID, name, t.From)
+					return
+				}
+				exec := false
+				for _, o := range t.On {
+					if name == o {
+						exec = true
+						break
+					}
+				}
+				if !exec {
+					log.Println("Mismatching trigger, but its a test")
+				}
+
+				//op := opfb.flow.GetOp(t.To) // Repeating
+				go op.Process(name) // Background
+			},
+		})
+	}
+	return nil
+}
+
+// Flow returns the build flow
+func (fb *FlowBuilder) Flow() *flow.Flow {
+	return fb.flow
+}
+
+// FlowBuild build the graph based on an starting node
+/*func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, error) {
+
+	var build func(ID string) (flow.Operation, error)
+	build = func(ID string) (flow.Operation, error) {
+		if _, ok := nodeTrack[ID]; ok {
+			return nil, ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
+		}
+		nodeTrack[ID] = true
+		defer delete(nodeTrack, ID)
+
+		// If flow already has ID just return
+		if op := f.GetOp(ID); op != nil {
+			log.Println("Return operation")
+			return op, nil
+		}
+
+		node := doc.fetchNodeByID(ID)
+		if node == nil {
+			return nil, fmt.Errorf("node not found [%v]", startingID)
+		}
+
+		var op flow.Operation
+
+		switch node.Src {
+		case "Input":
+			inputID, err := strconv.Atoi(node.Prop["input"])
+			if err != nil {
+				return nil, errors.New("Invalid inputID value, must be a number")
+			}
+			op, _ = f.In(inputID) // By id perhaps
+		case "Variable":
+			raw := node.Prop["init"]
+			val, err := parseValue(nil, raw)
+			if err != nil {
+				op, _ = f.DefErrOp(node.ID, err)
+			} else {
+				op = f.DefVar(node.ID, node.Label, val)
+			}
+		case "Const":
+			raw := node.Label
+			val, err := parseValue(nil, raw)
+			if err != nil {
+				op, _ = f.DefErrOp(node.ID, err)
+			} else {
+				op, _ = f.DefConst(node.ID, val)
+			}
+		default:
+			// Load entry
+			entry, err := r.Entry(node.Src)
+			if err != nil {
+				return nil, err
+			}
+			//// Process inputs ////
+			param := make([]flow.Data, len(entry.Inputs))
+			for i := range param {
+				l := doc.fetchLinkTo(node.ID, i)
+				if l == nil {
+					// Const value
+					v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
+					if err != nil {
+						param[i], _ = f.ErrOp(err)
+						continue
+					}
+					param[i] = v
+					continue
+				}
+				inOp, err := build(l.From)
+				if err != nil {
+					return nil, err
+				}
+				param[i] = inOp
+			}
+			op, err = f.DefOp(node.ID, node.Src, param...)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// Process triggers for this node
+		triggers := doc.fetchTriggerFrom(node.ID)
+		for _, t := range triggers {
+			_, err := build(t.To)
+			if err != nil {
+				return nil, err
+			}
+			// Register the thing here
+			f.Hook(flow.Hook{
+				Any: func(name string, ID string, triggerTime time.Time, extra ...interface{}) {
+					if name != "Error" && name != "Finish" {
+						return
+					}
+					if ID != t.From {
+						log.Printf("ID[%v] triggered [%v], I'm t.From: %v", ID, name, t.From)
+						return
+					}
+					exec := false
+					for _, o := range t.On {
+						if name == o {
+							exec = true
+							break
+						}
+					}
+					if !exec {
+						log.Println("Mismatching trigger, but its a test")
+					}
+
+					op := f.GetOp(t.To) // Repeating
+					go op.Process(name) // Background
+				},
+			})
+
+		}
+		return op, nil
+	}
+
+	_, err = build(startingID)
+	if err != nil {
+		log.Println("Error building")
+		return nil, err
+	}
+
+	log.Println("Flow:", f)
+	return f, nil
+}*/
+
+// Or give a string
+func parseValue(typ reflect.Type, raw string) (flow.Data, error) {
+	if typ == nil {
+		var val flow.Data
+		err := json.Unmarshal([]byte(raw), &val)
+		if err != nil { // Try to unmarshal as a string?
+			val = string(raw)
+		}
+		return val, nil
+	}
+
+	var ret flow.Data
+	switch typ.Kind() {
+	case reflect.Int:
+		v, err := strconv.Atoi(raw)
+		if err != nil {
+			log.Println("Wrong int conversion", err)
+			return nil, err
+		}
+		ret = v
+	case reflect.String:
+		ret = raw
+	default:
+		if len(raw) == 0 {
+			ret = reflect.Zero(typ)
+		} else {
+			refVal := reflect.New(typ)
+			err := json.Unmarshal([]byte(raw), refVal.Interface())
+			if err != nil {
+				return nil, err
+			}
+			ret = refVal.Interface()
+		}
+	}
+	log.Printf("Returning %#v", ret)
+	return ret, nil
+}

+ 74 - 0
go/src/flowserver/flowbuilder/model.go

@@ -0,0 +1,74 @@
+package flowbuilder
+
+// Node that will contain registry src
+import "log"
+
+type Node struct {
+	ID            string            `json:"id"`
+	Src           string            `json:"src"`
+	Label         string            `json:"label"`
+	DefaultInputs map[int]string    `json:"defaultInputs"`
+	Prop          map[string]string `json:"prop"`
+}
+
+// Link that joins two nodes
+type Link struct {
+	From string `json:"from"`
+	To   string `json:"to"`
+	In   int    `json:"in"`
+}
+
+// Trigger that join two nodes on state change
+type Trigger struct {
+	From string   `json:"from"`
+	To   string   `json:"to"`
+	On   []string `json:"on"`
+}
+
+// FlowDocument flow document
+type FlowDocument struct {
+	Nodes    []Node    `json:"nodes"`
+	Links    []Link    `json:"links"`
+	Triggers []Trigger `json:"triggers"`
+}
+
+func (fd *FlowDocument) fetchNodeByID(ID string) *Node {
+	for _, n := range fd.Nodes {
+		if n.ID == ID {
+			return &n
+		}
+	}
+	return nil
+}
+
+func (fd *FlowDocument) fetchTriggerFrom(ID string) []Trigger {
+	ret := []Trigger{}
+	for _, t := range fd.Triggers {
+		if t.From != ID {
+			continue
+		}
+		log.Println("Trigger is:", ID, "adding")
+		ret = append(ret, t)
+	}
+	return ret
+}
+
+func (fd *FlowDocument) fetchLinksTo(ID string) []Link {
+	ret := []Link{}
+	for _, l := range fd.Links {
+		if l.To != ID {
+			continue
+		}
+		ret = append(ret, l)
+	}
+	return ret
+}
+func (fd *FlowDocument) fetchLinkTo(ID string, n int) *Link {
+	for _, l := range fd.Links {
+		if l.To != ID || l.In != n {
+			continue
+		}
+		return &l
+	}
+	return nil
+}

+ 24 - 15
go/src/flowserver/session.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"errors"
 	"flow"
+	"flowserver/flowbuilder"
 	"flowserver/flowmsg"
 	"fmt"
 	"io"
@@ -174,9 +175,9 @@ func (s *FlowSession) Document(c *websocket.Conn) error {
 // NodeRun a node triggering results
 // Build a flow and run
 func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
-	var err error
 	ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
 	if s.flow != nil {
+		s.Notify("a node is already running")
 		return errors.New("node already running")
 	}
 
@@ -184,25 +185,27 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 	s.nodeActivity = map[string]*NodeActivity{}
 	s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
 
-	go func() {
+	build := func() error {
 		log.Printf("Building flow from '%s'\n", string(s.RawDoc))
 
-		localr := s.manager.registry.Clone()
+		localR := s.manager.registry.Clone()
 		//Add our log func that is not in global registry
-		localr.Register("Notify", func(v flow.Data, msg string) flow.Data {
+		localR.Register("Notify", func(v flow.Data, msg string) flow.Data {
 			s.Notify(msg)
 			return v
 		})
-		localr.Register("Log", func() io.Writer {
+		localR.Register("Log", func() io.Writer {
 			return s
 		})
 
-		s.flow, err = FlowBuild(s.RawDoc, localr, ID)
-		if err != nil {
-			s.Notify(fmt.Sprint("ERR:", err))
-			log.Println("Flow error:", err)
-			return
+		builder := flowbuilder.New(localR)
+		builder.Load(s.RawDoc).Build(ID)
+
+		if builder.Err != nil {
+			return builder.Err
 		}
+		s.flow = builder.Flow()
+
 		defer func() { // After routing gone
 			s.flow = nil
 		}()
@@ -245,15 +248,21 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 
 		op := s.flow.GetOp(ID)
 		if op == nil {
-			err = fmt.Errorf("Operation not found %v", ID)
-			return
+			return fmt.Errorf("Operation not found %v", ID)
 		}
 		_, err := op.Process()
 		if err != nil {
-			log.Println("error processing node", err)
+			return err
+		}
+		return nil
+	}
+
+	go func() {
+		err := build()
+		if err != nil {
+			s.Notify(fmt.Sprint("ERR:", err))
 		}
-		s.flow = nil
-	}() // End routine
+	}()
 
 	return nil
 }

+ 1 - 0
go/src/flowserver/sessionmgr.go

@@ -192,6 +192,7 @@ func (fsm *FlowSessionManager) ServeHTTP(w http.ResponseWriter, r *http.Request)
 			}
 			return nil
 		}()
+
 	}
 	log.Println("ws Is disconnecting", r.RemoteAddr)
 }