clientctx.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Client handler
  2. package wsrpc
  3. import (
  4. "context"
  5. "errors"
  6. "log"
  7. "reflect"
  8. "strings"
  9. "sync"
  10. "github.com/google/uuid"
  11. "golang.org/x/net/websocket"
  12. )
  13. // Vars
  14. var (
  15. ErrNParam = errors.New("Invalid number of parameters")
  16. ErrNReturn = errors.New("Number of outputs mismatch")
  17. ErrBadImplementation = errors.New("Bad implementation")
  18. errorInterface = reflect.TypeOf((*error)(nil)).Elem()
  19. )
  20. // CallObj object
  21. // Message
  22. type CallObj struct {
  23. OP string `json:"op"` // operation
  24. ID string `json:"id"` // Id of the call
  25. Method string `json:"method"` // could be param 0 instead?
  26. // Payload
  27. Params []interface{} `json:"params"`
  28. Response interface{} `json:"response"`
  29. Error string `json:"error,omitempty"`
  30. }
  31. // DataObj common structure for dymanic json object
  32. //type DataObj map[string]interface{}
  33. // ListenerFunc function type to handle browser events
  34. type ListenerFunc func(...interface{}) (interface{}, error)
  35. // Request request type for handling requests channels
  36. // ClientCtx main client struct
  37. type ClientCtx struct {
  38. context.Context
  39. Close func()
  40. locker sync.Mutex
  41. WS *websocket.Conn
  42. // stuff
  43. listeners map[string]ListenerFunc
  44. requests map[string]chan CallObj
  45. message chan CallObj
  46. }
  47. // NewClient new client (should have background context here)
  48. func NewClient(ws *websocket.Conn) *ClientCtx {
  49. ctx, cancelFunc := context.WithCancel(context.Background())
  50. return &ClientCtx{
  51. Context: ctx,
  52. Close: cancelFunc,
  53. WS: ws,
  54. listeners: map[string]ListenerFunc{},
  55. requests: map[string]chan CallObj{},
  56. }
  57. }
  58. // Process received messages
  59. func (c *ClientCtx) Process() {
  60. for {
  61. select {
  62. case <-c.Done():
  63. return // Break loop
  64. default:
  65. }
  66. var data = CallObj{}
  67. err := websocket.JSON.Receive(c.WS, &data)
  68. if err != nil { // Close receive
  69. c.Close()
  70. return
  71. }
  72. switch data.OP {
  73. case "call":
  74. params := data.Params
  75. var idn = data.Method
  76. var reqID = data.ID
  77. var fn, ok = c.listeners[idn]
  78. if !ok {
  79. websocket.JSON.Send(c.WS, CallObj{
  80. OP: "response",
  81. ID: reqID,
  82. Error: "Method not found",
  83. })
  84. break
  85. }
  86. go func() { // async send
  87. errStr := ""
  88. ret, err := fn(params...)
  89. if err != nil {
  90. errStr = err.Error()
  91. }
  92. var responseObj = CallObj{
  93. OP: "response",
  94. ID: reqID,
  95. Response: ret,
  96. Error: errStr,
  97. }
  98. websocket.JSON.Send(c.WS, responseObj)
  99. }()
  100. case "response":
  101. c.locker.Lock()
  102. mchan, ok := c.requests[data.ID]
  103. delete(c.requests, data.ID)
  104. c.locker.Unlock()
  105. if ok {
  106. mchan <- data
  107. }
  108. }
  109. }
  110. }
  111. // Call a client method and estabilishes a request id
  112. func (c *ClientCtx) Call(method string, params ...interface{}) (interface{}, error) {
  113. u := uuid.New()
  114. uuidStr := u.String()
  115. var callObj = CallObj{
  116. OP: "call",
  117. ID: uuidStr,
  118. Method: method,
  119. Params: params,
  120. }
  121. resCh := make(chan CallObj, 1)
  122. // Store the channel
  123. c.locker.Lock()
  124. c.requests[uuidStr] = resCh
  125. c.locker.Unlock()
  126. // IO send
  127. // Send to dispatcher instead?
  128. websocket.JSON.Send(c.WS, &callObj)
  129. // Hang until response
  130. res := <-resCh // Wait for response
  131. // Got Response
  132. var err error
  133. if res.Error != "" {
  134. err = errors.New(res.Error)
  135. }
  136. return res.Response, err
  137. }
  138. //Define export a method to ws client
  139. func (c *ClientCtx) Define(name string, listener ListenerFunc) {
  140. c.listeners[name] = listener
  141. }
  142. //Export a struct of funcs
  143. func (c *ClientCtx) Export(obj interface{}) {
  144. // Reflect here
  145. typ := reflect.TypeOf(obj)
  146. val := reflect.ValueOf(obj)
  147. for i := 0; i < typ.NumField(); i++ {
  148. fTyp := typ.Field(i)
  149. if fTyp.Type.Kind() != reflect.Func {
  150. log.Println("Ignore non func value")
  151. continue
  152. }
  153. tag, ok := fTyp.Tag.Lookup("wsexport")
  154. if !ok {
  155. continue
  156. }
  157. tagParts := strings.Split(tag, ",")
  158. exportName := tagParts[0]
  159. fnVal := val.Field(i)
  160. // Slow?
  161. c.listeners[exportName] = func(params ...interface{}) (interface{}, error) {
  162. if len(params) != fnVal.Type().NumIn() {
  163. return nil, ErrNParam
  164. }
  165. if fnVal.Type().NumOut() != 2 {
  166. return nil, ErrNReturn
  167. }
  168. if !fnVal.Type().Out(1).Implements(errorInterface) {
  169. return nil, ErrBadImplementation
  170. }
  171. vparam := []reflect.Value{}
  172. for _, p := range params {
  173. vparam = append(vparam, reflect.ValueOf(p))
  174. }
  175. r := fnVal.Call(vparam)
  176. return r[0].Interface(), r[1].Interface().(error)
  177. }
  178. // It is a func, check for tag
  179. }
  180. }