| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 | package flowimport (	"bytes"	"encoding/json"	"fmt"	"io"	"log"	"os"	"reflect")// Data interfacetype Data = interface{}type opEntry struct {	name     string	inputs   []*operation // still figuring, might be operation	executor interface{}}// Flow structure// We could Create a single array of operations// refs would only mean id, types would be embed in operationtype Flow struct {	registry *Registry	consts     []Data	data       map[string]Data // Should be named, to fetch later	operations map[string]opEntry	err   error	runID int	// Experimental run Event	handlers []func(name string, payLoad map[string]Data)}// New create a new flowfunc New() *Flow {	return &Flow{		registry: globalRegistry,		// Data		consts:     []Data{},		data:       map[string]Data{},		operations: map[string]opEntry{},	}}// Err returns internal error statefunc (f *Flow) Err() error {	return f.err}//SetRegistry use the registry specifiedfunc (f *Flow) SetRegistry(r *Registry) *Flow {	f.registry = r	// chain	return f}// DefOp Manual tag an Operationfunc (f *Flow) DefOp(id string, name string, params ...interface{}) Operation {	inputs := make([]*operation, len(params))	for i, p := range params {		switch v := p.(type) {		case *operation:			inputs[i] = v		default:			log.Println("WARNING defining const with value", v)			inputs[i] = f.Const(v).(*operation)		}	}	// Grab executor here	executor, err := f.registry.Get(name)	if err != nil {		f.err = err		return nil	}	f.operations[id] = opEntry{name, inputs, executor}	return opFunc(f, id)}// Res returns a deferred operation result// passing the Idfunc (f *Flow) Res(id string) Operation {	// Defered operation	return opFunc(f, id)}// Op return an function operator//  name - a previous registered function//  params - the function inputsfunc (f *Flow) Op(name string, params ...interface{}) Operation {	// Use this on Set?	inputs := make([]*operation, len(params))	for i, p := range params {		switch v := p.(type) {		case *operation:			inputs[i] = v		default:			// fail here			log.Println("WARNING defining const with value", v)			inputs[i] = f.Const(v).(*operation)		}	}	// Grab executor here	executor, err := f.registry.Get(name)	if err != nil {		f.err = err		return nil	}	// generate ID	for {		uuid := puuid()		if _, ok := f.operations[uuid]; ok {			continue		}		f.operations[uuid] = opEntry{name, inputs, executor}		return opFunc(f, uuid)	}	// Initialize opfunc maybe}// Const returns a const operationfunc (f *Flow) Const(value Data) Operation {	f.consts = append(f.consts, value)	refID := len(f.consts) - 1	return opConst(f, refID)}// Var operationfunc (f *Flow) Var(name string, initial ...Data) Operation {	if _, ok := f.data[name]; !ok {		var v interface{}		if len(initial) > 0 {			v = initial[0]		}		f.data[name] = v	}	return opVar(f, name)}// In flow input operator//  paramID - index of the parameterfunc (f *Flow) In(paramID int) Operation {	return opIn(f, paramID)}// Run a batch of operation?func (f *Flow) Run(op Operation, params ...Data) (Data, error) {	if f.err != nil {		return nil, f.err	}	cache := map[*operation]Data{}	return f.run(cache, op, params...)}func (f *Flow) run(cache map[*operation]Data, op Operation, params ...Data) (Data, error) {	o := op.(*operation)	if v, ok := cache[o]; ok {		return v, nil	}	// Manually fetch func data because of caching	var r Data	// This is wrong since the only source of func should be on operation	if o.kind == "func" {		op := f.operations[o.id.(string)]		callParam := make([]reflect.Value, len(op.inputs))		for i, in := range op.inputs {			fr, _ := f.run(cache, in, params...) // ignore error			callParam[i] = reflect.ValueOf(fr)		}		r = reflect.ValueOf(op.executor).Call(callParam)[0].Interface()	} else {		r = o.process(nil, params...)	}	cache[o] = r	return r, nil}// Analyse every operationsfunc (f *Flow) Analyse(w io.Writer, params ...Data) {	if w == nil {		w = os.Stdout	}	fmt.Fprintf(w, "Ops analysis:\n")	for k, op := range f.operations {		fw := bytes.NewBuffer(nil)		//fmt.Fprintf(w, "  [%s] (%v)", k, op.name)		fmt.Fprintf(fw, "  [%s] %s(", k, op.name)		for j, in := range op.inputs {			//ref := in.(Op)			if j != 0 {				fmt.Fprintf(fw, ", ")			}			ires := in.Process(params...)			if f.err != nil {				fmt.Fprintf(w, "Operator: %s error#%s\n", op.name, f.err.Error())				return			}			fmt.Fprintf(fw, " %s[%v](%v)", in.kind, in.id, ires)		}		fmt.Fprintf(fw, ") - ")		// Create OpProcessor and execute		//		opfn := opFunc(f, k)		res := opfn.Process(params...)		fmt.Fprintf(fw, "%v\n", res)		fmt.Fprintf(w, "%s", fw.String())	}}/////////////////////////////// Serializers inspectors//////////////////////func (f *Flow) String() string {	ret := bytes.NewBuffer(nil)	// consts	fmt.Fprintf(ret, "consts:\n")	for i, v := range f.consts {		fmt.Fprintf(ret, "  [%d] %v\n", i, v)	}	fmt.Fprintf(ret, "data:\n") // Or variable	for k, v := range f.data {		fmt.Fprintf(ret, "  [%v] %v\n", k, v)	}	fmt.Fprintf(ret, "operations:\n")	for k, v := range f.operations {		fmt.Fprintf(ret, "  [%s] %s(", k, v.name)		for j, in := range v.inputs {			if j != 0 {				fmt.Fprintf(ret, ", ")			}			fmt.Fprintf(ret, "%s[%v]", in.kind, in.id)		}		fmt.Fprintf(ret, ")\n")	}	return ret.String()}// MarshalJSON implementationfunc (f *Flow) MarshalJSON() ([]byte, error) {	data := map[string]interface{}{}	type opMarshal struct {		Name  string		Input []map[string]interface{}	}	operations := map[string]opMarshal{}	for k, o := range f.operations {		refs := []map[string]interface{}{}		for _, in := range o.inputs { // Switch type?			refs = append(refs, map[string]interface{}{				"type": in.kind,				"id":   in.id,			})		}		operations[k] = opMarshal{o.name, refs}	}	data["operations"] = operations	data["data"] = f.data	data["consts"] = f.consts	return json.Marshal(data)}//////////////////////////////////////////////// Experimental event////////////////// Handle attach a handlerfunc (f *Flow) Handle(handler func(name string, payLoad map[string]Data)) {	f.handlers = append(f.handlers, handler)}func (f *Flow) trigger(name string, payLoad map[string]Data) {	for _, h := range f.handlers {		h(name, payLoad)	}}
 |