Bläddra i källkod

Corrected trigger executor

luis 7 år sedan
förälder
incheckning
7211a30562
3 ändrade filer med 11 tillägg och 123 borttagningar
  1. 0 18
      go/src/flow/flow.go
  2. 8 2
      go/src/flow/flow_test.go
  3. 3 103
      go/src/flow/operation.go

+ 0 - 18
go/src/flow/flow.go

@@ -65,12 +65,6 @@ func (f *Flow) Must(op Operation, err error) Operation {
 	return op
 }
 
-// Res returns a deferred operation result
-// passing the Id
-/*func (f *Flow) Res(id string) Operation {
-	return deferred(f, id)
-}*/
-
 // Auto ID generation
 
 // Op return an function operator
@@ -295,18 +289,6 @@ func (f *Flow) addEntry(entry *operation) (string, error) {
 
 }
 
-/////////////////
-// Async data
-/////
-/*func (f *Flow) getOp(id string) (*operation, bool) {
-
-	o, ok := f.operations.Load(id)
-	if !ok {
-		return nil, false
-	}
-	return o.(*operation), true
-}*/
-
 // GetOp Return an existing operation or return notfound error
 func (f *Flow) GetOp(id string) *operation {
 	op, ok := f.operations.Load(id)

+ 8 - 2
go/src/flow/flow_test.go

@@ -23,11 +23,17 @@ func TestInput(t *testing.T) {
 	a := assert.A(t)
 	f := flow.New()
 
-	op, err := f.Op("vecadd", []float32{1, 1, 1}, f.Must(f.In(0)))
+	opIn, err := f.In(0)
+	a.Eq(err, nil, "input err should be nil")
+
+	d, err := opIn.Process([]float32{2, 2, 2})
+	a.Eq(d, []float32{2, 2, 2}, "array should be equal")
+
+	op, err := f.Op("vecadd", []float32{1, 1, 1}, opIn)
 	a.Eq(err, nil, "result should not error")
 	a.NotEq(op, nil, "operation should not be nil")
 
-	d, err := op.Process([]float32{2, 2, 2})
+	d, err = op.Process([]float32{2, 2, 2})
 	a.Eq(err, nil, "should not error passing an input")
 
 	a.Eq(d, []float32{3, 3, 3}, "array should be equal")

+ 3 - 103
go/src/flow/operation.go

@@ -57,109 +57,10 @@ func (o *operation) Set(data Data) {
 }
 
 // make Executor for func
-
-// The function that executions an operation
-/*func executeOP(f *Flow, id string, ctx OpCtx, params ...Data) (ret Data, err error) {
-	defer func() {
-		if r := recover(); r != nil {
-			err = fmt.Errorf("%v", r)
-			f.hooks.error(id, err)
-		}
-	}()
-
-	op, ok := f.getOp(id)
-	if !ok {
-		err = fmt.Errorf("invalid operation '%s'", id)
-		f.hooks.error(id, err)
-		return nil, err
-	}
-		// Input param
-
-	op.Lock()
-	defer op.Unlock()
-	if ctx != nil {
-		if v, ok := ctx.Load(id); ok { // Cache
-			return v, nil
-		}
-	}
-
-	// Check inputs
-	fnval := reflect.ValueOf(op.executor)
-	if fnval.Type().NumIn() != len(op.inputs) {
-		err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
-		f.hooks.error(id, err)
-		return nil, err
-	}
-	/////////////////////////////
-	// NEW PARALLEL PROCESSING
-	///////////
-	f.hooks.wait(id)
-
-	callParam := make([]reflect.Value, len(op.inputs))
-
-	// Parallel processing if inputs
-	wg := sync.WaitGroup{}
-	wg.Add(len(op.inputs))
-	for i, in := range op.inputs {
-		go func(i int, in *operation) {
-			defer wg.Done()
-			fr, err := executeOP(f, in.id, ctx, params...)
-			if err != nil {
-				callParam[i] = reflect.Value{}
-				return
-			}
-
-			if fr == nil {
-				callParam[i] = reflect.Zero(fnval.Type().In(i))
-				return
-			}
-
-			callParam[i] = reflect.ValueOf(fr)
-		}(i, in)
-	}
-	wg.Wait()
-	// Return type error checking
-	//
-	errMsg := ""
-	for i, p := range callParam {
-		if !p.IsValid() {
-			//callParam[i] = reflect.Zero(fnval.Type().In(i))
-			errMsg += fmt.Sprintf("Input %d invalid\n", i)
-		} else if !p.Type().AssignableTo(fnval.Type().In(i)) {
-			errMsg += fmt.Sprintf("Input %d type mismatch expected: %v got :%v\n", i, fnval.Type().In(i), p.Type())
-		}
-	}
-	if len(errMsg) > 0 {
-		err := errors.New(errMsg)
-		f.hooks.error(id, err)
-		return nil, err
-	}
-
-	// The actual operation process
-	f.hooks.start(id)
-
-	fnret := fnval.Call(callParam)
-	if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
-		err, ok := fnret[1].Interface().(error)
-		if !ok {
-			err = errors.New("unknown error")
-		}
-		f.hooks.error(id, err)
-		return nil, err
-	}
-
-	// THE RESULT
-	ret = fnret[0].Interface()
-	if ctx != nil {
-		ctx.Store(id, ret)
-	}
-	f.hooks.finish(id, ret)
-	return ret, nil
-}*/
 func (f *Flow) makeTrigger(id string, fn executorFunc) executorFunc {
 	return func(ctx OpCtx, params ...Data) (Data, error) {
 		f.hooks.start(id)
-		d, err := fn(ctx, params)
+		d, err := fn(ctx, params...)
 		if err != nil {
 			f.hooks.error(id, err)
 			return d, err
@@ -235,13 +136,12 @@ func (f *Flow) makeExecutor(id string, fn interface{}) executorFunc {
 					paramMutex.Unlock()
 					return
 				} else if !res.Type().ConvertibleTo(fnval.Type().In(i)) {
-
 					if fnval.Type().In(i).Kind() == reflect.String {
 						callParam[i] = reflect.ValueOf(fmt.Sprint(res.Interface()))
 						return
 					}
 					paramMutex.Lock()
-					callErrors += fmt.Sprintf("Input %d type: %v cannot be converted to %v\n", i, res.Type(), fnval.Type().In(i))
+					callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), fnval.Type().In(i))
 					paramMutex.Unlock()
 					return
 				}
@@ -387,7 +287,7 @@ func (f *Flow) DefIn(id string, paramID int) Operation {
 		id:       id,
 		Mutex:    sync.Mutex{},
 		flow:     f,
-		name:     fmt.Sprintf("(in)<%s>", id),
+		name:     fmt.Sprintf("(in)<%d>", paramID),
 		kind:     "in",
 		inputs:   nil,
 		setter:   nil,