123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- // Client handler
- package wsrpc
- import (
- "context"
- "errors"
- "log"
- "reflect"
- "strings"
- "sync"
- "github.com/google/uuid"
- "golang.org/x/net/websocket"
- )
// Sentinel errors returned by listeners generated through Export.
var (
	// ErrNParam reports that a call supplied a different number of
	// parameters than the exported function declares.
	ErrNParam = errors.New("Invalid number of parameters")
	// ErrNReturn reports that an exported function does not declare exactly
	// two results.
	ErrNReturn = errors.New("Number of outputs mismatch")
	// ErrBadImplementation reports that the second result of an exported
	// function does not implement error.
	ErrBadImplementation = errors.New("Bad implementation")
)

// errorInterface is the reflect.Type of the built-in error interface, used to
// check the result types of exported functions.
var errorInterface = reflect.TypeOf((*error)(nil)).Elem()
// CallObj is the JSON message exchanged over the websocket. The same shape is
// used for both directions: OP selects whether the message is a "call" or a
// "response", and ID ties a response to the call that produced it.
type CallObj struct {
	// OP is the operation name.
	OP string `json:"op"`
	// ID is the identifier of the call.
	ID string `json:"id"`
	// Method is the name of the method to invoke (could be param 0 instead?).
	Method string `json:"method"`

	// Payload.

	// Params holds the positional arguments of a call.
	Params []interface{} `json:"params"`
	// Response holds the value produced by a call.
	Response interface{} `json:"response"`
	// Error holds the error text of a failed call; omitted when empty.
	Error string `json:"error,omitempty"`
}
- // DataObj common structure for dynamic json object
- //type DataObj map[string]interface{}
// ListenerFunc is the signature of a handler that can be registered on a
// client: it receives the positional parameters of a call and returns the
// value to send back, or an error.
type ListenerFunc func(params ...interface{}) (result interface{}, err error)
- // Request request type for handling requests channels
- // ClientCtx main client struct
- type ClientCtx struct {
- context.Context
- Close func()
- locker sync.Mutex
- WS *websocket.Conn
- // stuff
- listeners map[string]ListenerFunc
- requests map[string]chan CallObj
- message chan CallObj
- }
- // NewClient new client (should have background context here)
- func NewClient(ws *websocket.Conn) *ClientCtx {
- ctx, cancelFunc := context.WithCancel(context.Background())
- return &ClientCtx{
- Context: ctx,
- Close: cancelFunc,
- WS: ws,
- listeners: map[string]ListenerFunc{},
- requests: map[string]chan CallObj{},
- }
- }
- // Process received messages
- func (c *ClientCtx) Process() {
- for {
- select {
- case <-c.Done():
- return // Break loop
- default:
- }
- var data = CallObj{}
- err := websocket.JSON.Receive(c.WS, &data)
- if err != nil { // Close receive
- c.Close()
- return
- }
- switch data.OP {
- case "call":
- params := data.Params
- var idn = data.Method
- var reqID = data.ID
- var fn, ok = c.listeners[idn]
- if !ok {
- websocket.JSON.Send(c.WS, CallObj{
- OP: "response",
- ID: reqID,
- Error: "Method not found",
- })
- break
- }
- go func() { // async send
- errStr := ""
- ret, err := fn(params...)
- if err != nil {
- errStr = err.Error()
- }
- var responseObj = CallObj{
- OP: "response",
- ID: reqID,
- Response: ret,
- Error: errStr,
- }
- websocket.JSON.Send(c.WS, responseObj)
- }()
- case "response":
- c.locker.Lock()
- mchan, ok := c.requests[data.ID]
- delete(c.requests, data.ID)
- c.locker.Unlock()
- if ok {
- mchan <- data
- }
- }
- }
- }
- // Call a client method and estabilishes a request id
- func (c *ClientCtx) Call(method string, params ...interface{}) (interface{}, error) {
- u := uuid.New()
- uuidStr := u.String()
- var callObj = CallObj{
- OP: "call",
- ID: uuidStr,
- Method: method,
- Params: params,
- }
- resCh := make(chan CallObj, 1)
- // Store the channel
- c.locker.Lock()
- c.requests[uuidStr] = resCh
- c.locker.Unlock()
- // IO send
- // Send to dispatcher instead?
- websocket.JSON.Send(c.WS, &callObj)
- // Hang until response
- res := <-resCh // Wait for response
- // Got Response
- var err error
- if res.Error != "" {
- err = errors.New(res.Error)
- }
- return res.Response, err
- }
- //Define export a method to ws client
- func (c *ClientCtx) Define(name string, listener ListenerFunc) {
- c.listeners[name] = listener
- }
- //Export a struct of funcs
- func (c *ClientCtx) Export(obj interface{}) {
- // Reflect here
- typ := reflect.TypeOf(obj)
- val := reflect.ValueOf(obj)
- for i := 0; i < typ.NumField(); i++ {
- fTyp := typ.Field(i)
- if fTyp.Type.Kind() != reflect.Func {
- log.Println("Ignore non func value")
- continue
- }
- tag, ok := fTyp.Tag.Lookup("wsexport")
- if !ok {
- continue
- }
- tagParts := strings.Split(tag, ",")
- exportName := tagParts[0]
- fnVal := val.Field(i)
- // Slow?
- c.listeners[exportName] = func(params ...interface{}) (interface{}, error) {
- if len(params) != fnVal.Type().NumIn() {
- return nil, ErrNParam
- }
- if fnVal.Type().NumOut() != 2 {
- return nil, ErrNReturn
- }
- if !fnVal.Type().Out(1).Implements(errorInterface) {
- return nil, ErrBadImplementation
- }
- vparam := []reflect.Value{}
- for _, p := range params {
- vparam = append(vparam, reflect.ValueOf(p))
- }
- r := fnVal.Call(vparam)
- return r[0].Interface(), r[1].Interface().(error)
- }
- // It is a func, check for tag
- }
- }
|