interrupt_fs.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright 2015 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package interruptfs
  15. import (
  16. "fmt"
  17. "os"
  18. "sync"
  19. "golang.org/x/net/context"
  20. "github.com/jacobsa/fuse"
  21. "github.com/jacobsa/fuse/fuseops"
  22. "github.com/jacobsa/fuse/fuseutil"
  23. )
  24. var rootAttrs = fuseops.InodeAttributes{
  25. Nlink: 1,
  26. Mode: os.ModeDir | 0777,
  27. }
  28. const fooID = fuseops.RootInodeID + 1
  29. var fooAttrs = fuseops.InodeAttributes{
  30. Nlink: 1,
  31. Mode: 0777,
  32. Size: 1234,
  33. }
  34. // A file system containing exactly one file, named "foo". ReadFile and
  35. // FlushFile ops can be made to hang until interrupted. Exposes a method for
  36. // synchronizing with the arrival of a read or a flush.
  37. //
  38. // Must be created with New.
  39. type InterruptFS struct {
  40. fuseutil.NotImplementedFileSystem
  41. mu sync.Mutex
  42. blockForReads bool // GUARDED_BY(mu)
  43. blockForFlushes bool // GUARDED_BY(mu)
  44. // Must hold the mutex when closing these.
  45. readReceived chan struct{}
  46. flushReceived chan struct{}
  47. }
  48. func New() (fs *InterruptFS) {
  49. fs = &InterruptFS{
  50. readReceived: make(chan struct{}),
  51. flushReceived: make(chan struct{}),
  52. }
  53. return
  54. }
  55. ////////////////////////////////////////////////////////////////////////
  56. // Public interface
  57. ////////////////////////////////////////////////////////////////////////
  58. // Block until the first read is received.
  59. func (fs *InterruptFS) WaitForFirstRead() {
  60. <-fs.readReceived
  61. }
  62. // Block until the first flush is received.
  63. func (fs *InterruptFS) WaitForFirstFlush() {
  64. <-fs.flushReceived
  65. }
  66. // Enable blocking until interrupted for the next (and subsequent) read ops.
  67. func (fs *InterruptFS) EnableReadBlocking() {
  68. fs.mu.Lock()
  69. defer fs.mu.Unlock()
  70. fs.blockForReads = true
  71. }
  72. // Enable blocking until interrupted for the next (and subsequent) flush ops.
  73. func (fs *InterruptFS) EnableFlushBlocking() {
  74. fs.mu.Lock()
  75. defer fs.mu.Unlock()
  76. fs.blockForFlushes = true
  77. }
  78. ////////////////////////////////////////////////////////////////////////
  79. // FileSystem methods
  80. ////////////////////////////////////////////////////////////////////////
  81. func (fs *InterruptFS) StatFS(
  82. ctx context.Context,
  83. op *fuseops.StatFSOp) (err error) {
  84. return
  85. }
  86. func (fs *InterruptFS) LookUpInode(
  87. ctx context.Context,
  88. op *fuseops.LookUpInodeOp) (err error) {
  89. // We support only one parent.
  90. if op.Parent != fuseops.RootInodeID {
  91. err = fmt.Errorf("Unexpected parent: %v", op.Parent)
  92. return
  93. }
  94. // We support only one name.
  95. if op.Name != "foo" {
  96. err = fuse.ENOENT
  97. return
  98. }
  99. // Fill in the response.
  100. op.Entry.Child = fooID
  101. op.Entry.Attributes = fooAttrs
  102. return
  103. }
  104. func (fs *InterruptFS) GetInodeAttributes(
  105. ctx context.Context,
  106. op *fuseops.GetInodeAttributesOp) (err error) {
  107. switch op.Inode {
  108. case fuseops.RootInodeID:
  109. op.Attributes = rootAttrs
  110. case fooID:
  111. op.Attributes = fooAttrs
  112. default:
  113. err = fmt.Errorf("Unexpected inode ID: %v", op.Inode)
  114. return
  115. }
  116. return
  117. }
  118. func (fs *InterruptFS) OpenFile(
  119. ctx context.Context,
  120. op *fuseops.OpenFileOp) (err error) {
  121. return
  122. }
  123. func (fs *InterruptFS) ReadFile(
  124. ctx context.Context,
  125. op *fuseops.ReadFileOp) (err error) {
  126. fs.mu.Lock()
  127. shouldBlock := fs.blockForReads
  128. // Signal that a read has been received, if this is the first.
  129. select {
  130. case <-fs.readReceived:
  131. default:
  132. close(fs.readReceived)
  133. }
  134. fs.mu.Unlock()
  135. // Wait for cancellation if enabled.
  136. if shouldBlock {
  137. done := ctx.Done()
  138. if done == nil {
  139. panic("Expected non-nil channel.")
  140. }
  141. <-done
  142. err = ctx.Err()
  143. }
  144. return
  145. }
  146. func (fs *InterruptFS) FlushFile(
  147. ctx context.Context,
  148. op *fuseops.FlushFileOp) (err error) {
  149. fs.mu.Lock()
  150. shouldBlock := fs.blockForFlushes
  151. // Signal that a flush has been received, if this is the first.
  152. select {
  153. case <-fs.flushReceived:
  154. default:
  155. close(fs.flushReceived)
  156. }
  157. fs.mu.Unlock()
  158. // Wait for cancellation if enabled.
  159. if shouldBlock {
  160. done := ctx.Done()
  161. if done == nil {
  162. panic("Expected non-nil channel.")
  163. }
  164. <-done
  165. err = ctx.Err()
  166. }
  167. return
  168. }