parallel.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package fusetesting
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "os"
  19. "path"
  20. "runtime"
  21. "sync/atomic"
  22. "time"
  23. . "github.com/jacobsa/ogletest"
  24. "github.com/jacobsa/syncutil"
  25. "golang.org/x/net/context"
  26. )
  27. // Run an ogletest test that checks expectations for parallel calls to open(2)
  28. // with O_CREAT.
  29. func RunCreateInParallelTest_NoTruncate(
  30. ctx context.Context,
  31. dir string) {
  32. // Ensure that we get parallelism for this test.
  33. defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
  34. // Try for awhile to see if anything breaks.
  35. const duration = 500 * time.Millisecond
  36. startTime := time.Now()
  37. for time.Since(startTime) < duration {
  38. filename := path.Join(dir, "foo")
  39. // Set up a function that opens the file with O_CREATE and then appends a
  40. // byte to it.
  41. worker := func(id byte) (err error) {
  42. f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
  43. if err != nil {
  44. err = fmt.Errorf("Worker %d: Open: %v", id, err)
  45. return
  46. }
  47. defer f.Close()
  48. _, err = f.Write([]byte{id})
  49. if err != nil {
  50. err = fmt.Errorf("Worker %d: Write: %v", id, err)
  51. return
  52. }
  53. return
  54. }
  55. // Run several workers in parallel.
  56. const numWorkers = 16
  57. b := syncutil.NewBundle(ctx)
  58. for i := 0; i < numWorkers; i++ {
  59. id := byte(i)
  60. b.Add(func(ctx context.Context) (err error) {
  61. err = worker(id)
  62. return
  63. })
  64. }
  65. err := b.Join()
  66. AssertEq(nil, err)
  67. // Read the contents of the file. We should see each worker's ID once.
  68. contents, err := ioutil.ReadFile(filename)
  69. AssertEq(nil, err)
  70. idsSeen := make(map[byte]struct{})
  71. for i, _ := range contents {
  72. id := contents[i]
  73. AssertLt(id, numWorkers)
  74. if _, ok := idsSeen[id]; ok {
  75. AddFailure("Duplicate ID: %d", id)
  76. }
  77. idsSeen[id] = struct{}{}
  78. }
  79. AssertEq(numWorkers, len(idsSeen))
  80. // Delete the file.
  81. err = os.Remove(filename)
  82. AssertEq(nil, err)
  83. }
  84. }
  85. // Run an ogletest test that checks expectations for parallel calls to open(2)
  86. // with O_CREAT|O_TRUNC.
  87. func RunCreateInParallelTest_Truncate(
  88. ctx context.Context,
  89. dir string) {
  90. // Ensure that we get parallelism for this test.
  91. defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
  92. // Try for awhile to see if anything breaks.
  93. const duration = 500 * time.Millisecond
  94. startTime := time.Now()
  95. for time.Since(startTime) < duration {
  96. filename := path.Join(dir, "foo")
  97. // Set up a function that opens the file with O_CREATE and O_TRUNC and then
  98. // appends a byte to it.
  99. worker := func(id byte) (err error) {
  100. f, err := os.OpenFile(
  101. filename,
  102. os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC,
  103. 0600)
  104. if err != nil {
  105. err = fmt.Errorf("Worker %d: Open: %v", id, err)
  106. return
  107. }
  108. defer f.Close()
  109. _, err = f.Write([]byte{id})
  110. if err != nil {
  111. err = fmt.Errorf("Worker %d: Write: %v", id, err)
  112. return
  113. }
  114. return
  115. }
  116. // Run several workers in parallel.
  117. const numWorkers = 16
  118. b := syncutil.NewBundle(ctx)
  119. for i := 0; i < numWorkers; i++ {
  120. id := byte(i)
  121. b.Add(func(ctx context.Context) (err error) {
  122. err = worker(id)
  123. return
  124. })
  125. }
  126. err := b.Join()
  127. AssertEq(nil, err)
  128. // Read the contents of the file. We should see at least one ID (the last
  129. // one that truncated), and at most all of them.
  130. contents, err := ioutil.ReadFile(filename)
  131. AssertEq(nil, err)
  132. idsSeen := make(map[byte]struct{})
  133. for i, _ := range contents {
  134. id := contents[i]
  135. AssertLt(id, numWorkers)
  136. if _, ok := idsSeen[id]; ok {
  137. AddFailure("Duplicate ID: %d", id)
  138. }
  139. idsSeen[id] = struct{}{}
  140. }
  141. AssertGe(len(idsSeen), 1)
  142. AssertLe(len(idsSeen), numWorkers)
  143. // Delete the file.
  144. err = os.Remove(filename)
  145. AssertEq(nil, err)
  146. }
  147. }
  148. // Run an ogletest test that checks expectations for parallel calls to open(2)
  149. // with O_CREAT|O_EXCL.
  150. func RunCreateInParallelTest_Exclusive(
  151. ctx context.Context,
  152. dir string) {
  153. // Ensure that we get parallelism for this test.
  154. defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
  155. // Try for awhile to see if anything breaks.
  156. const duration = 500 * time.Millisecond
  157. startTime := time.Now()
  158. for time.Since(startTime) < duration {
  159. filename := path.Join(dir, "foo")
  160. // Set up a function that opens the file with O_CREATE and O_EXCL, and then
  161. // appends a byte to it if it was successfully opened.
  162. var openCount uint64
  163. worker := func(id byte) (err error) {
  164. f, err := os.OpenFile(
  165. filename,
  166. os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_APPEND,
  167. 0600)
  168. // If we failed to open due to the file already existing, just leave.
  169. if os.IsExist(err) {
  170. err = nil
  171. return
  172. }
  173. // Propgate other errors.
  174. if err != nil {
  175. err = fmt.Errorf("Worker %d: Open: %v", id, err)
  176. return
  177. }
  178. atomic.AddUint64(&openCount, 1)
  179. defer f.Close()
  180. _, err = f.Write([]byte{id})
  181. if err != nil {
  182. err = fmt.Errorf("Worker %d: Write: %v", id, err)
  183. return
  184. }
  185. return
  186. }
  187. // Run several workers in parallel.
  188. const numWorkers = 16
  189. b := syncutil.NewBundle(ctx)
  190. for i := 0; i < numWorkers; i++ {
  191. id := byte(i)
  192. b.Add(func(ctx context.Context) (err error) {
  193. err = worker(id)
  194. return
  195. })
  196. }
  197. err := b.Join()
  198. AssertEq(nil, err)
  199. // Exactly one worker should have opened successfully.
  200. AssertEq(1, openCount)
  201. // Read the contents of the file. It should contain that one worker's ID.
  202. contents, err := ioutil.ReadFile(filename)
  203. AssertEq(nil, err)
  204. AssertEq(1, len(contents))
  205. AssertLt(contents[0], numWorkers)
  206. // Delete the file.
  207. err = os.Remove(filename)
  208. AssertEq(nil, err)
  209. }
  210. }
  211. // Run an ogletest test that checks expectations for parallel calls to mkdir(2).
  212. func RunMkdirInParallelTest(
  213. ctx context.Context,
  214. dir string) {
  215. // Ensure that we get parallelism for this test.
  216. defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
  217. // Try for awhile to see if anything breaks.
  218. const duration = 500 * time.Millisecond
  219. startTime := time.Now()
  220. for time.Since(startTime) < duration {
  221. filename := path.Join(dir, "foo")
  222. // Set up a function that creates the directory, ignoring EEXIST errors.
  223. worker := func(id byte) (err error) {
  224. err = os.Mkdir(filename, 0700)
  225. if os.IsExist(err) {
  226. err = nil
  227. }
  228. if err != nil {
  229. err = fmt.Errorf("Worker %d: Mkdir: %v", id, err)
  230. return
  231. }
  232. return
  233. }
  234. // Run several workers in parallel.
  235. const numWorkers = 16
  236. b := syncutil.NewBundle(ctx)
  237. for i := 0; i < numWorkers; i++ {
  238. id := byte(i)
  239. b.Add(func(ctx context.Context) (err error) {
  240. err = worker(id)
  241. return
  242. })
  243. }
  244. err := b.Join()
  245. AssertEq(nil, err)
  246. // The directory should have been created, once.
  247. entries, err := ReadDirPicky(dir)
  248. AssertEq(nil, err)
  249. AssertEq(1, len(entries))
  250. AssertEq("foo", entries[0].Name())
  251. // Delete the directory.
  252. err = os.Remove(filename)
  253. AssertEq(nil, err)
  254. }
  255. }
  256. // Run an ogletest test that checks expectations for parallel calls to
  257. // symlink(2).
  258. func RunSymlinkInParallelTest(
  259. ctx context.Context,
  260. dir string) {
  261. // Ensure that we get parallelism for this test.
  262. defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
  263. // Try for awhile to see if anything breaks.
  264. const duration = 500 * time.Millisecond
  265. startTime := time.Now()
  266. for time.Since(startTime) < duration {
  267. filename := path.Join(dir, "foo")
  268. // Set up a function that creates the symlink, ignoring EEXIST errors.
  269. worker := func(id byte) (err error) {
  270. err = os.Symlink("blah", filename)
  271. if os.IsExist(err) {
  272. err = nil
  273. }
  274. if err != nil {
  275. err = fmt.Errorf("Worker %d: Symlink: %v", id, err)
  276. return
  277. }
  278. return
  279. }
  280. // Run several workers in parallel.
  281. const numWorkers = 16
  282. b := syncutil.NewBundle(ctx)
  283. for i := 0; i < numWorkers; i++ {
  284. id := byte(i)
  285. b.Add(func(ctx context.Context) (err error) {
  286. err = worker(id)
  287. return
  288. })
  289. }
  290. err := b.Join()
  291. AssertEq(nil, err)
  292. // The symlink should have been created, once.
  293. entries, err := ReadDirPicky(dir)
  294. AssertEq(nil, err)
  295. AssertEq(1, len(entries))
  296. AssertEq("foo", entries[0].Name())
  297. // Delete the directory.
  298. err = os.Remove(filename)
  299. AssertEq(nil, err)
  300. }
  301. }