// Client handler package wsrpc import ( "context" "errors" "log" "reflect" "strings" "sync" "github.com/google/uuid" "golang.org/x/net/websocket" ) // Vars var ( ErrNParam = errors.New("Invalid number of parameters") ErrNReturn = errors.New("Number of outputs mismatch") ErrBadImplementation = errors.New("Bad implementation") errorInterface = reflect.TypeOf((*error)(nil)).Elem() ) // CallObj object // Message type CallObj struct { OP string `json:"op"` // operation ID string `json:"id"` // Id of the call Method string `json:"method"` // could be param 0 instead? // Payload Params []interface{} `json:"params"` Response interface{} `json:"response"` Error string `json:"error,omitempty"` } // DataObj common structure for dymanic json object //type DataObj map[string]interface{} // ListenerFunc function type to handle browser events type ListenerFunc func(...interface{}) (interface{}, error) // Request request type for handling requests channels // ClientCtx main client struct type ClientCtx struct { context.Context Close func() locker sync.Mutex WS *websocket.Conn // stuff listeners map[string]ListenerFunc requests map[string]chan CallObj message chan CallObj } // NewClient new client (should have background context here) func NewClient(ws *websocket.Conn) *ClientCtx { ctx, cancelFunc := context.WithCancel(context.Background()) return &ClientCtx{ Context: ctx, Close: cancelFunc, WS: ws, listeners: map[string]ListenerFunc{}, requests: map[string]chan CallObj{}, } } // Process received messages func (c *ClientCtx) Process() { for { select { case <-c.Done(): return // Break loop default: } var data = CallObj{} err := websocket.JSON.Receive(c.WS, &data) if err != nil { // Close receive c.Close() return } switch data.OP { case "call": params := data.Params var idn = data.Method var reqID = data.ID var fn, ok = c.listeners[idn] if !ok { websocket.JSON.Send(c.WS, CallObj{ OP: "response", ID: reqID, Error: "Method not found", }) break } go func() { // async send errStr := "" ret, err := fn(params...) if err != nil { errStr = err.Error() } var responseObj = CallObj{ OP: "response", ID: reqID, Response: ret, Error: errStr, } websocket.JSON.Send(c.WS, responseObj) }() case "response": c.locker.Lock() mchan, ok := c.requests[data.ID] delete(c.requests, data.ID) c.locker.Unlock() if ok { mchan <- data } } } } // Call a client method and estabilishes a request id func (c *ClientCtx) Call(method string, params ...interface{}) (interface{}, error) { u := uuid.New() uuidStr := u.String() var callObj = CallObj{ OP: "call", ID: uuidStr, Method: method, Params: params, } resCh := make(chan CallObj, 1) // Store the channel c.locker.Lock() c.requests[uuidStr] = resCh c.locker.Unlock() // IO send // Send to dispatcher instead? websocket.JSON.Send(c.WS, &callObj) // Hang until response res := <-resCh // Wait for response // Got Response var err error if res.Error != "" { err = errors.New(res.Error) } return res.Response, err } //Define export a method to ws client func (c *ClientCtx) Define(name string, listener ListenerFunc) { c.listeners[name] = listener } //Export a struct of funcs func (c *ClientCtx) Export(obj interface{}) { // Reflect here typ := reflect.TypeOf(obj) val := reflect.ValueOf(obj) for i := 0; i < typ.NumField(); i++ { fTyp := typ.Field(i) if fTyp.Type.Kind() != reflect.Func { log.Println("Ignore non func value") continue } tag, ok := fTyp.Tag.Lookup("wsexport") if !ok { continue } tagParts := strings.Split(tag, ",") exportName := tagParts[0] fnVal := val.Field(i) // Slow? c.listeners[exportName] = func(params ...interface{}) (interface{}, error) { if len(params) != fnVal.Type().NumIn() { return nil, ErrNParam } if fnVal.Type().NumOut() != 2 { return nil, ErrNReturn } if !fnVal.Type().Out(1).Implements(errorInterface) { return nil, ErrBadImplementation } vparam := []reflect.Value{} for _, p := range params { vparam = append(vparam, reflect.ValueOf(p)) } r := fnVal.Call(vparam) return r[0].Interface(), r[1].Interface().(error) } // It is a func, check for tag } }