123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- package flow
- //
- // Find a way to improve this mess, maybe it can be merged in one func
- //
- //
- import (
- "errors"
- "fmt"
- "log"
- "reflect"
- "runtime/debug"
- "sync"
- )
- type executorFunc func(OpCtx, ...Data) (Data, error)
- // Operation interface
- type Operation interface { // Id perhaps?
- //ID() string
- //Set(input Data) // Special var method
- Process(params ...Data) (Data, error)
- }
- type operation struct {
- sync.Mutex
- flow *Flow
- name string
- kind string
- // Could be a simple ID for operation, but it will depend on flow
- inputs []*operation // still figuring, might be Operation
- fn interface{} // Any func
- executor executorFunc
- }
// OpCtx is the context handed to every executor during a run. It is an
// alias for *sync.Map; executors built by makeExecutor use it to cache
// results keyed by operation.
type OpCtx = *sync.Map

// newOpCtx returns a fresh, empty operation context.
func newOpCtx() OpCtx {
	return new(sync.Map)
}
- //func (o *operation) ID() string { return o.id }
- func (o *operation) Process(params ...Data) (Data, error) {
- // Create CTX
- ctx := newOpCtx()
- return o.executor(ctx, params...)
- }
- // make Executor for func
- func (f *Flow) asTrigger(op *operation, fn executorFunc) executorFunc {
- return func(ctx OpCtx, params ...Data) (Data, error) {
- f.hooks.start(op)
- //panic recoverer, since nodes are not our functions
- var err error
- var res Data
- func() {
- defer func() {
- if r := recover(); r != nil {
- log.Println("Panic:", r)
- debug.PrintStack()
- err = fmt.Errorf("%v", r)
- }
- }()
- res, err = fn(ctx, params...)
- }()
- if err != nil {
- f.hooks.error(op, err)
- } else {
- f.hooks.finish(op, res)
- }
- return res, err
- }
- }
- // safe makeExecutor with a bunch of type checks
- // we will create a less safer functions to get performance
- func (f *Flow) makeExecutor(op *operation, fn interface{}) executorFunc {
- // ExecutorFunc
- return func(ctx OpCtx, params ...Data) (Data, error) {
- /*op := f.GetOp(id)
- if op == nil {
- return nil, fmt.Errorf("invalid operation '%s'", id)
- }*/
- op.Lock()
- defer op.Unlock()
- // Load from cache if any
- if ctx != nil {
- if v, ok := ctx.Load(op); ok {
- return v, nil
- }
- }
- // Change to wait to wait for the inputs
- f.hooks.wait(op)
- fnval := reflect.ValueOf(fn)
- callParam, err := f.processInputs(ctx, op, fnval, params...)
- if err != nil {
- return nil, err
- }
- // Start again and execute function
- f.hooks.start(op)
- fnret := fnval.Call(callParam)
- if len(fnret) == 0 {
- return nil, nil
- }
- // Output erroring
- if len(fnret) > 1 && (fnret[len(fnret)-1].Interface() != nil) {
- err, ok := fnret[len(fnret)-1].Interface().(error)
- if !ok {
- err = errors.New("unknown error")
- }
- return nil, err
- }
- // THE RESULT
- ret := fnret[0].Interface()
- // Store in the cache
- if ctx != nil {
- ctx.Store(op, ret)
- }
- return ret, nil
- }
- }
- // processInputs will run a list of operations and return reflect values
- // to be processed next
- // NEW PARALLEL PROCESSING
- func (f *Flow) processInputs(ctx OpCtx, op *operation, fnval reflect.Value, params ...Data) ([]reflect.Value, error) {
- nInputs := fnval.Type().NumIn()
- // Total inputs
- callParam := make([]reflect.Value, nInputs)
- if nInputs != len(op.inputs) {
- return nil, fmt.Errorf("expect %d inputs got %d", nInputs, len(op.inputs))
- } //Wait
- callErrors := ""
- paramMutex := sync.Mutex{}
- // Parallel processing if inputs
- wg := sync.WaitGroup{}
- wg.Add(len(op.inputs))
- for i, in := range op.inputs {
- go func(i int, in *operation) {
- defer wg.Done()
- inTyp := fnval.Type().In(i)
- /////////////////
- // Executor
- fr, err := in.executor(ctx, params...)
- //log.Println("Executing:", in.id, in.name, fr)
- paramMutex.Lock()
- defer paramMutex.Unlock()
- if err != nil {
- callErrors += err.Error() + "\n"
- return
- }
- if fr == nil {
- callParam[i] = reflect.Zero(inTyp)
- return
- }
- res := reflect.ValueOf(fr)
- var cres reflect.Value
- // Conversion effort
- switch {
- case !res.IsValid():
- callErrors += fmt.Sprintf("Input %d invalid\n", i)
- return
- case !res.Type().ConvertibleTo(inTyp):
- if inTyp.Kind() != reflect.String {
- callErrors += fmt.Sprintf("Input %d type: %v(%v) cannot be converted to %v\n", i, res.Type(), res.Interface(), inTyp)
- log.Println(f)
- return
- }
- cres = reflect.ValueOf(fmt.Sprint(res.Interface()))
- default:
- cres = res.Convert(inTyp)
- }
- // CheckError and safelly append
- callParam[i] = cres
- }(i, in)
- }
- wg.Wait()
- // Check for any error
- if callErrors != "" {
- log.Println("Call errors:", callErrors)
- return nil, errors.New(callErrors)
- }
- return callParam, nil
- }
- // Var create a operation
- func (f *Flow) Var(name string, initial Data) Operation {
- // Input from params
- var input *operation
- switch v := initial.(type) {
- case *operation:
- input = v
- default:
- c := f.Const(v)
- input = c.(*operation)
- }
- op := &operation{
- Mutex: sync.Mutex{},
- flow: f,
- name: fmt.Sprintf("(var)<%s>", name),
- kind: "var",
- inputs: []*operation{input},
- }
- op.executor = f.makeExecutor(op, func(initial Data) Data {
- val, ok := f.Data[name]
- if !ok {
- val = initial
- f.Data[name] = val
- }
- return val
- })
- return op
- }
- // Var define var operation with optional initial
- /*func (f *Flow) Var(name string, initial ...Data) Operation {
- // Unique
- if _, ok := f.Data[name]; !ok {
- var v interface{}
- if len(initial) > 0 {
- v = initial[0]
- }
- f.Data[name] = v
- }
- op := &operation{
- Mutex: sync.Mutex{},
- flow: f,
- name: fmt.Sprintf("(var)<%s>", name),
- kind: "var",
- inputs: nil,
- }
- op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) {
- // if f.data == nil we set from the init operation
- return f.Data[name], nil
- })
- f.operations = append(f.operations, op)
- //f.operations.Store(id, op)
- return op
- }*/
- // Op Manual tag an Operation
- // Define operation for ID
- func (f *Flow) Op(name string, params ...interface{}) Operation {
- inputs := make([]*operation, len(params))
- for i, p := range params {
- switch v := p.(type) {
- case *operation:
- inputs[i] = v
- default:
- c := f.Const(v)
- inputs[i], _ = c.(*operation)
- }
- }
- // If special executor we attach our func
- // Grab executor here
- registryFn, err := f.registry.Get(name)
- if err != nil {
- return f.ErrOp(err)
- }
- op := &operation{
- Mutex: sync.Mutex{},
- flow: f,
- name: name,
- kind: "func",
- inputs: inputs,
- }
- executor := f.makeExecutor(op, registryFn)
- op.executor = f.asTrigger(op, executor)
- f.operations = append(f.operations, op)
- return op
- }
- // ErrOp define a nil operation that will return error
- // Usefull for builders
- func (f *Flow) ErrOp(err error) Operation {
- op := &operation{
- Mutex: sync.Mutex{},
- //id: id,
- flow: f,
- name: fmt.Sprintf("(error)<%v>", err),
- kind: "error",
- inputs: nil,
- }
- op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return nil, err })
- //f.operations = append(f.operations, op)
- return op
- }
- // Const define a const by defined ID
- func (f *Flow) Const(value Data) Operation {
- // Optimize this definition
- constID := -1
- for k, v := range f.consts {
- if v == value {
- constID = k
- break
- }
- }
- if constID == -1 {
- constID = len(f.consts)
- f.consts = append(f.consts, value)
- }
- op := &operation{
- Mutex: sync.Mutex{},
- flow: f,
- name: fmt.Sprintf("(const)<%v:%v>", constID, value),
- kind: "const",
- inputs: nil,
- }
- op.executor = f.asTrigger(op, func(OpCtx, ...Data) (Data, error) { return f.consts[constID], nil })
- //f.operations = append(f.operations, op)
- return op
- }
- // In define input operation
- func (f *Flow) In(paramID int) Operation {
- op := &operation{
- Mutex: sync.Mutex{},
- flow: f,
- name: fmt.Sprintf("(in)<%d>", paramID),
- kind: "in",
- inputs: nil,
- }
- op.executor = f.asTrigger(op, func(ctx OpCtx, params ...Data) (Data, error) {
- if paramID < 0 || paramID >= len(params) {
- return nil, ErrInput
- }
- return params[paramID], nil
- })
- f.operations = append(f.operations, op)
- return op
- }
|