Browse Source

routine fixes

luis 7 years ago
parent
commit
8cbd0c8f3a
4 changed files with 58 additions and 31 deletions
  1. 1 1
      go/Makefile
  2. 15 3
      go/src/flow/flow.go
  3. 25 8
      go/src/flow/hook.go
  4. 17 19
      go/src/flow/operation.go

+ 1 - 1
go/Makefile

@@ -33,7 +33,7 @@ generate:
 	GOPATH="$(GOPATH)" go generate -v ./src/...
 
 test:
-	$(ENV) gocov test ./src/... | gocov report
+	$(ENV) gocov test -race ./src/... | gocov report
 
 #$(BIN): $(addprefix src/,$(SOURCE))
 #	echo $<

+ 15 - 3
go/src/flow/flow.go

@@ -25,6 +25,8 @@ type opEntry struct {
 // We could Create a single array of operations
 // refs would only mean id, types would be embed in operation
 type Flow struct {
+	sync.Mutex
+
 	registry   *registry.R
 	consts     map[string]Data
 	data       map[string]Data // Should be named, to fetch later
@@ -50,8 +52,18 @@ func New() *Flow {
 	}
 }
 
-// Err returns internal error state
-func (f *Flow) Err() error {
+// Err Set or get current error
+func (f *Flow) Err(p ...interface{}) error {
+	f.Lock()
+	defer f.Unlock()
+
+	if len(p) == 0 {
+		return f.err
+	}
+
+	if err, ok := p[0].(error); ok {
+		f.err = err
+	}
 	return f.err
 }
 
@@ -307,5 +319,5 @@ func (f *Flow) getOp(id string) (*opEntry, bool) {
 
 // Hook attach the node event hooks
 func (f *Flow) Hook(hook Hook) {
-	f.hooks = append(f.hooks, hook)
+	f.hooks.Attach(hook)
 }

+ 25 - 8
go/src/flow/hook.go

@@ -1,8 +1,14 @@
 package flow
 
-import "time"
+import (
+	"sync"
+	"time"
+)
 
-type Hooks []Hook
+type Hooks struct {
+	sync.Mutex
+	hooks []Hook
+}
 
 // Hook funcs to handle certain events on the flow
 type Hook struct {
@@ -14,8 +20,11 @@ type Hook struct {
 }
 
 // Trigger a hook
-func (hooks Hooks) Trigger(name string, ID string, extra ...Data) {
-	for _, h := range hooks {
+func (hs *Hooks) Trigger(name string, ID string, extra ...Data) {
+	hs.Lock()
+	defer hs.Unlock()
+
+	for _, h := range hs.hooks {
 		if h.Any != nil {
 			h.Any(name, ID, time.Now(), extra...)
 		}
@@ -40,7 +49,15 @@ func (hooks Hooks) Trigger(name string, ID string, extra ...Data) {
 	}
 }
 
-func (hooks Hooks) wait(ID string)             { hooks.Trigger("Wait", ID) }
-func (hooks Hooks) start(ID string)            { hooks.Trigger("Start", ID) }
-func (hooks Hooks) finish(ID string, res Data) { hooks.Trigger("Finish", ID, res) }
-func (hooks Hooks) error(ID string, err error) { hooks.Trigger("Error", ID, err) }
+func (hs *Hooks) wait(ID string)             { hs.Trigger("Wait", ID) }
+func (hs *Hooks) start(ID string)            { hs.Trigger("Start", ID) }
+func (hs *Hooks) finish(ID string, res Data) { hs.Trigger("Finish", ID, res) }
+func (hs *Hooks) error(ID string, err error) { hs.Trigger("Error", ID, err) }
+
+// Attach attach a hook
+func (hs *Hooks) Attach(h Hook) {
+	hs.Lock()
+	defer hs.Unlock()
+	hs.hooks = append(hs.hooks, h)
+
+}

+ 17 - 19
go/src/flow/operation.go

@@ -143,9 +143,9 @@ func opFunc(f *Flow, id string) *operation {
 			// Check inputs
 			fnval := reflect.ValueOf(op.executor)
 			if fnval.Type().NumIn() != len(op.inputs) {
-				f.err = fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
+				err := fmt.Errorf("expect %d inputs got %d", fnval.Type().NumIn(), len(op.inputs))
 				f.hooks.error(id, f.err)
-				log.Println("Operation not ok", f.err)
+				f.Err(err)
 				return nil
 			}
 			/////////////////////////////
@@ -162,6 +162,17 @@ func opFunc(f *Flow, id string) *operation {
 				go func(i int, in *operation) {
 					defer wg.Done()
 					fr := in.processWithCtx(ctx, params...)
+					p := reflect.ValueOf(fr)
+
+					// Error checking
+					if !p.IsValid() {
+						f.Err(fmt.Errorf("Input %d is not valid %v", i, p))
+						return
+					}
+					if !p.Type().AssignableTo(fnval.Type().In(i)) {
+						f.Err(fmt.Errorf("Input %d not assignable to %v", i, p))
+					}
+
 					callParam[i] = reflect.ValueOf(fr)
 				}(i, in)
 			}
@@ -169,27 +180,14 @@ func opFunc(f *Flow, id string) *operation {
 
 			f.hooks.start(id)
 
-			// Check params results
-			for i, p := range callParam {
-				f.err = func() error {
-					if !p.IsValid() {
-						return fmt.Errorf("Input is not valid %v", p)
-					}
-					if !p.Type().AssignableTo(fnval.Type().In(i)) {
-						return fmt.Errorf("Input not assignable to %v", p)
-					}
-					return nil
-				}()
-				if f.err != nil {
-					f.hooks.error(id, f.err)
-					return nil
-				}
+			if f.Err() != nil {
+				f.hooks.error(id, f.err)
+				return nil
 			}
 
 			fnret := fnval.Call(callParam)
 			if len(fnret) > 1 && (fnret[1].Interface().(error) != nil) {
-				f.err = fnret[1].Interface().(error)
-				log.Println("Flow err:", f.err)
+				f.Err(fnret[1].Interface().(error))
 				f.hooks.error(id, f.err)
 				return nil
 			}