|
1 | | -{% if flag?(:preview_mt) %} |
2 | | - require "./fd_lock_mt" |
3 | | -{% else %} |
4 | | - require "./fd_lock_no_mt" |
5 | | -{% end %} |
| 1 | +# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause, |
| 2 | +# Copyright Google): |
| 3 | +# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go |
| 4 | + |
| 5 | +# :nodoc: |
| 6 | +# |
| 7 | +# Tracks active references over a system file descriptor (fd) and serializes |
| 8 | +# reads and writes. |
| 9 | +# |
| 10 | +# Every access to the fd that may affect its system state or system buffers must |
| 11 | +# acquire a shared lock. |
| 12 | +# |
| 13 | +# The fdlock can be closed at any time, but the actual system close will wait |
| 14 | +# until there are no more references left. This avoids potential races when a |
| 15 | +# thread might try to read a fd that has been closed and has been reused by the |
| 16 | +# OS for example. |
| 17 | +struct Crystal::FdLock |
| 18 | + CLOSED = 1_u32 << 0 # the fdlock has been closed |
| 19 | + REF = 1_u32 << 1 # the reference counter increment |
| 20 | + MASK = ~(REF - 1) # mask for the reference counter |
| 21 | + |
| 22 | + @m = Atomic(UInt32).new(0_u32) |
| 23 | + @closing : Fiber? |
| 24 | + |
| 25 | + # Borrows a reference for the duration of the block. Raises if the fdlock is |
| 26 | + # closed while trying to borrow. |
| 27 | + def reference(& : -> F) : F forall F |
| 28 | + m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed) |
| 29 | + increment_slow(m) unless success |
| 30 | + |
| 31 | + begin |
| 32 | + yield |
| 33 | + ensure |
| 34 | + m = @m.sub(REF, :release) |
| 35 | + handle_last_ref(m) |
| 36 | + end |
| 37 | + end |
| 38 | + |
| 39 | + private def increment_slow(m) |
| 40 | + while true |
| 41 | + if (m & CLOSED) == CLOSED |
| 42 | + raise IO::Error.new("Closed") |
| 43 | + end |
| 44 | + m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed) |
| 45 | + break if success |
| 46 | + end |
| 47 | + end |
| 48 | + |
| 49 | + private def handle_last_ref(m) |
| 50 | + return unless (m & CLOSED) == CLOSED # is closed? |
| 51 | + return unless (m & MASK) == REF # was the last ref? |
| 52 | + |
| 53 | + # the last ref after close is responsible to resume the closing fiber |
| 54 | + if fiber = @closing |
| 55 | + fiber.enqueue |
| 56 | + else |
| 57 | + raise NilAssertionError.new("BUG: expected a closing fiber to resume.") |
| 58 | + end |
| 59 | + end |
| 60 | + |
| 61 | + # Closes the fdlock. Blocks for as long as there are references. |
| 62 | + # |
| 63 | + # The *callback* block must cancel any external waiters (e.g. pending evloop |
| 64 | + # reads or writes). |
| 65 | + # |
| 66 | + # Returns true if the fdlock has been closed: no fiber can acquire a reference |
| 67 | + # anymore, the calling fiber fully owns the fd and can safely close it. |
| 68 | + # |
| 69 | + # Returns false if the fdlock has already been closed: the calling fiber |
| 70 | + # doesn't own the fd and musn't close it, as there might still be active |
| 71 | + # references and another fiber will close anyway. |
| 72 | + def try_close?(&callback : ->) : Bool |
| 73 | + attempts = 0 |
| 74 | + |
| 75 | + while true |
| 76 | + m = @m.get(:relaxed) |
| 77 | + |
| 78 | + if (m & CLOSED) == CLOSED |
| 79 | + # already closed: abort |
| 80 | + return false |
| 81 | + end |
| 82 | + |
| 83 | + # close + increment ref |
| 84 | + m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed) |
| 85 | + break if success |
| 86 | + |
| 87 | + attempts = Thread.delay(attempts) |
| 88 | + end |
| 89 | + |
| 90 | + # set the current fiber as the closing fiber (to be resumed by the last ref) |
| 91 | + @closing = Fiber.current |
| 92 | + |
| 93 | + # decrement the last ref |
| 94 | + m = @m.sub(REF, :release) |
| 95 | + |
| 96 | + begin |
| 97 | + yield |
| 98 | + ensure |
| 99 | + # wait for the last ref... unless we're the last ref! |
| 100 | + Fiber.suspend unless (m & MASK) == REF |
| 101 | + end |
| 102 | + |
| 103 | + @closing = nil |
| 104 | + true |
| 105 | + end |
| 106 | + |
| 107 | + # Resets the fdlock back to its pristine state so it can be used again. |
| 108 | + # Assumes the caller owns the fdlock. This is required by |
| 109 | + # `TCPSocket#initialize`. |
| 110 | + def reset : Nil |
| 111 | + @m.lazy_set(0_u32) |
| 112 | + @closing = nil |
| 113 | + end |
| 114 | + |
| 115 | + def closed? : Bool |
| 116 | + (@m.get(:relaxed) & CLOSED) == CLOSED |
| 117 | + end |
| 118 | +end |
0 commit comments