123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- package flowserver
- import (
- "encoding/json"
- "errors"
- "flow"
- "flowserver/flowbuilder"
- "flowserver/flowmsg"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "os"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- )
- // NodeActivity when nodes are processing
- type NodeActivity struct {
- ID string `json:"id"`
- Status string `json:"status"` // nodeStatus, Running, error, result
- StartTime time.Time `json:"startTime"`
- EndTime time.Time `json:"endTime"`
- Data flow.Data `json:"data"`
- Error string `json:"error"`
- }
- // FlowSession Create a session and link clients
- type FlowSession struct {
- sync.Mutex
- manager *FlowSessionManager
- ID string // Random handle for sessionID
- // List of clients on this session
- clients []*websocket.Conn
- Chat ChatRoom
- RawDoc []byte // Just share data
- nodeActivity map[string]*NodeActivity
- flow *flow.Flow
- }
- //NewSession creates and initializes a NewSession
- func NewSession(fsm *FlowSessionManager, ID string) *FlowSession {
- // Or load
- //
- //
- fpath, err := fsm.pathFor(ID)
- if err != nil {
- log.Println("Error fetching filepath", err)
- }
- rawDoc, err := ioutil.ReadFile(fpath)
- if err != nil {
- log.Println("Warning: unable to read file:", err)
- }
- if rawDoc == nil {
- rawDoc = []byte{}
- }
- s := &FlowSession{
- Mutex: sync.Mutex{},
- manager: fsm,
- ID: ID,
- clients: []*websocket.Conn{},
- Chat: ChatRoom{},
- RawDoc: rawDoc,
- nodeActivity: map[string]*NodeActivity{},
- flow: nil,
- }
- return s
- }
- // ClientAdd add a client to session
- func (s *FlowSession) ClientAdd(c *websocket.Conn) error {
- s.Lock()
- defer s.Unlock()
- err := c.WriteJSON(flowmsg.SendMessage{OP: "sessionJoin", ID: s.ID})
- if err != nil {
- return err
- }
- desc, err := s.manager.registry.Descriptions()
- if err != nil {
- return err
- }
- err = c.WriteJSON(flowmsg.SendMessage{OP: "registry", Data: desc})
- if err != nil {
- return err
- }
- s.clients = append(s.clients, c)
- if len(s.RawDoc) == 0 {
- return nil
- }
- err = c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
- if err != nil {
- return err
- }
- // Sending activity
- return c.WriteJSON(s.activity())
- // Send registry
- }
- // ClientRemove remove client from Session
- func (s *FlowSession) ClientRemove(c *websocket.Conn) {
- s.Lock()
- defer s.Unlock()
- for i, cl := range s.clients {
- if cl == c {
- s.clients = append(s.clients[:i], s.clients[i+1:]...)
- break
- }
- }
- s.Chat.ClientRemove(c)
- /*if len(s.clients) == 0 && s.flow == nil {
- log.Println("No more clients, remove session")
- delete(s.mgr.sessions, s.ID) // Clear memory session
- }*/
- }
- // ChatJoin the chat room on this session
- func (s *FlowSession) ChatJoin(c *websocket.Conn, handle string) {
- s.Chat.ClientAdd(c, handle)
- }
- // DocumentUpdate client c Updates the session document
- func (s *FlowSession) DocumentUpdate(c *websocket.Conn, data []byte) error {
- s.Lock()
- defer s.Unlock()
- s.RawDoc = make([]byte, len(data))
- copy(s.RawDoc, data)
- return s.broadcast(c, flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
- }
- // DocumentSave persist document in a file
- func (s *FlowSession) DocumentSave(data []byte) error {
- s.Lock()
- defer s.Unlock()
- s.RawDoc = make([]byte, len(data))
- copy(s.RawDoc, data)
- fpath, err := s.manager.pathFor(s.ID)
- if err != nil {
- log.Println("path error", err)
- return err
- }
- err = ioutil.WriteFile(fpath, s.RawDoc, os.FileMode(0600))
- if err != nil {
- log.Println("writing file", err)
- return err
- }
- s.notify("Session saved")
- return s.broadcast(nil, flowmsg.SendMessage{OP: "documentSave", Data: "saved"})
- }
- // Document send document to client c
- func (s *FlowSession) Document(c *websocket.Conn) error {
- s.Lock()
- defer s.Unlock()
- return c.WriteJSON(flowmsg.SendMessage{OP: "document", Data: json.RawMessage(s.RawDoc)})
- }
- // NodeRun a node triggering results
- // Build a flow and run
- func (s *FlowSession) NodeRun(c *websocket.Conn, data []byte) error {
- ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
- if s.flow != nil {
- s.Notify("a node is already running")
- return errors.New("node already running")
- }
- // Clear activity
- s.nodeActivity = map[string]*NodeActivity{}
- s.Broadcast(nil, s.activity()) // Ampty activity
- build := func() error {
- localR := s.manager.registry.Clone()
- //Add our log func that is not in global registry
- localR.Add("Notify", func(v flow.Data, msg string) flow.Data {
- log.Println("Notify:", msg)
- s.Notify(msg)
- return v
- })
- localR.Add("Log", func() io.Writer {
- return s
- })
- // Special func
- localR.Add("Variable", func(name string, initial flow.Data) flow.Data {
- _, ok := s.flow.Data[name]
- if !ok {
- s.flow.Data[name] = initial
- }
- return s.flow.Data[name]
- })
- localR.Add("Output", func(d interface{}) {
- r := fmt.Sprint("Result:", d)
- // Do something
- s.Notify(r)
- s.Write([]byte(r))
- })
- builder := flowbuilder.New(localR)
- builder.Load(s.RawDoc).Build(ID)
- if builder.Err != nil {
- return builder.Err
- }
- log.Println("New flow")
- s.flow = builder.Flow()
- defer func() { // After routing gone
- s.flow = nil
- }()
- s.flow.Hook(flow.Hook{
- Any: func(name string, ID string, triggerTime time.Time, extra ...flow.Data) {
- s.Lock()
- defer s.Unlock()
- act, ok := s.nodeActivity[ID]
- if !ok {
- act = &NodeActivity{ID: ID}
- s.nodeActivity[ID] = act
- }
- status := ""
- switch name {
- case "Wait":
- status = "waiting"
- act.StartTime = time.Time{}
- act.EndTime = time.Time{}
- case "Start":
- status = "running"
- act.EndTime = time.Time{}
- act.StartTime = triggerTime
- case "Finish":
- status = "finish"
- act.EndTime = triggerTime
- act.Data = extra[0]
- case "Error":
- status = "error"
- act.EndTime = triggerTime
- act.Error = fmt.Sprint(extra[0])
- }
- if act.Status == status {
- return
- }
- act.Status = status
- s.broadcast(nil, s.activity())
- },
- })
- op := s.flow.GetOp(ID)
- if op == nil {
- return fmt.Errorf("Operation not found %v", ID)
- }
- log.Println("Processing operation")
- _, err := op.Process()
- if err != nil {
- log.Println("Error operation", err)
- return err
- }
- log.Println("Operation finish")
- return nil
- }
- go func() {
- err := build()
- if err != nil {
- s.Notify(fmt.Sprint("ERR:", err))
- }
- }()
- return nil
- }
- func (s *FlowSession) activity() *flowmsg.SendMessage {
- msg := flowmsg.SendMessage{OP: "nodeActivity",
- Data: map[string]interface{}{
- "serverTime": time.Now(),
- "nodes": s.nodeActivity,
- },
- }
- return &msg
- }
- // Notify broadcast a notification to clients
- func (s *FlowSession) Notify(v interface{}) error {
- s.Lock()
- defer s.Unlock()
- return s.notify(v)
- }
- func (s *FlowSession) notify(v interface{}) error {
- return s.broadcast(nil, flowmsg.SendMessage{OP: "sessionNotify", Data: v})
- }
- // Write io.Writer implementation to send event logging
- func (s *FlowSession) Write(data []byte) (int, error) {
- err := s.Broadcast(nil, flowmsg.SendMessage{OP: "sessionLog", Data: string(data)})
- if err != nil {
- return -1, err
- }
- return len(data), nil
- }
- // Broadcast broadcast a message in session besides C
- func (s *FlowSession) Broadcast(c *websocket.Conn, v interface{}) error {
- s.Lock()
- defer s.Unlock()
- return s.broadcast(c, v)
- }
- //
- func (s *FlowSession) broadcast(c *websocket.Conn, v interface{}) error {
- for _, sc := range s.clients {
- if sc == c { // ours
- continue
- }
- err := sc.WriteJSON(v)
- if err != nil {
- return err
- }
- }
- return nil
- }
|