session.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package flowserver
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "flow"
  6. "flowserver/flowmsg"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "os"
  12. "sync"
  13. "time"
  14. "github.com/gorilla/websocket"
  15. )
  16. // NodeActivity when nodes are processing
  17. type NodeActivity struct {
  18. ID string `json:"id"`
  19. Status string `json:"status"` // nodeStatus, Running, error, result
  20. StartTime time.Time `json:"startTime"`
  21. EndTime time.Time `json:"endTime"`
  22. Data flow.Data `json:"data"`
  23. Error string `json:"error"`
  24. }
  25. // FlowSession Create a session and link clients
  26. type FlowSession struct {
  27. sync.Mutex
  28. manager *FlowSessionManager
  29. ID string // Random handle for sessionID
  30. // List of clients on this session
  31. clients []*websocket.Conn
  32. Chat ChatRoom
  33. RawDoc []byte // Just share data
  34. nodeActivity map[string]*NodeActivity
  35. flow *flow.Flow
  36. }
  37. //NewSession creates and initializes a NewSession
  38. func NewSession(fsm *FlowSessionManager, ID string) *FlowSession {
  39. // Or load
  40. //
  41. //
  42. fpath, err := fsm.pathFor(ID)
  43. if err != nil {
  44. log.Println("Error fetching filepath", err)
  45. }
  46. rawDoc, err := ioutil.ReadFile(fpath)
  47. if err != nil {
  48. log.Println("Warning: unable to read file:", err)
  49. }
  50. if rawDoc == nil {
  51. rawDoc = []byte{}
  52. }
  53. s := &FlowSession{
  54. Mutex: sync.Mutex{},
  55. manager: fsm,
  56. ID: ID,
  57. clients: []*websocket.Conn{},
  58. Chat: ChatRoom{},
  59. RawDoc: rawDoc,
  60. nodeActivity: map[string]*NodeActivity{},
  61. flow: nil,
  62. }
  63. return s
  64. }
  65. // ClientAdd add a client to session
  66. func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
  67. s.Lock()
  68. defer s.Unlock()
  69. err := c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: s.ID})
  70. if err != nil {
  71. return err
  72. }
  73. desc, err := s.manager.registry.Descriptions()
  74. if err != nil {
  75. return err
  76. }
  77. err = c.WriteJSON(flowmsg.SendMessage{OP: "registry", Data: desc})
  78. if err != nil {
  79. return err
  80. }
  81. s.clients = append(s.clients, c)
  82. if len(s.RawDoc) == 0 {
  83. return nil
  84. }
  85. err = c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
  86. if err != nil {
  87. return err
  88. }
  89. // Sending activity
  90. return c.WriteJSON(flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
  91. // Send registry
  92. }
  93. // ClientRemove remove client from Session
  94. func (s *FlowSession) ClientRemove(c *websocket.Conn) {
  95. s.Lock()
  96. defer s.Unlock()
  97. for i, cl := range s.clients {
  98. if cl == c {
  99. s.clients = append(s.clients[:i], s.clients[i+1:]...)
  100. break
  101. }
  102. }
  103. s.Chat.ClientRemove(c)
  104. /*if len(s.clients) == 0 && s.flow == nil {
  105. log.Println("No more clients, remove session")
  106. delete(s.mgr.sessions, s.ID) // Clear memory session
  107. }*/
  108. }
  109. // ChatJoin the chat room on this session
  110. func (s *FlowSession) ChatJoin(c *websocket.Conn, handle string) {
  111. s.Chat.ClientAdd(c, handle)
  112. }
  113. // DocumentUpdate client c Updates the session document
  114. func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
  115. s.Lock()
  116. defer s.Unlock()
  117. s.RawDoc = make([]byte, len(data))
  118. copy(s.RawDoc, data)
  119. return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
  120. }
  121. // DocumentSave persist document in a file
  122. func (s *FlowSession) DocumentSave(data []byte) error {
  123. log.Println("Receiving documentSave")
  124. s.Lock()
  125. defer s.Unlock()
  126. log.Println("Saving..")
  127. s.RawDoc = make([]byte, len(data))
  128. copy(s.RawDoc, data)
  129. fpath, err := s.manager.pathFor(s.ID)
  130. if err != nil {
  131. log.Println("path error", err)
  132. return err
  133. }
  134. err = ioutil.WriteFile(fpath, s.RawDoc, os.FileMode(0600))
  135. if err != nil {
  136. log.Println("writing file", err)
  137. return err
  138. }
  139. s.notify("Session saved")
  140. return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"})
  141. }
  142. // Document send document to client c
  143. func (s *FlowSession) Document(c *websocket.Conn) error {
  144. s.Lock()
  145. defer s.Unlock()
  146. return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
  147. }
  148. // NodeRun a node triggering results
  149. // Build a flow and run
  150. func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
  151. var err error
  152. ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
  153. if s.flow != nil {
  154. return errors.New("node already running")
  155. }
  156. // Clear activity
  157. s.nodeActivity = map[string]*NodeActivity{}
  158. s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
  159. go func() {
  160. log.Printf("Building flow from '%s'\n", string(s.RawDoc))
  161. localr := s.manager.registry.Clone()
  162. //Add our log func that is not in global registry
  163. localr.Register("Notify", func(v flow.Data, msg string) flow.Data {
  164. s.Notify(msg)
  165. return v
  166. })
  167. localr.Register("Log", func() io.Writer {
  168. return s
  169. })
  170. s.flow, err = FlowBuild(s.RawDoc, localr, ID)
  171. log.Println("Flow:", s.flow)
  172. if err != nil {
  173. s.Notify(fmt.Sprint("ERR:", err))
  174. log.Println("Flow error:", err)
  175. return
  176. }
  177. defer func() { // After routing gone
  178. s.flow = nil
  179. }()
  180. s.flow.Hook(flow.Hook{
  181. Any: func(name string, ID string, triggerTime time.Time, extra ...flow.Data) {
  182. s.Lock()
  183. defer s.Unlock()
  184. act, ok := s.nodeActivity[ID]
  185. if !ok {
  186. act = &NodeActivity{ID: ID}
  187. s.nodeActivity[ID] = act
  188. }
  189. status := ""
  190. switch name {
  191. case "Wait":
  192. status = "waiting"
  193. act.StartTime = time.Time{}
  194. act.EndTime = time.Time{}
  195. case "Start":
  196. status = "running"
  197. act.EndTime = time.Time{}
  198. act.StartTime = triggerTime
  199. case "Finish":
  200. status = "finish"
  201. act.EndTime = triggerTime
  202. act.Data = extra[0]
  203. case "Error":
  204. status = "error"
  205. act.EndTime = triggerTime
  206. act.Error = fmt.Sprint(extra[0])
  207. }
  208. if act.Status == status {
  209. return
  210. }
  211. act.Status = status
  212. s.broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
  213. },
  214. })
  215. op := s.flow.Res(ID)
  216. _, err := op.Process()
  217. if err != nil {
  218. log.Println("error processing node", err)
  219. }
  220. s.flow = nil
  221. }()
  222. return nil
  223. }
  224. // Notify broadcast a notification to clients
  225. func (s *FlowSession) Notify(v interface{}) error {
  226. s.Lock()
  227. defer s.Unlock()
  228. return s.notify(v)
  229. }
  230. func (s *FlowSession) notify(v interface{}) error {
  231. return s.broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
  232. }
  233. // Write io.Writer implementation to send event logging
  234. func (s *FlowSession) Write(data []byte) (int, error) {
  235. err := s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionLog", Data: string(data)})
  236. if err != nil {
  237. return -1, err
  238. }
  239. return len(data), nil
  240. }
  241. // Broadcast broadcast a message in session besides C
  242. func (s *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error {
  243. s.Lock()
  244. defer s.Unlock()
  245. return s.broadcast(c, v)
  246. }
  247. //
  248. func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
  249. for _, sc := range s.clients {
  250. if sc == c { // ours
  251. continue
  252. }
  253. err := sc.WriteJSON(v)
  254. if err != nil {
  255. return err
  256. }
  257. }
  258. return nil
  259. }