package flowserver import ( "encoding/json" "errors" "flow" "flowserver/flowmsg" "fmt" "io" "io/ioutil" "log" "os" "sync" "time" "github.com/gorilla/websocket" ) // NodeActivity when nodes are processing type NodeActivity struct { ID string `json:"id"` Status string `json:"status"` // nodeStatus, Running, error, result StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime"` Data flow.Data `json:"data"` Error string `json:"error"` } // FlowSession Create a session and link clients type FlowSession struct { sync.Mutex manager *FlowSessionManager ID string // Random handle for sessionID // List of clients on this session clients []*websocket.Conn Chat ChatRoom RawDoc []byte // Just share data nodeActivity map[string]*NodeActivity flow *flow.Flow } //NewSession creates and initializes a NewSession func NewSession(fsm *FlowSessionManager, ID string) *FlowSession { // Or load // // fpath, err := fsm.pathFor(ID) if err != nil { log.Println("Error fetching filepath", err) } rawDoc, err := ioutil.ReadFile(fpath) if err != nil { log.Println("Warning: unable to read file:", err) } if rawDoc == nil { rawDoc = []byte{} } s := &FlowSession{ Mutex: sync.Mutex{}, manager: fsm, ID: ID, clients: []*websocket.Conn{}, Chat: ChatRoom{}, RawDoc: rawDoc, nodeActivity: map[string]*NodeActivity{}, flow: nil, } return s } // ClientAdd add a client to session func (s *FlowSession) ClientAdd(c *websocket.Conn) error { s.Lock() defer s.Unlock() err := c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: s.ID}) if err != nil { return err } desc, err := s.manager.registry.Descriptions() if err != nil { return err } err = c.WriteJSON(flowmsg.SendMessage{OP: "registry", Data: desc}) if err != nil { return err } s.clients = append(s.clients, c) if len(s.RawDoc) == 0 { return nil } err = c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)}) if err != nil { return err } // Sending activity return c.WriteJSON(flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity}) // Send registry } // ClientRemove remove client from Session func (s *FlowSession) ClientRemove(c *websocket.Conn) { s.Lock() defer s.Unlock() for i, cl := range s.clients { if cl == c { s.clients = append(s.clients[:i], s.clients[i+1:]...) break } } s.Chat.ClientRemove(c) /*if len(s.clients) == 0 && s.flow == nil { log.Println("No more clients, remove session") delete(s.mgr.sessions, s.ID) // Clear memory session }*/ } // ChatJoin the chat room on this session func (s *FlowSession) ChatJoin(c *websocket.Conn, handle string) { s.Chat.ClientAdd(c, handle) } // DocumentUpdate client c Updates the session document func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error { s.Lock() defer s.Unlock() s.RawDoc = make([]byte, len(data)) copy(s.RawDoc, data) return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)}) } // DocumentSave persist document in a file func (s *FlowSession) DocumentSave(data []byte) error { log.Println("Receiving documentSave") s.Lock() defer s.Unlock() log.Println("Saving..") s.RawDoc = make([]byte, len(data)) copy(s.RawDoc, data) fpath, err := s.manager.pathFor(s.ID) if err != nil { log.Println("path error", err) return err } err = ioutil.WriteFile(fpath, s.RawDoc, os.FileMode(0600)) if err != nil { log.Println("writing file", err) return err } s.notify("Session saved") return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"}) } // Document send document to client c func (s *FlowSession) Document(c *websocket.Conn) error { s.Lock() defer s.Unlock() return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)}) } // NodeRun a node triggering results // Build a flow and run func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error { var err error ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json if s.flow != nil { return errors.New("node already running") } // Clear activity s.nodeActivity = map[string]*NodeActivity{} s.Broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity}) go func() { log.Printf("Building flow from '%s'\n", string(s.RawDoc)) localr := s.manager.registry.Clone() //Add our log func that is not in global registry localr.Register("Notify", func(v flow.Data, msg string) flow.Data { s.Notify(msg) return v }) localr.Register("Log", func() io.Writer { return s }) s.flow, err = FlowBuild(s.RawDoc, localr, ID) log.Println("Flow:", s.flow) if err != nil { s.Notify(fmt.Sprint("ERR:", err)) log.Println("Flow error:", err) return } defer func() { // After routing gone s.flow = nil }() s.flow.Hook(flow.Hook{ Any: func(name string, ID string, triggerTime time.Time, extra ...flow.Data) { s.Lock() defer s.Unlock() act, ok := s.nodeActivity[ID] if !ok { act = &NodeActivity{ID: ID} s.nodeActivity[ID] = act } status := "" switch name { case "Wait": status = "waiting" act.StartTime = time.Time{} act.EndTime = time.Time{} case "Start": status = "running" act.EndTime = time.Time{} act.StartTime = triggerTime case "Finish": status = "finish" act.EndTime = triggerTime act.Data = extra[0] case "Error": status = "error" act.EndTime = triggerTime act.Error = fmt.Sprint(extra[0]) } if act.Status == status { return } act.Status = status s.broadcast(nil, flowmsg.SendMessage{OP: "nodeActivity", Data: s.nodeActivity}) }, }) op := s.flow.Res(ID) _, err := op.Process() if err != nil { log.Println("error processing node", err) } s.flow = nil }() return nil } // Notify broadcast a notification to clients func (s *FlowSession) Notify(v interface{}) error { s.Lock() defer s.Unlock() return s.notify(v) } func (s *FlowSession) notify(v interface{}) error { return s.broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v}) } // Write io.Writer implementation to send event logging func (s *FlowSession) Write(data []byte) (int, error) { err := s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionLog", Data: string(data)}) if err != nil { return -1, err } return len(data), nil } // Broadcast broadcast a message in session besides C func (s *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error { s.Lock() defer s.Unlock() return s.broadcast(c, v) } // func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error { for _, sc := range s.clients { if sc == c { // ours continue } err := sc.WriteJSON(v) if err != nil { return err } } return nil }