123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- package flowserver
- import (
- "encoding/json"
- "errors"
- "flow"
- "flowserver/flowmsg"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "os"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- )
- // NodeActivity when nodes are processing
- type NodeActivity struct {
- ID string `json:"id"`
- Status string `json:"status"` // nodeStatus, Running, error, result
- StartTime time.Time `json:"startTime"`
- EndTime time.Time `json:"endTime"`
- Data flow.Data `json:"data"`
- Error string `json:"error"`
- }
- // FlowSession Create a session and link clients
- type FlowSession struct {
- sync.Mutex
- manager *FlowSessionManager
- ID string // Random handle for sessionID
- // List of clients on this session
- clients []*websocket.Conn
- Chat ChatRoom
- RawDoc []byte // Just share data
- nodeActivity map[string]*NodeActivity
- flow *flow.Flow
- }
- //NewSession creates and initializes a NewSession
- func NewSession(fsm *FlowSessionManager, ID string) *FlowSession {
- // Or load
- //
- //
- fpath, err := fsm.pathFor(ID)
- if err != nil {
- log.Println("Error fetching filepath", err)
- }
- rawDoc, err := ioutil.ReadFile(fpath)
- if err != nil {
- log.Println("Warning: unable to read file:", err)
- }
- if rawDoc == nil {
- rawDoc = []byte{}
- }
- s := &FlowSession{
- Mutex: sync.Mutex{},
- manager: fsm,
- ID: ID,
- clients: []*websocket.Conn{},
- Chat: ChatRoom{},
- RawDoc: rawDoc,
- nodeActivity: map[string]*NodeActivity{},
- flow: nil,
- }
- return s
- }
- // ClientAdd add a client to session
- func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
- s.Lock()
- defer s.Unlock()
- err := c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: s.ID})
- if err != nil {
- return err
- }
- desc, err := s.manager.registry.Descriptions()
- if err != nil {
- return err
- }
- err = c.WriteJSON(flowmsg.SendMessage{OP: "registry", Data: desc})
- if err != nil {
- return err
- }
- s.clients = append(s.clients, c)
- if len(s.RawDoc) == 0 {
- return nil
- }
- err = c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
- if err != nil {
- return err
- }
- // Sending activity
- return c.WriteJSON(flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
- // Send registry
- }
- // ClientRemove remove client from Session
- func (s *FlowSession) ClientRemove(c *websocket.Conn) {
- s.Lock()
- defer s.Unlock()
- for i, cl := range s.clients {
- if cl == c {
- s.clients = append(s.clients[:i], s.clients[i+1:]...)
- break
- }
- }
- s.Chat.ClientRemove(c)
- /*if len(s.clients) == 0 && s.flow == nil {
- log.Println("No more clients, remove session")
- delete(s.mgr.sessions, s.ID) // Clear memory session
- }*/
- }
- // ChatJoin the chat room on this session
- func (s *FlowSession) ChatJoin(c *websocket.Conn, handle string) {
- s.Chat.ClientAdd(c, handle)
- }
- // DocumentUpdate client c Updates the session document
- func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
- s.Lock()
- defer s.Unlock()
- s.RawDoc = make([]byte, len(data))
- copy(s.RawDoc, data)
- return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
- }
- // DocumentSave persist document in a file
- func (s *FlowSession) DocumentSave(data []byte) error {
- log.Println("Receiving documentSave")
- s.Lock()
- defer s.Unlock()
- log.Println("Saving..")
- s.RawDoc = make([]byte, len(data))
- copy(s.RawDoc, data)
- fpath, err := s.manager.pathFor(s.ID)
- if err != nil {
- log.Println("path error", err)
- return err
- }
- err = ioutil.WriteFile(fpath, s.RawDoc, os.FileMode(0600))
- if err != nil {
- log.Println("writing file", err)
- return err
- }
- s.notify("Session saved")
- return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"})
- }
- // Document send document to client c
- func (s *FlowSession) Document(c *websocket.Conn) error {
- s.Lock()
- defer s.Unlock()
- return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
- }
- // NodeRun a node triggering results
- // Build a flow and run
- func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
- var err error
- ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
- if s.flow != nil {
- return errors.New("node already running")
- }
- // Clear activity
- s.nodeActivity = map[string]*NodeActivity{}
- s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
- go func() {
- log.Printf("Building flow from '%s'\n", string(s.RawDoc))
- localr := s.manager.registry.Clone()
- //Add our log func that is not in global registry
- localr.Register("Notify", func(v flow.Data, msg string) flow.Data {
- s.Notify(msg)
- return v
- })
- localr.Register("Log", func() io.Writer {
- return s
- })
- s.flow, err = FlowBuild(s.RawDoc, localr, ID)
- log.Println("Flow:", s.flow)
- if err != nil {
- s.Notify(fmt.Sprint("ERR:", err))
- log.Println("Flow error:", err)
- return
- }
- defer func() { // After routing gone
- s.flow = nil
- }()
- s.flow.Hook(flow.Hook{
- Any: func(name string, ID string, triggerTime time.Time, extra ...flow.Data) {
- s.Lock()
- defer s.Unlock()
- act, ok := s.nodeActivity[ID]
- if !ok {
- act = &NodeActivity{ID: ID}
- s.nodeActivity[ID] = act
- }
- status := ""
- switch name {
- case "Wait":
- status = "waiting"
- act.StartTime = time.Time{}
- act.EndTime = time.Time{}
- case "Start":
- status = "running"
- act.EndTime = time.Time{}
- act.StartTime = triggerTime
- case "Finish":
- status = "finish"
- act.EndTime = triggerTime
- act.Data = extra[0]
- case "Error":
- status = "error"
- act.EndTime = triggerTime
- act.Error = fmt.Sprint(extra[0])
- }
- if act.Status == status {
- return
- }
- act.Status = status
- s.broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity})
- },
- })
- op := s.flow.Res(ID)
- _, err := op.Process()
- if err != nil {
- log.Println("error processing node", err)
- }
- s.flow = nil
- }()
- return nil
- }
- // Notify broadcast a notification to clients
- func (s *FlowSession) Notify(v interface{}) error {
- s.Lock()
- defer s.Unlock()
- return s.notify(v)
- }
- func (s *FlowSession) notify(v interface{}) error {
- return s.broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
- }
- // Write io.Writer implementation to send event logging
- func (s *FlowSession) Write(data []byte) (int, error) {
- err := s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionLog", Data: string(data)})
- if err != nil {
- return -1, err
- }
- return len(data), nil
- }
- // Broadcast broadcast a message in session besides C
- func (s *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error {
- s.Lock()
- defer s.Unlock()
- return s.broadcast(c, v)
- }
- //
- func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
- for _, sc := range s.clients {
- if sc == c { // ours
- continue
- }
- err := sc.WriteJSON(v)
- if err != nil {
- return err
- }
- }
- return nil
- }
|