|
@@ -40,7 +40,9 @@ type FlowSession struct {
|
|
|
RawDoc []byte // Just share data
|
|
|
nodeActivity map[string]*NodeActivity
|
|
|
|
|
|
- flow *flow.Flow
|
|
|
+ Data map[interface{}]interface{}
|
|
|
+ flow *flow.Flow
|
|
|
+ running bool
|
|
|
}
|
|
|
|
|
|
//NewSession creates and initializes a NewSession
|
|
@@ -69,6 +71,8 @@ func NewSession(fsm *FlowSessionManager, ID string) *FlowSession {
|
|
|
Chat: ChatRoom{},
|
|
|
RawDoc: rawDoc,
|
|
|
nodeActivity: map[string]*NodeActivity{},
|
|
|
+ // Experimental
|
|
|
+ Data: map[interface{}]interface{}{},
|
|
|
|
|
|
flow: nil,
|
|
|
}
|
|
@@ -197,15 +201,6 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
localR.Add("Log", func() io.Writer {
|
|
|
return s
|
|
|
})
|
|
|
- // Special func
|
|
|
- /*localR.Add("Variable", func(name string, initial flow.Data) flow.Data {
|
|
|
- log.Println("Loading variable:", name)
|
|
|
- _, ok := s.flow.Data[name]
|
|
|
- if !ok {
|
|
|
- s.flow.Data[name] = initial
|
|
|
- }
|
|
|
- return s.flow.Data[name]
|
|
|
- })*/
|
|
|
localR.Add("Output", func(d interface{}) {
|
|
|
|
|
|
r := fmt.Sprint("Result:", d)
|
|
@@ -222,6 +217,11 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
s.flow = builder.Flow()
|
|
|
log.Println("Flow:", s.flow)
|
|
|
|
|
|
+ log.Println("Experimental: Loading data")
|
|
|
+ for k, v := range s.Data {
|
|
|
+ s.flow.Data.Store(k, v)
|
|
|
+ }
|
|
|
+
|
|
|
defer func() { // After routing gone
|
|
|
s.flow = nil
|
|
|
}()
|
|
@@ -277,11 +277,112 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
log.Println("Error operation", err)
|
|
|
return err
|
|
|
}
|
|
|
+ log.Println("Experimental storing data to session")
|
|
|
+ // Copy Data from flow
|
|
|
+ s.flow.Data.Range(func(k, v interface{}) bool {
|
|
|
+ s.Data[k] = v
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ log.Println("Operation finish")
|
|
|
+ log.Println("Flow now:", s.flow)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ err := build()
|
|
|
+ if err != nil {
|
|
|
+ s.Notify(fmt.Sprint("ERR:", err))
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// NodeTrain temporary operation for repeating a node
|
|
|
+// this is for the demo purposes
|
|
|
+func (s *FlowSession) NodeTrain(c *websocket.Conn, data []byte) error {
|
|
|
+ ID := string(data[1 : len(data)-1]) // remove " instead of unmarshalling json
|
|
|
+ if s.flow != nil {
|
|
|
+ s.Notify("a node is already running")
|
|
|
+ return errors.New("node already running")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Clear activity
|
|
|
+ s.nodeActivity = map[string]*NodeActivity{}
|
|
|
+ s.Broadcast(nil, s.activity()) // Ampty activity
|
|
|
+
|
|
|
+ build := func() error {
|
|
|
+ localR := s.manager.registry.Clone()
|
|
|
+ //Add our log func that is not in global registry
|
|
|
+ localR.Add("Notify", func(v flow.Data, msg string) flow.Data {
|
|
|
+ log.Println("Notify:", msg)
|
|
|
+ s.Notify(msg)
|
|
|
+ return v
|
|
|
+ })
|
|
|
+ localR.Add("Log", func() io.Writer {
|
|
|
+ return s
|
|
|
+ })
|
|
|
+ localR.Add("Output", func(d interface{}) {
|
|
|
+ r := fmt.Sprint("Result:", d)
|
|
|
+ // Do something
|
|
|
+ s.Notify(r)
|
|
|
+ s.Write([]byte(r))
|
|
|
+ })
|
|
|
+ builder := flowbuilder.New(localR)
|
|
|
+ builder.Load(s.RawDoc).Build(ID)
|
|
|
+ if builder.Err != nil {
|
|
|
+ return builder.Err
|
|
|
+ }
|
|
|
+
|
|
|
+ s.flow = builder.Flow()
|
|
|
+ log.Println("Flow:", s.flow)
|
|
|
+
|
|
|
+ // XXX: Possibly remove
|
|
|
+ log.Println("Experimental: Loading global data")
|
|
|
+ for k, v := range s.Data {
|
|
|
+ s.flow.Data.Store(k, v)
|
|
|
+ }
|
|
|
+
|
|
|
+ defer func() { // After routing gone
|
|
|
+ s.flow = nil
|
|
|
+ }()
|
|
|
+ // Flow activity
|
|
|
+
|
|
|
+ op, ok := builder.OperationMap[ID]
|
|
|
+ if !ok {
|
|
|
+ return fmt.Errorf("Operation not found %v", ID)
|
|
|
+ }
|
|
|
+ log.Println("Processing operation")
|
|
|
+
|
|
|
+ epochs := 5000
|
|
|
+ s.Notify(fmt.Sprintf("Training for %d epochs", epochs))
|
|
|
+ for i := 0; i < epochs; i++ {
|
|
|
+ res, err := op.Process()
|
|
|
+ if err != nil {
|
|
|
+ log.Println("Error operation", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if i%1000 == 0 {
|
|
|
+ fmt.Fprintf(s, "Res: %v", res)
|
|
|
+ fmt.Fprintf(s, "Training... %d/%d", i, epochs)
|
|
|
+ outs := builder.Doc.FetchNodeBySrc("Output")
|
|
|
+ if len(outs) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ fmt.Fprintf(s, "%v", s.flow)
|
|
|
+ // Copy Data from flow
|
|
|
+ s.flow.Data.Range(func(k, v interface{}) bool {
|
|
|
+ s.Data[k] = v
|
|
|
+ return true
|
|
|
+ })
|
|
|
log.Println("Operation finish")
|
|
|
log.Println("Flow now:", s.flow)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ // Parallel building
|
|
|
go func() {
|
|
|
err := build()
|
|
|
if err != nil {
|
|
@@ -290,6 +391,7 @@ func (s *FlowSession) NodeProcess(c *websocket.Conn, data []byte) error {
|
|
|
}()
|
|
|
|
|
|
return nil
|
|
|
+
|
|
|
}
|
|
|
|
|
|
func (s *FlowSession) activity() *flowmsg.SendMessage {
|