connection.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. // Copyright 2015 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package fuse
  15. import (
  16. "fmt"
  17. "io"
  18. "log"
  19. "os"
  20. "path"
  21. "runtime"
  22. "sync"
  23. "syscall"
  24. "golang.org/x/net/context"
  25. "github.com/jacobsa/fuse/fuseops"
  26. "github.com/jacobsa/fuse/internal/buffer"
  27. "github.com/jacobsa/fuse/internal/freelist"
  28. "github.com/jacobsa/fuse/internal/fusekernel"
  29. )
  30. type contextKeyType uint64
  31. var contextKey interface{} = contextKeyType(0)
  32. // Ask the Linux kernel for larger read requests.
  33. //
  34. // As of 2015-03-26, the behavior in the kernel is:
  35. //
  36. // * (http://goo.gl/bQ1f1i, http://goo.gl/HwBrR6) Set the local variable
  37. // ra_pages to be init_response->max_readahead divided by the page size.
  38. //
  39. // * (http://goo.gl/gcIsSh, http://goo.gl/LKV2vA) Set
  40. // backing_dev_info::ra_pages to the min of that value and what was sent
  41. // in the request's max_readahead field.
  42. //
  43. // * (http://goo.gl/u2SqzH) Use backing_dev_info::ra_pages when deciding
  44. // how much to read ahead.
  45. //
  46. // * (http://goo.gl/JnhbdL) Don't read ahead at all if that field is zero.
  47. //
  48. // Reading a page at a time is a drag. Ask for a larger size.
  49. const maxReadahead = 1 << 20
  50. // Connection represents a connection to the fuse kernel process. It is used to
  51. // receive and reply to requests from the kernel.
  52. type Connection struct {
  53. cfg MountConfig
  54. debugLogger *log.Logger
  55. errorLogger *log.Logger
  56. // The device through which we're talking to the kernel, and the protocol
  57. // version that we're using to talk to it.
  58. dev *os.File
  59. protocol fusekernel.Protocol
  60. mu sync.Mutex
  61. // A map from fuse "unique" request ID (*not* the op ID for logging used
  62. // above) to a function that cancel's its associated context.
  63. //
  64. // GUARDED_BY(mu)
  65. cancelFuncs map[uint64]func()
  66. // Freelists, serviced by freelists.go.
  67. inMessages freelist.Freelist // GUARDED_BY(mu)
  68. outMessages freelist.Freelist // GUARDED_BY(mu)
  69. }
  70. // State that is maintained for each in-flight op. This is stuffed into the
  71. // context that the user uses to reply to the op.
  72. type opState struct {
  73. inMsg *buffer.InMessage
  74. outMsg *buffer.OutMessage
  75. op interface{}
  76. }
  77. // Create a connection wrapping the supplied file descriptor connected to the
  78. // kernel. You must eventually call c.close().
  79. //
  80. // The loggers may be nil.
  81. func newConnection(
  82. cfg MountConfig,
  83. debugLogger *log.Logger,
  84. errorLogger *log.Logger,
  85. dev *os.File) (c *Connection, err error) {
  86. c = &Connection{
  87. cfg: cfg,
  88. debugLogger: debugLogger,
  89. errorLogger: errorLogger,
  90. dev: dev,
  91. cancelFuncs: make(map[uint64]func()),
  92. }
  93. // Initialize.
  94. err = c.Init()
  95. if err != nil {
  96. c.close()
  97. err = fmt.Errorf("Init: %v", err)
  98. return
  99. }
  100. return
  101. }
  102. // Init performs the work necessary to cause the mount process to complete.
  103. func (c *Connection) Init() (err error) {
  104. // Read the init op.
  105. ctx, op, err := c.ReadOp()
  106. if err != nil {
  107. err = fmt.Errorf("Reading init op: %v", err)
  108. return
  109. }
  110. initOp, ok := op.(*initOp)
  111. if !ok {
  112. c.Reply(ctx, syscall.EPROTO)
  113. err = fmt.Errorf("Expected *initOp, got %T", op)
  114. return
  115. }
  116. // Make sure the protocol version spoken by the kernel is new enough.
  117. min := fusekernel.Protocol{
  118. fusekernel.ProtoVersionMinMajor,
  119. fusekernel.ProtoVersionMinMinor,
  120. }
  121. if initOp.Kernel.LT(min) {
  122. c.Reply(ctx, syscall.EPROTO)
  123. err = fmt.Errorf("Version too old: %v", initOp.Kernel)
  124. return
  125. }
  126. // Downgrade our protocol if necessary.
  127. c.protocol = fusekernel.Protocol{
  128. fusekernel.ProtoVersionMaxMajor,
  129. fusekernel.ProtoVersionMaxMinor,
  130. }
  131. if initOp.Kernel.LT(c.protocol) {
  132. c.protocol = initOp.Kernel
  133. }
  134. // Respond to the init op.
  135. initOp.Library = c.protocol
  136. initOp.MaxReadahead = maxReadahead
  137. initOp.MaxWrite = buffer.MaxWriteSize
  138. initOp.Flags = 0
  139. // Tell the kernel not to use pitifully small 4 KiB writes.
  140. initOp.Flags |= fusekernel.InitBigWrites
  141. // Enable writeback caching if the user hasn't asked us not to.
  142. if !c.cfg.DisableWritebackCaching {
  143. initOp.Flags |= fusekernel.InitWritebackCache
  144. }
  145. c.Reply(ctx, nil)
  146. return
  147. }
  148. // Log information for an operation with the given ID. calldepth is the depth
  149. // to use when recovering file:line information with runtime.Caller.
  150. func (c *Connection) debugLog(
  151. fuseID uint64,
  152. calldepth int,
  153. format string,
  154. v ...interface{}) {
  155. if c.debugLogger == nil {
  156. return
  157. }
  158. // Get file:line info.
  159. var file string
  160. var line int
  161. var ok bool
  162. _, file, line, ok = runtime.Caller(calldepth)
  163. if !ok {
  164. file = "???"
  165. }
  166. fileLine := fmt.Sprintf("%v:%v", path.Base(file), line)
  167. // Format the actual message to be printed.
  168. msg := fmt.Sprintf(
  169. "Op 0x%08x %24s] %v",
  170. fuseID,
  171. fileLine,
  172. fmt.Sprintf(format, v...))
  173. // Print it.
  174. c.debugLogger.Println(msg)
  175. }
  176. // LOCKS_EXCLUDED(c.mu)
  177. func (c *Connection) recordCancelFunc(
  178. fuseID uint64,
  179. f func()) {
  180. c.mu.Lock()
  181. defer c.mu.Unlock()
  182. if _, ok := c.cancelFuncs[fuseID]; ok {
  183. panic(fmt.Sprintf("Already have cancel func for request %v", fuseID))
  184. }
  185. c.cancelFuncs[fuseID] = f
  186. }
  187. // Set up state for an op that is about to be returned to the user, given its
  188. // underlying fuse opcode and request ID.
  189. //
  190. // Return a context that should be used for the op.
  191. //
  192. // LOCKS_EXCLUDED(c.mu)
  193. func (c *Connection) beginOp(
  194. opCode uint32,
  195. fuseID uint64) (ctx context.Context) {
  196. // Start with the parent context.
  197. ctx = c.cfg.OpContext
  198. // Set up a cancellation function.
  199. //
  200. // Special case: On Darwin, osxfuse aggressively reuses "unique" request IDs.
  201. // This matters for Forget requests, which have no reply associated and
  202. // therefore have IDs that are immediately eligible for reuse. For these, we
  203. // should not record any state keyed on their ID.
  204. //
  205. // Cf. https://github.com/osxfuse/osxfuse/issues/208
  206. if opCode != fusekernel.OpForget {
  207. var cancel func()
  208. ctx, cancel = context.WithCancel(ctx)
  209. c.recordCancelFunc(fuseID, cancel)
  210. }
  211. return
  212. }
  213. // Clean up all state associated with an op to which the user has responded,
  214. // given its underlying fuse opcode and request ID. This must be called before
  215. // a response is sent to the kernel, to avoid a race where the request's ID
  216. // might be reused by osxfuse.
  217. //
  218. // LOCKS_EXCLUDED(c.mu)
  219. func (c *Connection) finishOp(
  220. opCode uint32,
  221. fuseID uint64) {
  222. c.mu.Lock()
  223. defer c.mu.Unlock()
  224. // Even though the op is finished, context.WithCancel requires us to arrange
  225. // for the cancellation function to be invoked. We also must remove it from
  226. // our map.
  227. //
  228. // Special case: we don't do this for Forget requests. See the note in
  229. // beginOp above.
  230. if opCode != fusekernel.OpForget {
  231. cancel, ok := c.cancelFuncs[fuseID]
  232. if !ok {
  233. panic(fmt.Sprintf("Unknown request ID in finishOp: %v", fuseID))
  234. }
  235. cancel()
  236. delete(c.cancelFuncs, fuseID)
  237. }
  238. }
  239. // LOCKS_EXCLUDED(c.mu)
  240. func (c *Connection) handleInterrupt(fuseID uint64) {
  241. c.mu.Lock()
  242. defer c.mu.Unlock()
  243. // NOTE(jacobsa): fuse.txt in the Linux kernel documentation
  244. // (https://goo.gl/H55Dnr) defines the kernel <-> userspace protocol for
  245. // interrupts.
  246. //
  247. // In particular, my reading of it is that an interrupt request cannot be
  248. // delivered to userspace before the original request. The part about the
  249. // race and EAGAIN appears to be aimed at userspace programs that
  250. // concurrently process requests (cf. http://goo.gl/BES2rs).
  251. //
  252. // So in this method if we can't find the ID to be interrupted, it means that
  253. // the request has already been replied to.
  254. //
  255. // Cf. https://github.com/osxfuse/osxfuse/issues/208
  256. // Cf. http://comments.gmane.org/gmane.comp.file-systems.fuse.devel/14675
  257. cancel, ok := c.cancelFuncs[fuseID]
  258. if !ok {
  259. return
  260. }
  261. cancel()
  262. }
  263. // Read the next message from the kernel. The message must later be destroyed
  264. // using destroyInMessage.
  265. func (c *Connection) readMessage() (m *buffer.InMessage, err error) {
  266. // Allocate a message.
  267. m = c.getInMessage()
  268. // Loop past transient errors.
  269. for {
  270. // Attempt a reaed.
  271. err = m.Init(c.dev)
  272. // Special cases:
  273. //
  274. // * ENODEV means fuse has hung up.
  275. //
  276. // * EINTR means we should try again. (This seems to happen often on
  277. // OS X, cf. http://golang.org/issue/11180)
  278. //
  279. if pe, ok := err.(*os.PathError); ok {
  280. switch pe.Err {
  281. case syscall.ENODEV:
  282. err = io.EOF
  283. case syscall.EINTR:
  284. err = nil
  285. continue
  286. }
  287. }
  288. if err != nil {
  289. c.putInMessage(m)
  290. m = nil
  291. return
  292. }
  293. return
  294. }
  295. }
  296. // Write the supplied message to the kernel.
  297. func (c *Connection) writeMessage(msg []byte) (err error) {
  298. // Avoid the retry loop in os.File.Write.
  299. n, err := syscall.Write(int(c.dev.Fd()), msg)
  300. if err != nil {
  301. return
  302. }
  303. if n != len(msg) {
  304. err = fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg))
  305. return
  306. }
  307. return
  308. }
  309. // ReadOp consumes the next op from the kernel process, returning the op and a
  310. // context that should be used for work related to the op. It returns io.EOF if
  311. // the kernel has closed the connection.
  312. //
  313. // If err != nil, the user is responsible for later calling c.Reply with the
  314. // returned context.
  315. //
  316. // This function delivers ops in exactly the order they are received from
  317. // /dev/fuse. It must not be called multiple times concurrently.
  318. //
  319. // LOCKS_EXCLUDED(c.mu)
  320. func (c *Connection) ReadOp() (ctx context.Context, op interface{}, err error) {
  321. // Keep going until we find a request we know how to convert.
  322. for {
  323. // Read the next message from the kernel.
  324. var inMsg *buffer.InMessage
  325. inMsg, err = c.readMessage()
  326. if err != nil {
  327. return
  328. }
  329. // Convert the message to an op.
  330. outMsg := c.getOutMessage()
  331. op, err = convertInMessage(inMsg, outMsg, c.protocol)
  332. if err != nil {
  333. c.putOutMessage(outMsg)
  334. err = fmt.Errorf("convertInMessage: %v", err)
  335. return
  336. }
  337. // Choose an ID for this operation for the purposes of logging, and log it.
  338. if c.debugLogger != nil {
  339. c.debugLog(inMsg.Header().Unique, 1, "<- %s", describeRequest(op))
  340. }
  341. // Special case: handle interrupt requests inline.
  342. if interruptOp, ok := op.(*interruptOp); ok {
  343. c.handleInterrupt(interruptOp.FuseID)
  344. continue
  345. }
  346. // Set up a context that remembers information about this op.
  347. ctx = c.beginOp(inMsg.Header().Opcode, inMsg.Header().Unique)
  348. ctx = context.WithValue(ctx, contextKey, opState{inMsg, outMsg, op})
  349. // Return the op to the user.
  350. return
  351. }
  352. }
  353. // Skip errors that happen as a matter of course, since they spook users.
  354. func (c *Connection) shouldLogError(
  355. op interface{},
  356. err error) bool {
  357. // We don't log non-errors.
  358. if err == nil {
  359. return false
  360. }
  361. // We can't log if there's nothing to log to.
  362. if c.errorLogger == nil {
  363. return false
  364. }
  365. switch op.(type) {
  366. case *fuseops.LookUpInodeOp:
  367. // It is totally normal for the kernel to ask to look up an inode by name
  368. // and find the name doesn't exist. For example, this happens when linking
  369. // a new file.
  370. if err == syscall.ENOENT {
  371. return false
  372. }
  373. case *fuseops.GetXattrOp:
  374. if err == syscall.ENODATA || err == syscall.ERANGE {
  375. return false
  376. }
  377. case *unknownOp:
  378. // Don't bother the user with methods we intentionally don't support.
  379. if err == syscall.ENOSYS {
  380. return false
  381. }
  382. }
  383. return true
  384. }
  385. // Reply replies to an op previously read using ReadOp, with the supplied error
  386. // (or nil if successful). The context must be the context returned by ReadOp.
  387. //
  388. // LOCKS_EXCLUDED(c.mu)
  389. func (c *Connection) Reply(ctx context.Context, opErr error) {
  390. // Extract the state we stuffed in earlier.
  391. var key interface{} = contextKey
  392. foo := ctx.Value(key)
  393. state, ok := foo.(opState)
  394. if !ok {
  395. panic(fmt.Sprintf("Reply called with invalid context: %#v", ctx))
  396. }
  397. op := state.op
  398. inMsg := state.inMsg
  399. outMsg := state.outMsg
  400. fuseID := inMsg.Header().Unique
  401. // Make sure we destroy the messages when we're done.
  402. defer c.putInMessage(inMsg)
  403. defer c.putOutMessage(outMsg)
  404. // Clean up state for this op.
  405. c.finishOp(inMsg.Header().Opcode, inMsg.Header().Unique)
  406. // Debug logging
  407. if c.debugLogger != nil {
  408. if opErr == nil {
  409. c.debugLog(fuseID, 1, "-> OK (%s)", describeResponse(op))
  410. } else {
  411. c.debugLog(fuseID, 1, "-> Error: %q", opErr.Error())
  412. }
  413. }
  414. // Error logging
  415. if c.shouldLogError(op, opErr) {
  416. c.errorLogger.Printf("%T error: %v", op, opErr)
  417. }
  418. // Send the reply to the kernel, if one is required.
  419. noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)
  420. if !noResponse {
  421. err := c.writeMessage(outMsg.Bytes())
  422. if err != nil && c.errorLogger != nil {
  423. c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes())
  424. }
  425. }
  426. }
  427. // Close the connection. Must not be called until operations that were read
  428. // from the connection have been responded to.
  429. func (c *Connection) close() (err error) {
  430. // Posix doesn't say that close can be called concurrently with read or
  431. // write, but luckily we exclude the possibility of a race by requiring the
  432. // user to respond to all ops first.
  433. err = c.dev.Close()
  434. return
  435. }