Quellcode durchsuchen

Backend const operations,

* Vue handle fix
luis vor 7 Jahren
Ursprung
Commit
5d58e721b3

+ 3 - 3
browser/vue-flow/src/components/chat.vue

@@ -8,8 +8,8 @@
           class="handle"
           type="text"
           :value="handle"
-          @blur="CHAT_RENAME($event.target.value)"
-          @keyup.enter="CHAT_RENAME($event.target.value)">
+          @blur="HANDLE_UPDATE($event.target.value)"
+          @keyup.enter="HANDLE_UPDATE($event.target.value)">
         <div ref="messages" class="flow-chat__messages">
           <div v-for="m in events" class="message">
             <div class="handle">
@@ -98,7 +98,7 @@ export default {
 
   methods: {
     ...mapActions('flow', ['NOTIFICATION_ADD']),
-    ...mapActions('chat', ['EVENT_SEND', 'CHAT_RENAME', 'CHAT_JOIN']),
+    ...mapActions('chat', ['EVENT_SEND', 'HANDLE_UPDATE', 'CHAT_JOIN']),
     send () {
       const msg = this.input
       if (msg.trim() === '') { return }

+ 0 - 9
browser/vue-flow/src/components/panel-inspector.vue

@@ -66,15 +66,6 @@
             <label>Error</label>
             <div class="property">{{ activity && activity[nodeInspect.id] && activity[nodeInspect.id].error }}</div>
           </div>
-          <div
-            v-if="activity && activity[nodeInspect.id]"
-            class="flow-inspector--properties-time">
-            <label>Time:</label>
-            <div class="property">
-              {{ activity[nodeInspect.id].startTime }} -- {{ activity[nodeInspect.id].endTime }}
-            </div>
-          </div>
-
         </div>
         <!-- PARAMETERS -->
         <div class="flow-inspector--params flow-inspector__area" v-if="nodeInspect.prop" >

+ 2 - 1
browser/vue-flow/src/store/ws.js

@@ -38,12 +38,13 @@ export default store => {
 
   // Connected
   flowService.connected(() => {
-    store.dispatch(flow.NOTIFICATION_ADD, 'Connected')
     // Make this in a service
     if (store.state.route.params.sessId === undefined) {
       flowService.sessionNew()
       return
     }
+
+    store.dispatch(flow.NOTIFICATION_ADD, 'Connected')
     flowService.sessionLoad(undefined, store.state.route.params.sessId)
   })
 

+ 52 - 133
go/src/flow/flow.go

@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"reflect"
 	"sync"
 )
 
@@ -33,13 +32,10 @@ type Flow struct {
 	consts     map[string]Data
 	data       map[string]Data // Should be named, to fetch later
 	operations sync.Map
-	//map[string]*opEntry
-	//err   error // should be a list of errors/report
-	runID int
+	runID      int
 
 	// Experimental run Event
 	hooks Hooks
-	//func(name string, payLoad map[string]Data)
 }
 
 // New create a new flow
@@ -55,21 +51,6 @@ func New() *Flow {
 	}
 }
 
-// Err Set or get current error
-/*func (f *Flow) Err(p ...interface{}) error {
-	f.Lock()
-	defer f.Unlock()
-
-	if len(p) == 0 {
-		return f.err
-	}
-
-	if err, ok := p[0].(error); ok {
-		f.err = err
-	}
-	return f.err
-}*/
-
 //SetRegistry use the registry specified
 func (f *Flow) SetRegistry(r *registry.R) *Flow {
 	f.registry = r
@@ -83,6 +64,20 @@ func (f *Flow) SetIDGen(idGen func() string) {
 	f.idGen = idGen
 }
 
+// Must Helper to return from operations
+func (f *Flow) Must(op Operation, err error) Operation {
+	if err != nil {
+		panic(err)
+	}
+	return op
+}
+
+// Res returns a deferred operation result
+// passing the Id
+func (f *Flow) Res(id string) Operation {
+	return opFunc(f, id)
+}
+
 // DefOp Manual tag an Operation
 func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation, error) {
 	inputs := make([]*operation, len(params))
@@ -91,7 +86,6 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation,
 		case *operation:
 			inputs[i] = v
 		default:
-			//log.Println("WARNING defining const with value", v)
 			c, err := f.Const(v)
 			if err != nil {
 				return nil, err
@@ -108,100 +102,63 @@ func (f *Flow) DefOp(id string, name string, params ...interface{}) (Operation,
 	return opFunc(f, id), nil
 }
 
-// Res returns a deferred operation result
-// passing the Id
-func (f *Flow) Res(id string) Operation {
-	// Defered operation
-	return opFunc(f, id)
-}
-
 // Op return an function operator
 //  name - a previous registered function
 //  params - the function inputs
 func (f *Flow) Op(name string, params ...interface{}) (Operation, error) {
-	// Use this on Set?
-	inputs := make([]*operation, len(params))
-	for i, p := range params {
-		switch v := p.(type) {
-		case *operation:
-			inputs[i] = v
-		default:
-
-			c, err := f.Const(v)
-			if err != nil {
-				return nil, err
-			}
-			// fail here
-			//log.Println("WARNING defining const with value", v)
-			inputs[i] = c.(*operation)
-		}
-	}
+	var op Operation
+	var err error
 
-	// Grab executor here
-	executor, err := f.registry.Get(name)
-	if err != nil {
-		return nil, err
-	}
-
-	var op *operation
-	err = f.allocID(func(id string) (Data, error) {
-		op = opFunc(f, id)
-		return &opEntry{sync.Mutex{}, name, inputs, executor}, nil
+	allocErr := f.allocID(func(id string) {
+		op, err = f.DefOp(id, name, params...)
 	})
-	if err != nil {
-		return nil, err
+	if allocErr != nil {
+		return nil, allocErr
 	}
-	return op, nil
+	return op, err
 }
 
-// HasOp verifies if an operation exists
-func (f *Flow) HasOp(name string) bool {
-	_, ret := f.operations.Load(name)
-	return ret
+// DefErrOp define a nil operation that will return error
+// Usefull for builders
+func (f *Flow) DefErrOp(id string, err error) (Operation, error) {
+	executor := func() error { return err }
+	f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("(error)<%v>", err), nil, executor})
+	return opFunc(f, id), nil
 }
 
-// Must it will panic on error
-func (f *Flow) Must(op Operation, err error) Operation {
-	if err != nil {
-		panic(err)
+// ErrOp error operation with generated ID
+func (f *Flow) ErrOp(operr error) (Operation, error) {
+	var op Operation
+	var err error
+
+	allocErr := f.allocID(func(id string) {
+		op, err = f.DefErrOp(id, operr)
+	})
+	if allocErr != nil {
+		return nil, err
 	}
-	return op
+	return op, err
 }
 
-// DefConst define a const by ID
+// DefConst define a const by defined ID
 func (f *Flow) DefConst(id string, value Data) (Operation, error) {
 	f.consts[id] = value
 	executor := func() Data { return f.consts[id] }
-	f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("const<%s>", id), nil, executor})
+	f.operations.Store(id, &opEntry{sync.Mutex{}, fmt.Sprintf("(const)<%s>", id), nil, executor})
 	return opFunc(f, id), nil
 }
 
-// Const returns a const operation
+// Const returns a const operation with generated ID
 func (f *Flow) Const(value Data) (Operation, error) {
-
-	var op *operation
-
-	err := f.allocID(func(id string) (Data, error) {
-		f.consts[id] = value
-		executor := func() Data { return f.consts[id] }
-		op = opFunc(f, id)
-		return &opEntry{sync.Mutex{}, fmt.Sprintf("const<%s>", id), nil, executor}, nil
-
+	var op Operation
+	var err error
+	allocErr := f.allocID(func(id string) {
+		op, err = f.DefConst(id, value)
 	})
-	return op, err
-	// generate ID
-	/*for i := 0; i < 10; i++ {
-		id := f.idGen()
-		if _, ok := f.consts[id]; ok {
-			continue
-		}
-		return opConst(f, id), nil
+	if allocErr != nil {
+		return nil, allocErr
 	}
-	return nil, errors.New("ID exausted")*/
-
-	//	f.consts[id] = value
-	//	return opFunc(f, id), nil
-
+	return op, err
 }
 
 // Var operation
@@ -222,41 +179,6 @@ func (f *Flow) In(paramID int) Operation {
 	return opIn(f, paramID)
 }
 
-// Run a batch of operation?
-func (f *Flow) Run(op Operation, params ...Data) (Data, error) {
-	cache := map[*operation]Data{}
-	return f.run(cache, op, params...)
-}
-func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Data, error) {
-	o := op.(*operation)
-	if v, ok := cache[o]; ok {
-		return v, nil
-	}
-	// Manually fetch func data because of caching
-	var r Data
-	// This is wrong since the only source of func should be on operation
-	if o.kind == "func" {
-		op, ok := f.getOp(o.id.(string))
-		if !ok {
-			return nil, fmt.Errorf("Operation %s not found", o.id)
-		}
-		callParam := make([]reflect.Value, len(op.inputs))
-		for i, in := range op.inputs {
-			fr, _ := f.run(cache, in, params...) // ignore error
-			callParam[i] = reflect.ValueOf(fr)
-		}
-		r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface()
-	} else {
-		var err error
-		r, err = o.process(nil, params...)
-		if err != nil {
-			return nil, err
-		}
-	}
-	cache[o] = r
-	return r, nil
-}
-
 // Analyse every operations
 func (f *Flow) Analyse(w io.Writer, params ...Data) {
 	if w == nil {
@@ -358,7 +280,7 @@ func (f *Flow) MarshalJSON() ([]byte, error) {
 	return json.Marshal(data)
 }
 
-func (f *Flow) allocID(fn func(id string) (Data, error)) error {
+func (f *Flow) allocID(fn func(id string)) error {
 	f.Lock()
 	defer f.Unlock()
 
@@ -369,12 +291,9 @@ func (f *Flow) allocID(fn func(id string) (Data, error)) error {
 		if _, ok := f.operations.Load(id); !ok {
 			break
 		}
+		return errors.New("ID Exausted")
 	}
-	entry, err := fn(id)
-	if err != nil {
-		return err
-	}
-	f.operations.Store(id, entry)
+	fn(id)
 	return nil
 
 }

+ 1 - 16
go/src/flow/flow_test.go

@@ -124,7 +124,7 @@ func TestOp(t *testing.T) {
 		)),
 		[]float32{1, 2, 3},
 	)
-	res, err := f.Run(add)
+	res, err := add.Process()
 	a.Eq(err, nil)
 
 	test := []float32{3, 6, 9}
@@ -199,21 +199,6 @@ func TestLocalRegistry(t *testing.T) {
 	a.NotEq(err, nil, "flow should contain an error")
 }
 
-func BenchmarkComplex(b *testing.B) {
-	f, op := prepareComplex()
-
-	b.Run("OP", func(b *testing.B) {
-		for i := 0; i < b.N; i++ {
-			op.Process()
-		}
-	})
-	b.Run("RUN", func(b *testing.B) {
-		for i := 0; i < b.N; i++ {
-			f.Run(op)
-		}
-	})
-}
-
 func init() {
 	registry.Register("vecmul", VecMul)
 	registry.Register("vecadd", VecAdd)

+ 6 - 28
go/src/flow/operation.go

@@ -8,7 +8,6 @@ package flow
 import (
 	"errors"
 	"fmt"
-	"log"
 	"reflect"
 	"sync"
 )
@@ -55,25 +54,6 @@ func (o *operation) Process(params ...Data) (Data, error) {
 // Every single one is run with this internally
 func (o *operation) processWithCtx(ctx OpCtx, params ...Data) (Data, error) {
 	return o.process(ctx, params...)
-	/*entry, _ := o.flow.getOp(fmt.Sprint(o.id))
-	if entry == nil {
-		log.Println("Entry is nil for id:", o.id, ", why??")
-		return nil
-	}
-	entry.Lock()
-	defer entry.Unlock()
-
-	if o.flow.err != nil {
-		return nil
-	}
-	if ctx == nil { // No cache/Context
-	}
-	if v, ok := ctx.Load(o.id); ok {
-		return v
-	}
-	res := o.process(ctx, params...)
-	ctx.Store(o.id, res)
-	return res*/
 }
 
 // Set setter for certain operations (Var)
@@ -167,13 +147,13 @@ func opFunc(f *Flow, id string) *operation {
 				go func(i int, in *operation) {
 					defer wg.Done()
 					fr, err := in.processWithCtx(ctx, params...)
-					if fr == nil {
-						log.Println("Creating a new nil value?")
-						callParam[i] = reflect.Zero(fnval.Type().In(i))
+					if err != nil {
+						callParam[i] = reflect.Value{}
 						return
 					}
-					if err != nil {
-						//callParam[i] = reflect.ValueOf(err)
+
+					if fr == nil {
+						callParam[i] = reflect.Zero(fnval.Type().In(i))
 						return
 					}
 
@@ -184,9 +164,7 @@ func opFunc(f *Flow, id string) *operation {
 			// Return type checking
 			errMsg := ""
 			for i, p := range callParam {
-				if p.Interface() == nil {
-					// do nothing just passthrough
-				} else if !p.IsValid() {
+				if !p.IsValid() {
 					//callParam[i] = reflect.Zero(fnval.Type().In(i))
 					errMsg += fmt.Sprintf("Input %d invalid\n", i)
 				} else if !p.Type().AssignableTo(fnval.Type().In(i)) {

+ 71 - 11
go/src/flowserver/flowbuilder.go

@@ -7,6 +7,7 @@ import (
 	"flow/registry"
 	"fmt"
 	"log"
+	"reflect"
 	"strconv"
 	"time"
 )
@@ -72,6 +73,15 @@ func (fd *FlowDocument) fetchLinksTo(ID string) []Link {
 	}
 	return ret
 }
+func (fd *FlowDocument) fetchLinkTo(ID string, n int) *Link {
+	for _, l := range fd.Links {
+		if l.To != ID || l.In != n {
+			continue
+		}
+		return &l
+	}
+	return nil
+}
 
 var ErrLoop = errors.New("Looping through is disabled for now")
 
@@ -79,6 +89,7 @@ var ErrLoop = errors.New("Looping through is disabled for now")
 func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, error) {
 
 	doc := FlowDocument{[]Node{}, []Link{}, []Trigger{}}
+	log.Println("Building flowDocument from:", string(rawData))
 	err := json.Unmarshal(rawData, &doc)
 	if err != nil {
 		return nil, err
@@ -95,6 +106,7 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
 			return nil, ErrLoop //fmt.Errorf("[%v] Looping through nodes is disabled:", ID)
 		}
 		nodeTrack[ID] = true
+		defer delete(nodeTrack, ID)
 
 		// If flow already has ID just return
 		if op := f.GetOp(ID); op != nil {
@@ -119,28 +131,38 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
 		case "Variable":
 			op = f.Var(node.ID, node.Prop["init"])
 		case "Const":
-			var val flow.Data
-			// XXX: Automate this in a func
 			raw := node.Label
-			err := json.Unmarshal([]byte(raw), &val)
-			if err != nil { // Try to unmarshal as a string?
-				val = string(raw)
+			val, err := parseValue(nil, raw)
+			if err != nil {
+				op, _ = f.DefErrOp(node.ID, err)
+			} else {
+				op, _ = f.DefConst(node.ID, val)
 			}
-			op, _ = f.DefConst(node.ID, val)
 		default:
 			// Load entry
 			entry, err := r.Entry(node.Src)
 			if err != nil {
 				return nil, err
 			}
-			inputs := doc.fetchLinksTo(node.ID)
+			//// Process inputs ////
 			param := make([]flow.Data, len(entry.Inputs))
-			for _, l := range inputs {
-				param[l.In], err = build(l.From)
+			for i := range param {
+				l := doc.fetchLinkTo(node.ID, i)
+				if l == nil {
+					// Const value
+					v, err := parseValue(entry.Inputs[i], node.DefaultInputs[i])
+					if err != nil {
+						param[i], _ = f.ErrOp(err)
+						continue
+					}
+					param[i] = v
+					continue
+				}
+				inOp, err := build(l.From)
 				if err != nil {
 					return nil, err
 				}
-				// if type Different than ours try to convert
+				param[i] = inOp
 			}
 			op, err = f.DefOp(node.ID, node.Src, param...)
 			if err != nil {
@@ -181,7 +203,6 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
 				},
 			})
 
-			delete(nodeTrack, node.ID)
 		}
 		return op, nil
 	}
@@ -192,5 +213,44 @@ func FlowBuild(rawData []byte, r *registry.R, startingID string) (*flow.Flow, er
 		return nil, err
 	}
 
+	log.Println("Flow:", f)
 	return f, nil
 }
+
+// Or give a string
+func parseValue(typ reflect.Type, raw string) (flow.Data, error) {
+	if typ == nil {
+		var val flow.Data
+		err := json.Unmarshal([]byte(raw), &val)
+		if err != nil { // Try to unmarshal as a string?
+			val = string(raw)
+		}
+		return val, nil
+	}
+
+	var ret flow.Data
+	switch typ.Kind() {
+	case reflect.Int:
+		v, err := strconv.Atoi(raw)
+		if err != nil {
+			log.Println("Wrong int conversion", err)
+			return nil, err
+		}
+		ret = v
+	case reflect.String:
+		ret = raw
+	default:
+		if len(raw) == 0 {
+			ret = reflect.Zero(typ)
+		} else {
+			refVal := reflect.New(typ)
+			err := json.Unmarshal([]byte(raw), refVal.Interface())
+			if err != nil {
+				return nil, err
+			}
+			ret = refVal.Interface()
+		}
+	}
+	log.Printf("Returning %#v", ret)
+	return ret, nil
+}

+ 0 - 5
go/src/flowserver/session.go

@@ -141,12 +141,9 @@ func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
 
 // DocumentSave persist document in a file
 func (s *FlowSession) DocumentSave(data []byte) error {
-	log.Println("Receiving documentSave")
 	s.Lock()
 	defer s.Unlock()
 
-	log.Println("Saving..")
-
 	s.RawDoc = make([]byte, len(data))
 	copy(s.RawDoc, data)
 
@@ -201,8 +198,6 @@ func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
 		})
 
 		s.flow, err = FlowBuild(s.RawDoc, localr, ID)
-
-		log.Println("Flow:", s.flow)
 		if err != nil {
 			s.Notify(fmt.Sprint("ERR:", err))
 			log.Println("Flow error:", err)