flow_test.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package flow_test
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "testing"
  7. "time"
  8. "flow"
  9. vecasm "github.com/gohxs/vec-benchmark/asm"
  10. "flow/internal/assert"
  11. "flow/registry"
  12. )
  13. func init() {
  14. assert.Quiet = true
  15. }
  16. func TestInput(t *testing.T) {
  17. a := assert.A(t)
  18. f := flow.New()
  19. opIn := f.In(0)
  20. a.NotEq(opIn, nil, "input err should not be nil")
  21. d, err := opIn.Process([]float32{2, 2, 2})
  22. a.Eq(d, []float32{2, 2, 2}, "array should be equal")
  23. op := f.Op("vecadd", []float32{1, 1, 1}, opIn)
  24. _, err = op.Process([]float32{1, 2, 3})
  25. a.Eq(err, nil, "result should not error")
  26. a.NotEq(op, nil, "operation should not be nil")
  27. d, err = op.Process([]float32{2, 2, 2})
  28. a.Eq(err, nil, "should not error passing an input")
  29. a.Eq(d, []float32{3, 3, 3}, "array should be equal")
  30. }
  31. func TestDefOp(t *testing.T) {
  32. a := assert.A(t)
  33. f := flow.New()
  34. var err error
  35. _, err = f.DefOp("2", "vecadd", []float32{1, 1, 1}, []float32{2, 2, 2}) // r:3 3 3
  36. a.Eq(err, nil, fmt.Sprintf("doing DefOp\n%v", f))
  37. _, err = f.DefOp("1", "vecadd", []float32{1, 2, 3}, f.GetOp("2")) // r: 4 5 6
  38. a.Eq(err, nil, "doing DefOp")
  39. op := f.Op("vecmul", f.GetOp("1"), []float32{2, 2, 2}) //r:8 10 12
  40. a.NotEq(op, nil, "operation not nil")
  41. _, err = op.Process()
  42. a.Eq(err, nil, "mul operation")
  43. desired := []float32{8, 10, 12}
  44. res, _ := op.Process()
  45. a.Eq(res, desired, fmt.Sprintf("vector result should match:\n%v", f))
  46. op, err = f.DefOp("123", "none")
  47. a.NotEq(err, nil, "Error should not be nil")
  48. }
  49. func TestGetOp(t *testing.T) {
  50. a := assert.A(t)
  51. f := flow.New()
  52. op := f.GetOp("1")
  53. a.Eq(op, nil, "op should be nil")
  54. }
  55. /*func TestIDGen(t *testing.T) {
  56. a := assert.A(t)
  57. idTable := []string{"2", "1", "1"}
  58. f := flow.New()
  59. f.SetIDGen(func() string {
  60. if len(idTable) == 0 {
  61. return "0"
  62. }
  63. newID := idTable[len(idTable)-1]
  64. idTable = idTable[:len(idTable)-1]
  65. return newID
  66. })
  67. i1 := f.In(0)
  68. a.NotEq(i1, nil, "i1 should not be nil")
  69. a.Eq(i1.ID(), "1", "id should be 1")
  70. i2 := f.In(1)
  71. a.NotEq(i2, nil, "i2 should not be nil")
  72. a.Eq(i2.ID(), "2", "id should be 2")
  73. o := f.Op("vecadd", i1, i2)
  74. a.NotEq(o, nil, "Should not nil")
  75. a.Eq(o.ID(), "0", "id should be 0")
  76. o = f.Op("vecadd", i1, i2)
  77. a.Eq(o, nil, "Should be nil, id generation exausted")
  78. }*/
  79. func TestSerialize(t *testing.T) {
  80. // Does not text yet
  81. f := flow.New()
  82. var1 := f.Var("var1", []float32{4, 4, 4})
  83. c1 := f.Const([]float32{1, 2, 3})
  84. c2 := f.Const([]float32{2, 2, 2})
  85. op1 := f.Op("vecmul", // op:0 - expected: [12,16,20,24]
  86. f.Var("vec1", []float32{4, 4, 4, 4}),
  87. f.Op("vecadd", // op:1 - expected: [3,4,5,6]
  88. f.Const([]float32{1, 2, 3, 4}),
  89. f.Const([]float32{2, 2, 2, 2}),
  90. ),
  91. )
  92. mul1 := f.Op("vecmul", c1, op1) // op:2 - expected 12, 32, 60, 0
  93. mul2 := f.Op("vecmul", mul1, var1) // op:3 - expected 48, 128, 240, 0
  94. mul3 := f.Op("vecmul", c2, mul2) // op:4 - expected 96, 256, 480, 0
  95. mul4 := f.Op("vecmul", mul3, f.In(0)) // op:5 - expected 96, 512, 1440,0
  96. s := bytes.NewBuffer(nil)
  97. f.Analyse(s, []float32{1, 2, 3, 4})
  98. t.Log(s)
  99. res, _ := mul4.Process([]float32{1, 2, 3, 4})
  100. t.Log("Res:", res)
  101. t.Log("Flow:\n", f)
  102. ret := bytes.NewBuffer(nil)
  103. e := json.NewEncoder(ret)
  104. e.SetIndent(" ", " ")
  105. e.Encode(f)
  106. // Deserialize
  107. t.Log("Flow:", ret)
  108. }
  109. func TestConst(t *testing.T) {
  110. a := assert.A(t)
  111. f := flow.New()
  112. c := f.Const(1)
  113. res, err := c.Process()
  114. a.Eq(res, 1, "It should be one")
  115. a.Eq(err, nil, "const should not error")
  116. }
  117. func TestOp(t *testing.T) {
  118. a := assert.A(t)
  119. f := flow.New()
  120. add := f.Op("vecadd",
  121. f.Op("vecmul",
  122. []float32{1, 2, 3},
  123. []float32{2, 2, 2},
  124. ),
  125. []float32{1, 2, 3},
  126. )
  127. res, err := add.Process()
  128. a.Eq(err, nil)
  129. test := []float32{3, 6, 9}
  130. a.Eq(test, res)
  131. }
  132. /*
  133. * TODO: Create variable test
  134. func TestVariable(t *testing.T) {
  135. a := assert.A(t)
  136. f := flow.New()
  137. v := f.Var("v1", 1)
  138. res, err := v.Process()
  139. a.Eq(err, nil)
  140. a.Eq(res, 1)
  141. v.Set(2)
  142. res, err = v.Process()
  143. a.Eq(err, nil)
  144. a.Eq(res, 2)
  145. }*/
  146. // Test context
  147. func TestCache(t *testing.T) {
  148. a := assert.A(t)
  149. f := flow.New()
  150. {
  151. r := f.Op("inc")
  152. a.NotEq(r, nil, "should not error giving operation")
  153. for i := 1; i < 5; i++ {
  154. res, err := r.Process()
  155. a.Eq(err, nil)
  156. a.Eq(res, i)
  157. }
  158. }
  159. {
  160. var res flow.Data
  161. inc := f.Op("inc")
  162. add := f.Op("add", inc, inc)
  163. res, _ = add.Process() // 1+1
  164. assert.Eq(t, res, 2)
  165. res, _ = add.Process() // 2+2
  166. assert.Eq(t, res, 4)
  167. }
  168. }
  169. func TestHandler(t *testing.T) {
  170. f, op := prepareComplex()
  171. f.Hook(flow.Hook{
  172. Wait: func(ID string, triggerTime time.Time) { t.Logf("[%s] Wait", ID) },
  173. Start: func(ID string, triggerTime time.Time) { t.Logf("[%s]Start", ID) },
  174. Finish: func(ID string, triggerTime time.Time, res flow.Data) { t.Logf("[%s] Finish %v", ID, res) },
  175. Error: func(ID string, triggerTime time.Time, err error) { t.Logf("[%s] Error %v", ID, err) },
  176. })
  177. op.Process()
  178. }
  179. func TestLocalRegistry(t *testing.T) {
  180. a := assert.A(t)
  181. r := registry.New()
  182. e := r.Add("test", func() string { return "" })
  183. a.NotEq(e, nil, "registered in a local register")
  184. f := flow.New()
  185. f.UseRegistry(r)
  186. op := f.Op("test")
  187. a.NotEq(op, nil, "operation should be valid")
  188. op = f.Op("none")
  189. a.NotEq(op, nil, "operation should not be nil")
  190. _, err := op.Process()
  191. a.NotEq(err, nil, "flow should contain an error")
  192. }
  193. func init() {
  194. registry.Add("vecmul", VecMul)
  195. registry.Add("vecadd", VecAdd)
  196. registry.Add("vecdiv", VecDiv)
  197. registry.Add("inc", Inc)
  198. registry.Add("add", Add)
  199. }
  200. func prepareComplex() (*flow.Flow, flow.Operation) {
  201. vecsize := 5
  202. v1 := make([]float32, vecsize)
  203. v2 := make([]float32, vecsize)
  204. for i := range v1 {
  205. v1[i], v2[i] = float32(i+1), 2
  206. }
  207. f := flow.New()
  208. f1 := f.Var("f1", v1)
  209. f2 := f.Var("f2", v2)
  210. mul := f.Op("vecmul", f1, f2) // Doubles 2,4,6,8...
  211. add := f.Op("vecadd", mul, f2) // Sum 4,8,10,12...
  212. mul2 := f.Op("vecmul", mul, add) // mul again
  213. mul3 := f.Op("vecmul", mul2, f1) // mul with f1
  214. div1 := f.Op("vecdiv", mul3, mul2) // div
  215. return f, div1
  216. }
  217. func VecMul(a, b []float32) []float32 {
  218. sz := Min(len(a), len(b))
  219. out := make([]float32, sz)
  220. vecasm.VecMulf32x8(a, b, out)
  221. return out
  222. }
  223. func VecAdd(a, b []float32) []float32 {
  224. sz := Min(len(a), len(b))
  225. out := make([]float32, sz)
  226. for i := 0; i < sz; i++ {
  227. out[i] = a[i] + b[i]
  228. }
  229. return out
  230. }
  231. func VecDiv(a, b []float32) []float32 {
  232. sz := Min(len(a), len(b))
  233. out := make([]float32, sz)
  234. for i := 0; i < sz; i++ {
  235. out[i] = a[i] / b[i]
  236. }
  237. return out
  238. }
  239. // ScalarInt
  240. // Every time this operator is called we increase 1
  241. // Constructor
  242. func Inc() func() int {
  243. i := 0
  244. return func() int {
  245. i++
  246. return i
  247. }
  248. }
  249. func Add(a, b int) int {
  250. return a + b
  251. }
  252. // Utils
  253. func Min(p ...int) int {
  254. min := p[0]
  255. for _, v := range p[1:] {
  256. if min < v {
  257. min = v
  258. }
  259. }
  260. return min
  261. }