|
@@ -11,8 +11,6 @@ import (
|
|
|
"log"
|
|
|
"reflect"
|
|
|
"sync"
|
|
|
-
|
|
|
- "github.com/gohxs/prettylog"
|
|
|
)
|
|
|
|
|
|
// OpCtx operation Context
|
|
@@ -37,11 +35,9 @@ type Operation interface { // Id perhaps?
|
|
|
|
|
|
//local operation information
|
|
|
type operation struct {
|
|
|
- sync.Mutex
|
|
|
flow *Flow
|
|
|
id interface{} // Interface key
|
|
|
kind string
|
|
|
- src string
|
|
|
set func(params ...Data)
|
|
|
process func(ctx OpCtx, params ...Data) Data
|
|
|
}
|
|
@@ -59,32 +55,20 @@ func (o *operation) Process(params ...Data) Data {
|
|
|
// Every single one is run with this internally
|
|
|
func (o *operation) processWithCtx(ctx OpCtx, params ...Data) Data {
|
|
|
entry, _ := o.flow.getOp(fmt.Sprint(o.id))
|
|
|
- log := prettylog.New(o.ID() + ":" + entry.name)
|
|
|
-
|
|
|
- log.Printf("Operation waiting")
|
|
|
- o.Lock()
|
|
|
- defer o.Unlock()
|
|
|
- log.Printf("Executing")
|
|
|
-
|
|
|
- log.Println("Context", ctx)
|
|
|
+ entry.Lock()
|
|
|
+ defer entry.Unlock()
|
|
|
|
|
|
if o.flow.err != nil {
|
|
|
return nil
|
|
|
}
|
|
|
if ctx == nil { // No cache/Context
|
|
|
- log.Printf("Processing")
|
|
|
return o.process(ctx, params...)
|
|
|
}
|
|
|
- if v, ok := ctx.Load(o); ok {
|
|
|
- log.Printf("Cached")
|
|
|
+ if v, ok := ctx.Load(o.id); ok {
|
|
|
return v
|
|
|
}
|
|
|
-
|
|
|
- log.Printf("Processing")
|
|
|
res := o.process(ctx, params...)
|
|
|
- log.Println("Storing", res)
|
|
|
- ctx.Store(o, res)
|
|
|
-
|
|
|
+ ctx.Store(o.id, res)
|
|
|
return res
|
|
|
}
|
|
|
|