瀏覽代碼

backend Sending errors

luis 7 年之前
父節點
當前提交
a67c6f7494

+ 1 - 1
Makefile

@@ -35,7 +35,7 @@ frontend: DIST/web
 backend: DIST/flowserver
 
 dev: clean backend
-	tmux split "DEBUG=1 DIST/flowserver"
+	tmux split "DEBUG=1 DIST/flowserver;$$SHELL"
 	cd browser/vue-flow; yarn dev
 
 builder:

+ 4 - 0
browser/vue-flow/src/assets/dark-theme.css

@@ -32,3 +32,7 @@
   outline: none;
   box-shadow: inset 0 1px 1px rgba(0, 0, 0, 0.4);
 }
+
+.dark .flow-node__activity[status="running"] .flow-node__activity-icon > * {
+  stroke: #9af;
+}

+ 3 - 3
browser/vue-flow/src/assets/default-theme.css

@@ -134,8 +134,8 @@ input {
 }
 
 .flow-view:not(.activity)
-  .flow-link:not(.flow-link--pointer):hover
-  .flow-link__visible {
+.flow-link:not(.flow-link--pointer):hover
+.flow-link__visible {
   stroke: var(--link-hover);
 
   /*filter: url(#highlight-border);*/
@@ -178,7 +178,6 @@ input {
 }
 
 .flow-node__label {
-  /*font-family: FiraMono, monospace;*/
   font-family: RobotoMono, monospace;
   letter-spacing: -0.05em;
   font-size: 14px;
@@ -199,6 +198,7 @@ input {
 .flow-node__activity {
   fill: var(--background-secondary);
 }
+
 .flow-node__activity-icon > * {
   stroke: var(--normal);
 }

+ 1 - 1
browser/vue-flow/src/components/flow/link.vue

@@ -37,7 +37,7 @@ export default {
       }
       s = Math.max(s, 2)
       const x2 = this.x2 - (this.pointer ? 4 : 15.5)
-      const x1 = this.x1 + 4
+      const x1 = this.x1 + 7
       return `M${x1},${this.y1} C${x1 + s},${this.y1} ${x2 - s},${this.y2} ${x2},${this.y2}`
     }
   }

+ 3 - 1
browser/vue-flow/src/components/flow/node.vue

@@ -170,7 +170,7 @@ export default {
     },
     bodyProps () {
       let width = this.labelRect.width + 46
-      let height = Math.max(this.labelRect.height + 20, 60)
+      let height = Math.max(this.labelRect.height + 20, 60, this.inputs.length * 25)
       if (this.style.shape === 'circle') {
         width = height = Math.max(width, height)
       }
@@ -264,6 +264,7 @@ export default {
 .flow-node__body {
   transition: all 0.3s;
 }
+
 .flow-node[status=running] .flow-node__body{
   stroke: yellow !important;
 }
@@ -271,6 +272,7 @@ export default {
 .flow-node[status=error] .flow-node__body{
   stroke: red !important;
 }
+
 .flow-node[status=finished] .flow-node__body{
   stroke: green !important;
 }

+ 35 - 25
browser/vue-flow/src/components/flow/nodestatus.vue

@@ -6,34 +6,33 @@
       class="flow-node__activity-background"
       rx="0"
       ry="0"
-      r="14"/>
-    >
-
+      r="17"/>
     <icon-refresh v-if="status=='running'" v-bind="iconProps" class="flow-node__activity-icon" />
     <icon-wait v-else-if="status=='waiting'" v-bind="iconProps" class="flow-node__activity-icon"/>
     <icon-ok v-else-if="status=='finish'" v-bind="iconProps"class="flow-node__activity-icon"/>
+    <icon-fail v-else-if="status=='error'" v-bind="iconProps"class="flow-node__activity-icon"/>
     <icon-question v-else v-bind="iconProps" class="flow-node__activity-icon" />
-
   </g>
 </template>
 <script>
 import IconWait from '@/assets/icons/wait.svg'
+import IconFail from '@/assets/icons/fail.svg'
 import IconOk from '@/assets/icons/ok.svg'
 import IconQuestion from '@/assets/icons/question.svg'
 import IconRefresh from '@/assets/icons/refresh.svg'
 
 export default {
   name: 'FlowNodeStatus',
-  components: {IconWait, IconOk, IconQuestion, IconRefresh},
+  components: {IconWait, IconFail, IconOk, IconQuestion, IconRefresh},
   props: { status: {type: String, default: ''} },
   computed: {
     iconProps () {
       return {
-        x: -10,
-        y: -10,
+        x: -12,
+        y: -12,
         viewBox: '-4 -4 72 72',
-        width: 20,
-        height: 20
+        width: 24,
+        height: 24
       }
     }
   }
@@ -47,12 +46,14 @@ export default {
 
 .flow-node__activity-background {
   fill: inherits;
-  transition: all .3s;
+  transition: all 0.3s;
 }
+
 .flow-node__activity-icon{
   width:20px;
   height:20px;
 }
+
 .flow-node__activity-icon  >* {
   transform-origin: 50% 50%;
   stroke-width: 8px;
@@ -60,33 +61,41 @@ export default {
 }
 
 .flow-node__activity[status=running] .flow-node__activity-icon  >* {
-   -webkit-animation: spin 1s infinite linear;
-   -moz-animation: spin 1s infinite linear;
-   animation: spin 1s infinite linear;
-  stroke: #00C;
+  -webkit-animation: spin 1s infinite linear;
+  -moz-animation: spin 1s infinite linear;
+  animation: spin 1s infinite linear;
+  stroke: #00c !default;
 }
+
+.flow-node__activity[status=error] .flow-node__activity-icon  > * {
+  stroke: #f22;
+}
+
 .flow-node__activity[status=waiting] .flow-node__activity-icon  >* {
-   -webkit-animation: shake 1s infinite linear;
-   -moz-animation: shake 1s infinite linear;
-   animation: shake 1s infinite linear;
+  -webkit-animation: shake 1s infinite linear;
+  -moz-animation: shake 1s infinite linear;
+  animation: shake 1s infinite linear;
 }
+
 .flow-node__activity[status=finish] .flow-node__activity-icon  >* {
-  stroke: #2C2;
+  stroke: #2c2;
 }
 
 /*** ANIMATIONS ***/
 @-moz-keyframes spin {
-    from { -moz-transform: rotate(0deg); }
-    to { -moz-transform: rotate(-360deg); }
+  from { -moz-transform: rotate(0deg); }
+  to { -moz-transform: rotate(-360deg); }
 }
+
 @-webkit-keyframes spin {
-    from { -webkit-transform: rotate(0deg); }
-    to { -webkit-transform: rotate(-360deg); }
+  from { -webkit-transform: rotate(0deg); }
+  to { -webkit-transform: rotate(-360deg); }
 }
+
 @keyframes spin {
-    from {transform:rotate(0deg);}
-    to {transform:rotate(-360deg);}
-  }
+  from {transform:rotate(0deg);}
+  to {transform:rotate(-360deg);}
+}
 
 @keyframes shake {
   10%, 90% {
@@ -100,6 +109,7 @@ export default {
   30%, 50%, 70% {
     transform: rotate(-7deg);
   }
+
   40%, 60% {
     transform: rotate(7deg);
   }

+ 16 - 1
browser/vue-flow/src/components/main.vue

@@ -222,7 +222,22 @@ export default {
       }
     })
     this.$flowService.on('registry', (v) => {
-      this.registry = Object.assign({}, defRegistry, v.data)
+      console.log('Registry received:', JSON.stringify(v))
+
+      let res = {}
+      for (let k of Object.keys(v.data)) {
+        const e = v.data[k]
+        res[k] = {
+          categories: e.categories,
+          inputs: e.inputs,
+          inputDesc: e.inputDesc,
+
+          output: e.output,
+          outputDesC: e.outputDesc,
+          style: e.extra && e.extra.style
+        }
+      }
+      this.registry = Object.assign({}, defRegistry, res)
     })
     this.$flowService.on('nodeActivity', (v) => {
       console.log('Received activity')

+ 3 - 2
browser/vue-flow/src/components/panel.vue

@@ -134,7 +134,7 @@ export default {
 }
 
 .flow-funcs__src {
-  font-size:12px;
+  font-size:14px;
   padding:12px 4px;
   margin-top:1px;
   text-align:center;
@@ -157,7 +157,8 @@ export default {
 
 .flow-funcs__group.blocks .flow-funcs__src {
   display:block;
-  width: calc(50% - 2px);
+  font-size:10px;
+  width: calc(25% - 2px);
   padding:18px 4px;
   text-overflow: ellipsis;
   margin:1px;

+ 4 - 4
go/src/flow/hook.go

@@ -38,7 +38,7 @@ func (hooks Hooks) Trigger(name string, ID string, extra ...Data) {
 	}
 }
 
-func (hooks Hooks) Wait(ID string)             { hooks.Trigger("Wait", ID) }
-func (hooks Hooks) Start(ID string)            { hooks.Trigger("Start", ID) }
-func (hooks Hooks) Finish(ID string, res Data) { hooks.Trigger("Finish", ID, res) }
-func (hooks Hooks) Error(ID string, err error) { hooks.Trigger("Error", ID, err) }
+func (hooks Hooks) wait(ID string)             { hooks.Trigger("Wait", ID) }
+func (hooks Hooks) start(ID string)            { hooks.Trigger("Start", ID) }
+func (hooks Hooks) finish(ID string, res Data) { hooks.Trigger("Finish", ID, res) }
+func (hooks Hooks) error(ID string, err error) { hooks.Trigger("Error", ID, err) }

+ 17 - 18
go/src/flow/operation.go

@@ -8,7 +8,6 @@ package flow
 import (
 	"errors"
 	"fmt"
-	"log"
 	"reflect"
 	"sync"
 )
@@ -49,27 +48,21 @@ func (o *operation) ID() string {
 
 // Process operation process wrapper
 func (o *operation) Process(params ...Data) Data {
-	log.Println("Execute internally with ctx")
 	return o.processWithCtx(newOpCtx(), params...)
 }
 
 // Every single one is run with this internally
 func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
 	if o.flow.err != nil {
-		log.Println("Flow has error")
 		return nil
 	}
 	if ctx == nil { // No cache/Context
-		log.Println("context is nil, run directly")
 		return o.process(ctx, params...)
 	}
-	log.Println("Load var from context")
 	if v, ok := ctx.Load(o); ok {
 		return v
 	}
-	log.Println("Processing executor")
 	res := o.process(ctx, params...)
-	log.Println("Storing value")
 	ctx.Store(o, res)
 
 	return res
@@ -118,41 +111,47 @@ func opFunc(f *Flow, id string) *operation {
 		kind: "func",
 		set:  dumbSet,
 		process: func(ctx OpCtx, params ...Data) Data {
-			log.Println("Function executing:", id)
 			op, ok := f.getOp(id)
 			if !ok {
 				f.err = fmt.Errorf("invalid operation '%s'", id)
-				f.hooks.Error(id, f.err)
+				f.hooks.error(id, f.err)
 				return nil
 			}
-			log.Println("Got operation")
 			/////////////////////////////
 			// NEW PARALLEL PROCESSING
 			///////////
-			log.Println("Sending wait")
-			f.hooks.Wait(id)
-			log.Println("Waited")
+			f.hooks.wait(id)
 
 			callParam := make([]reflect.Value, len(op.inputs))
 
-			log.Println("Create wait group for", len(op.inputs))
 			wg := sync.WaitGroup{}
 			wg.Add(len(op.inputs))
 			for i, in := range op.inputs {
 				go func(i int, in *operation) {
-					log.Println("Running parallel")
 					defer wg.Done()
 					fr := in.processWithCtx(ctx, params...)
 					callParam[i] = reflect.ValueOf(fr)
-					log.Println("Done")
 				}(i, in)
 			}
 			wg.Wait()
-			f.hooks.Start(id)
+			// Check params
+			for _, p := range callParam {
+				if !p.IsValid() {
+					f.hooks.error(id, errors.New("Input failed"))
+					return nil
+				}
+			}
+			f.hooks.start(id)
 			fnret := reflect.ValueOf(op.executor).Call(callParam)
+			if len(fnret) > 1 {
+				if err := fnret[1].Interface().(error); err != nil {
+					f.hooks.error(id, err)
+					return nil
+				}
+			}
 			// check fnret[1] for error
 			ret := fnret[0].Interface()
-			f.hooks.Finish(id, ret)
+			f.hooks.finish(id, ret)
 			return ret
 		},
 	}

+ 63 - 2
go/src/flow/registry/entry.go

@@ -5,16 +5,30 @@ import (
 	"reflect"
 )
 
+// Description of an entry
+type Description struct {
+	Name       string   `json:"name"`
+	Categories []string `json:"categories"`
+
+	Inputs     []string       `json:"inputs"`
+	InputDesc  map[int]string `json:"inputsDesc"`
+	Output     string         `json:"output"`
+	OutputDesc string         `json:"outputDesc"`
+
+	Extra map[string]interface{} `json:"extra"`
+}
+
 // Entry contains a function description params etc
 type Entry struct {
+	registry    *Registry
 	fn          interface{}
 	Description *Description
 	err         error
 }
 
 // NewEntry creates and describes a New Entry
-func NewEntry(fn interface{}) *Entry {
-	e := &Entry{fn: fn}
+func NewEntry(r *Registry, fn interface{}) *Entry {
+	e := &Entry{registry: r, fn: fn}
 
 	fntyp := reflect.TypeOf(e.fn)
 	if fntyp.Kind() != reflect.Func {
@@ -38,6 +52,8 @@ func NewEntry(fn interface{}) *Entry {
 		Categories: []string{"generic"},
 		Inputs:     Inputs,
 		Output:     Output,
+		InputDesc:  map[int]string{},
+		Extra:      map[string]interface{}{},
 	}
 	return e
 }
@@ -53,6 +69,51 @@ func (e *Entry) Categories(cat ...string) *Entry {
 		return e
 	}
 	e.Description.Categories = cat
+	return e
+}
+
+// Input description for Input
+func (e *Entry) Input(i int, desc string) *Entry {
+	if e.err != nil {
+		return e
+	}
+	e.Description.InputDesc[i] = desc
+	return e
+}
+
+// Output description for Input
+func (e *Entry) Output(desc string) *Entry {
+	if e.err != nil {
+		return e
+	}
+	e.Description.OutputDesc = desc
+	return e
+}
 
+// Extra information on entry
+func (e *Entry) Extra(name string, extra interface{}) *Entry {
+	if e.err != nil {
+		return e
+	}
+	e.Description.Extra[name] = extra
 	return e
 }
+
+//Batch helper to batch set properties
+type Batch []*Entry
+
+//Categories set categories of the group
+func (eg Batch) Categories(cat ...string) Batch {
+	for _, e := range eg {
+		e.Categories(cat...)
+	}
+	return eg
+}
+
+// Extra set extras of the group
+func (eg Batch) Extra(name string, value interface{}) Batch {
+	for _, e := range eg {
+		e.Extra(name, value)
+	}
+	return eg
+}

+ 61 - 3
go/src/flow/registry/entry_test.go

@@ -8,11 +8,69 @@ import (
 
 func TestNewEntryInvalid(t *testing.T) {
 	assert := assert.A(t)
-	e := registry.NewEntry("string")
+	r := registry.New()
+	e := registry.NewEntry(r, "string")
 	assert.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
 }
 func TestNewEntryValid(t *testing.T) {
 	assert := assert.A(t)
-	e := registry.NewEntry(func(a int) int { return 0 })
-	assert.Eq(e.Err(), nil, "Fetching an entry")
+	r := registry.New()
+	e := registry.NewEntry(r, func(a int) int { return 0 })
+	assert.Eq(e.Err(), nil, "fetching an entry")
+}
+
+func TestDescription(t *testing.T) {
+	assert := assert.A(t)
+	r := registry.New()
+
+	e := registry.NewEntry(r, func() int { return 0 })
+	e.Categories("a", "b")
+	assert.Eq(e.Err(), nil, "should not fail setting categories")
+	assert.Eq(len(e.Description.Categories), 2, "should have 2 categories")
+
+	e.Input(0, "input doc")
+	assert.Eq(e.Err(), nil, "should not fail setting input description")
+	assert.Eq(len(e.Description.InputDesc), 1, "should have 1 input description")
+
+	e.Output("output desc")
+	assert.Eq(e.Err(), nil, "should not fail setting input description")
+	assert.Eq(e.Description.OutputDesc, "output desc", "output description should be the same")
+
+	e.Extra("test", 123)
+	assert.Eq(e.Err(), nil, "should not fail setting extra doc")
+	assert.Eq(e.Description.Extra["test"], 123, "extra text should be as expected")
+}
+func TestDescriptionError(t *testing.T) {
+	assert := assert.A(t)
+	r := registry.New()
+	e := registry.NewEntry(r, "string")
+	assert.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
+	e.Categories("a", "b")
+	assert.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
+	e.Input(0, "input doc")
+	assert.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
+	e.Output("output desc")
+	assert.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
+	e.Extra("test", 123)
+	assert.Eq(e.Err(), registry.ErrNotAFunc, "entry is not a function")
+
+}
+
+func TestEntryBatch(t *testing.T) {
+	assert := assert.A(t)
+	r := registry.New()
+
+	b := registry.Batch{
+		registry.NewEntry(r, func() int { return 0 }),
+		registry.NewEntry(r, func() int { return 0 }),
+		registry.NewEntry(r, func() int { return 0 }),
+	}.Categories("test").
+		Extra("name", 1)
+
+	assert.Eq(len(b), 3, "should have 3 items")
+	for _, e := range b {
+		assert.Eq(e.Description.Categories[0], "test", "It should be of category test")
+		assert.Eq(e.Description.Extra["name"], 1, "It should contain extra")
+	}
+
 }

+ 1 - 9
go/src/flow/registry/registry.go

@@ -4,14 +4,6 @@ import (
 	"reflect"
 )
 
-// Description of an entry
-type Description struct {
-	Name       string   `json:"name"`
-	Categories []string `json:"categories"`
-	Inputs     []string `json:"inputs"`
-	Output     string   `json:"output"`
-}
-
 // Registry function registry
 type Registry struct {
 	data map[string]*Entry
@@ -25,7 +17,7 @@ func New() *Registry {
 //Register should be a func only register
 func (r *Registry) Register(name string, v interface{}) *Entry {
 
-	e := NewEntry(v)
+	e := NewEntry(r, v)
 	if e.err != nil {
 		return e
 	}

+ 47 - 12
go/src/flowserver/cmd/flowserver/main.go

@@ -1,7 +1,9 @@
 package main
 
 import (
+	"errors"
 	"flow"
+	"flow/registry"
 	"flowserver"
 	"fmt"
 	"log"
@@ -17,20 +19,53 @@ func main() {
 
 	log.Println("Running version:", flowserver.Version)
 
-	flow.Register("somebigfunctionthatoverflows", strCat)
-	flow.Register("str.cat", strCat)
-	flow.Register("str.reverse", strReverse)
-	flow.Register("str.split", strings.Split)
-	flow.Register("str.join", strings.Join)
-
-	flow.Register("delay.reverse", delayReverse)
-	flow.Register("delay.split", delaySplit)
-	flow.Register("delay.join", delayJoin)
-	flow.Register("longduration", longduration)
-	flow.Register("duration", duration)
+	registry.Batch{
+		flow.Register("delay.cat", delayCat),
+		flow.Register("delay.reverse", delayReverse),
+		flow.Register("delay.split", delaySplit),
+		flow.Register("delay.join", delayJoin),
+		flow.Register("addduration", duration),
+	}.Categories("string", "test").
+		Extra("style", map[string]string{
+			"color": "#a55",
+		})
+
+	registry.Batch{
+		flow.Register("multiplex4", multiplex4),
+		flow.Register("multiplex6", multiplex6),
+		flow.Register("longduration", longduration),
+		flow.Register("longduration", longduration),
+		flow.Register("error", func(s string) (string, error) {
+			time.Sleep(4 * time.Second)
+			return "", errors.New("Some error")
+		}),
+	}.Categories("test").
+		Extra("style", map[string]string{
+			"color": "#959",
+		})
+
+	registry.Batch{
+		flow.Register("str.cat", strCat),
+		flow.Register("str.reverse", strReverse),
+		flow.Register("str.split", strings.Split),
+		flow.Register("str.join", strings.Join),
+	}.Categories("string").
+		Extra("style", map[string]string{
+			"color": "#aa5",
+		})
+
+	addr := ":2015"
+	log.Println("Starting server  at:", ":2015")
 
 	f := flowserver.FlowServer{}
-	f.ListenAndServe()
+	f.ListenAndServe(addr)
+}
+
+func multiplex4(a1, a2, a3, a4 string) string {
+	return strings.Join([]string{a1, a2, a3, a4}, " ")
+}
+func multiplex6(a1, a2, a3, a4, a5, a6 string) string {
+	return strings.Join([]string{a1, a2, a3, a4, a5, a6}, " ")
 }
 
 /* Sample funcs */

+ 18 - 18
go/src/flowserver/flowserver.go

@@ -1,9 +1,7 @@
 package flowserver
 
 import (
-	"fmt"
 	"log"
-	"net"
 	"net/http"
 	"net/http/httputil"
 	"net/url"
@@ -23,7 +21,7 @@ type FlowServer struct{}
 
 // ListenAndServe starts the httpserver
 // It will listen on default port 2015 and increase if port is in use
-func (f *FlowServer) ListenAndServe() error {
+func (f *FlowServer) ListenAndServe(addr string) error {
 	fsm := NewFlowSessionManager()
 
 	c := chain.New(webu.ChainLogger(prettylog.New("req")))
@@ -42,22 +40,24 @@ func (f *FlowServer) ListenAndServe() error {
 		mux.Handle("/", c.Build(webu.StaticHandler("web", "index.html")))
 	}
 
+	return http.ListenAndServe(addr, mux)
+
 	////////////////////
 	// Server starter
 	/////
-	port := 2015
-	for {
-		addr := fmt.Sprintf(":%d", port)
-		s, err := net.Listen("tcp", addr)
-		if err != nil {
-			log.Println("Listen error:", err)
-			port++
-			continue
-		}
-		log.Println("Listening at:", addr)
-		err = http.Serve(s, mux)
-		if err != nil {
-			log.Fatal(err)
-		}
-	}
+	//port := 2015
+	//for {
+	//addr := fmt.Sprintf(":%d", port)
+	//s, err := net.Listen("tcp", addr)
+	//if err != nil {
+	//log.Println("Listen error:", err)
+	//port++
+	//continue
+	//}
+	//log.Println("Listening at:", addr)
+	//err = http.Serve(s, mux)
+	//if err != nil {
+	//log.Fatal(err)
+	//}
+	//}
 }