session.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package flowserver
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "flow"
  6. "flowserver/flowbuilder"
  7. "flowserver/flowmsg"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "log"
  12. "os"
  13. "sync"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. )
  17. // NodeActivity when nodes are processing
  18. type NodeActivity struct {
  19. ID string `json:"id"`
  20. Status string `json:"status"` // nodeStatus, Running, error, result
  21. StartTime time.Time `json:"startTime"`
  22. EndTime time.Time `json:"endTime"`
  23. Data flow.Data `json:"data"`
  24. Error string `json:"error"`
  25. }
  26. // FlowSession Create a session and link clients
  27. type FlowSession struct {
  28. sync.Mutex
  29. manager *FlowSessionManager
  30. ID string // Random handle for sessionID
  31. // List of clients on this session
  32. clients []*websocket.Conn
  33. Chat ChatRoom
  34. RawDoc []byte // Just share data
  35. nodeActivity map[string]*NodeActivity
  36. flow *flow.Flow
  37. }
  38. //NewSession creates and initializes a NewSession
  39. func NewSession(fsm *FlowSessionManager, ID string) *FlowSession {
  40. // Or load
  41. //
  42. //
  43. fpath, err := fsm.pathFor(ID)
  44. if err != nil {
  45. log.Println("Error fetching filepath", err)
  46. }
  47. rawDoc, err := ioutil.ReadFile(fpath)
  48. if err != nil {
  49. log.Println("Warning: unable to read file:", err)
  50. }
  51. if rawDoc == nil {
  52. rawDoc = []byte{}
  53. }
  54. s := &FlowSession{
  55. Mutex: sync.Mutex{},
  56. manager: fsm,
  57. ID: ID,
  58. clients: []*websocket.Conn{},
  59. Chat: ChatRoom{},
  60. RawDoc: rawDoc,
  61. nodeActivity: map[string]*NodeActivity{},
  62. flow: nil,
  63. }
  64. return s
  65. }
  66. // ClientAdd add a client to session
  67. func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
  68. s.Lock()
  69. defer s.Unlock()
  70. err := c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: s.ID})
  71. if err != nil {
  72. return err
  73. }
  74. desc, err := s.manager.registry.Descriptions()
  75. if err != nil {
  76. return err
  77. }
  78. err = c.WriteJSON(flowmsg.SendMessage{OP: "registry", Data: desc})
  79. if err != nil {
  80. return err
  81. }
  82. s.clients = append(s.clients, c)
  83. if len(s.RawDoc) == 0 {
  84. return nil
  85. }
  86. err = c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
  87. if err != nil {
  88. return err
  89. }
  90. // Sending activity
  91. return c.WriteJSON(s.activity())
  92. // Send registry
  93. }
  94. // ClientRemove remove client from Session
  95. func (s *FlowSession) ClientRemove(c *websocket.Conn) {
  96. s.Lock()
  97. defer s.Unlock()
  98. for i, cl := range s.clients {
  99. if cl == c {
  100. s.clients = append(s.clients[:i], s.clients[i+1:]...)
  101. break
  102. }
  103. }
  104. s.Chat.ClientRemove(c)
  105. /*if len(s.clients) == 0 && s.flow == nil {
  106. log.Println("No more clients, remove session")
  107. delete(s.mgr.sessions, s.ID) // Clear memory session
  108. }*/
  109. }
  110. // ChatJoin the chat room on this session
  111. func (s *FlowSession) ChatJoin(c *websocket.Conn, handle string) {
  112. s.Chat.ClientAdd(c, handle)
  113. }
  114. // DocumentUpdate client c Updates the session document
  115. func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
  116. s.Lock()
  117. defer s.Unlock()
  118. s.RawDoc = make([]byte, len(data))
  119. copy(s.RawDoc, data)
  120. return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
  121. }
  122. // DocumentSave persist document in a file
  123. func (s *FlowSession) DocumentSave(data []byte) error {
  124. s.Lock()
  125. defer s.Unlock()
  126. s.RawDoc = make([]byte, len(data))
  127. copy(s.RawDoc, data)
  128. fpath, err := s.manager.pathFor(s.ID)
  129. if err != nil {
  130. log.Println("path error", err)
  131. return err
  132. }
  133. err = ioutil.WriteFile(fpath, s.RawDoc, os.FileMode(0600))
  134. if err != nil {
  135. log.Println("writing file", err)
  136. return err
  137. }
  138. s.notify("Session saved")
  139. return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"})
  140. }
  141. // Document send document to client c
  142. func (s *FlowSession) Document(c *websocket.Conn) error {
  143. s.Lock()
  144. defer s.Unlock()
  145. return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
  146. }
  147. // NodeRun a node triggering results
  148. // Build a flow and run
  149. func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
  150. ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
  151. if s.flow != nil {
  152. s.Notify("a node is already running")
  153. return errors.New("node already running")
  154. }
  155. // Clear activity
  156. s.nodeActivity = map[string]*NodeActivity{}
  157. s.Broadcast(nil, s.activity()) // Ampty activity
  158. build := func() error {
  159. localR := s.manager.registry.Clone()
  160. //Add our log func that is not in global registry
  161. localR.Add("Notify", func(v flow.Data, msg string) flow.Data {
  162. log.Println("Notify:", msg)
  163. s.Notify(msg)
  164. return v
  165. })
  166. localR.Add("Log", func() io.Writer {
  167. return s
  168. })
  169. // Special func
  170. localR.Add("Variable", func(name string, initial flow.Data) flow.Data {
  171. _, ok := s.flow.Data[name]
  172. if !ok {
  173. s.flow.Data[name] = initial
  174. }
  175. return s.flow.Data[name]
  176. })
  177. localR.Add("Output", func(d interface{}) {
  178. r := fmt.Sprint("Result:", d)
  179. // Do something
  180. s.Notify(r)
  181. s.Write([]byte(r))
  182. })
  183. builder := flowbuilder.New(localR)
  184. builder.Load(s.RawDoc).Build(ID)
  185. if builder.Err != nil {
  186. return builder.Err
  187. }
  188. log.Println("New flow")
  189. s.flow = builder.Flow()
  190. defer func() { // After routing gone
  191. s.flow = nil
  192. }()
  193. s.flow.Hook(flow.Hook{
  194. Any: func(name string, ID string, triggerTime time.Time, extra ...flow.Data) {
  195. s.Lock()
  196. defer s.Unlock()
  197. act, ok := s.nodeActivity[ID]
  198. if !ok {
  199. act = &NodeActivity{ID: ID}
  200. s.nodeActivity[ID] = act
  201. }
  202. status := ""
  203. switch name {
  204. case "Wait":
  205. status = "waiting"
  206. act.StartTime = time.Time{}
  207. act.EndTime = time.Time{}
  208. case "Start":
  209. status = "running"
  210. act.EndTime = time.Time{}
  211. act.StartTime = triggerTime
  212. case "Finish":
  213. status = "finish"
  214. act.EndTime = triggerTime
  215. act.Data = extra[0]
  216. case "Error":
  217. status = "error"
  218. act.EndTime = triggerTime
  219. act.Error = fmt.Sprint(extra[0])
  220. }
  221. if act.Status == status {
  222. return
  223. }
  224. act.Status = status
  225. s.broadcast(nil, s.activity())
  226. },
  227. })
  228. op := s.flow.GetOp(ID)
  229. if op == nil {
  230. return fmt.Errorf("Operation not found %v", ID)
  231. }
  232. log.Println("Processing operation")
  233. _, err := op.Process()
  234. if err != nil {
  235. log.Println("Error operation", err)
  236. return err
  237. }
  238. log.Println("Operation finish")
  239. return nil
  240. }
  241. go func() {
  242. err := build()
  243. if err != nil {
  244. s.Notify(fmt.Sprint("ERR:", err))
  245. }
  246. }()
  247. return nil
  248. }
  249. func (s *FlowSession) activity() *flowmsg.SendMessage {
  250. msg := flowmsg.SendMessage{OP: "nodeActivity",
  251. Data: map[string]interface{}{
  252. "serverTime": time.Now(),
  253. "nodes": s.nodeActivity,
  254. },
  255. }
  256. return &msg
  257. }
  258. // Notify broadcast a notification to clients
  259. func (s *FlowSession) Notify(v interface{}) error {
  260. s.Lock()
  261. defer s.Unlock()
  262. return s.notify(v)
  263. }
  264. func (s *FlowSession) notify(v interface{}) error {
  265. return s.broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
  266. }
  267. // Write io.Writer implementation to send event logging
  268. func (s *FlowSession) Write(data []byte) (int, error) {
  269. err := s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionLog", Data: string(data)})
  270. if err != nil {
  271. return -1, err
  272. }
  273. return len(data), nil
  274. }
  275. // Broadcast broadcast a message in session besides C
  276. func (s *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error {
  277. s.Lock()
  278. defer s.Unlock()
  279. return s.broadcast(c, v)
  280. }
  281. //
  282. func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
  283. for _, sc := range s.clients {
  284. if sc == c { // ours
  285. continue
  286. }
  287. err := sc.WriteJSON(v)
  288. if err != nil {
  289. return err
  290. }
  291. }
  292. return nil
  293. }