| 
1 |  | -{% if flag?(:preview_mt) %}  | 
2 |  | -  require "./fd_lock_mt"  | 
3 |  | -{% else %}  | 
4 |  | -  require "./fd_lock_no_mt"  | 
5 |  | -{% end %}  | 
 | 1 | +# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,  | 
 | 2 | +# Copyright Google):  | 
 | 3 | +# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go  | 
 | 4 | + | 
 | 5 | +# :nodoc:  | 
 | 6 | +#  | 
 | 7 | +# Tracks active references over a system file descriptor (fd) and serializes  | 
 | 8 | +# reads and writes.  | 
 | 9 | +#  | 
 | 10 | +# Every access to the fd that may affect its system state or system buffers must  | 
 | 11 | +# acquire a shared lock.  | 
 | 12 | +#  | 
 | 13 | +# The fdlock can be closed at any time, but the actual system close will wait  | 
 | 14 | +# until there are no more references left. This avoids potential races when a  | 
 | 15 | +# thread might try to read a fd that has been closed and has been reused by the  | 
 | 16 | +# OS for example.  | 
 | 17 | +struct Crystal::FdLock  | 
 | 18 | +  CLOSED = 1_u32 << 0 # the fdlock has been closed  | 
 | 19 | +  REF    = 1_u32 << 1 # the reference counter increment  | 
 | 20 | +  MASK   = ~(REF - 1) # mask for the reference counter  | 
 | 21 | + | 
 | 22 | +  @m = Atomic(UInt32).new(0_u32)  | 
 | 23 | +  @closing : Fiber?  | 
 | 24 | + | 
 | 25 | +  # Borrows a reference for the duration of the block. Raises if the fdlock is  | 
 | 26 | +  # closed while trying to borrow.  | 
 | 27 | +  def reference(& : -> F) : F forall F  | 
 | 28 | +    m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed)  | 
 | 29 | +    increment_slow(m) unless success  | 
 | 30 | + | 
 | 31 | +    begin  | 
 | 32 | +      yield  | 
 | 33 | +    ensure  | 
 | 34 | +      m = @m.sub(REF, :release)  | 
 | 35 | +      handle_last_ref(m)  | 
 | 36 | +    end  | 
 | 37 | +  end  | 
 | 38 | + | 
 | 39 | +  private def increment_slow(m)  | 
 | 40 | +    while true  | 
 | 41 | +      if (m & CLOSED) == CLOSED  | 
 | 42 | +        raise IO::Error.new("Closed")  | 
 | 43 | +      end  | 
 | 44 | +      m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed)  | 
 | 45 | +      break if success  | 
 | 46 | +    end  | 
 | 47 | +  end  | 
 | 48 | + | 
 | 49 | +  private def handle_last_ref(m)  | 
 | 50 | +    return unless (m & CLOSED) == CLOSED # is closed?  | 
 | 51 | +    return unless (m & MASK) == REF      # was the last ref?  | 
 | 52 | + | 
 | 53 | +    # the last ref after close is responsible to resume the closing fiber  | 
 | 54 | +    if fiber = @closing  | 
 | 55 | +      fiber.enqueue  | 
 | 56 | +    else  | 
 | 57 | +      raise NilAssertionError.new("BUG: expected a closing fiber to resume.")  | 
 | 58 | +    end  | 
 | 59 | +  end  | 
 | 60 | + | 
 | 61 | +  # Closes the fdlock. Blocks for as long as there are references.  | 
 | 62 | +  #  | 
 | 63 | +  # The *callback* block must cancel any external waiters (e.g. pending evloop  | 
 | 64 | +  # reads or writes).  | 
 | 65 | +  #  | 
 | 66 | +  # Returns true if the fdlock has been closed: no fiber can acquire a reference  | 
 | 67 | +  # anymore, the calling fiber fully owns the fd and can safely close it.  | 
 | 68 | +  #  | 
 | 69 | +  # Returns false if the fdlock has already been closed: the calling fiber  | 
 | 70 | +  # doesn't own the fd and musn't close it, as there might still be active  | 
 | 71 | +  # references and another fiber will close anyway.  | 
 | 72 | +  def try_close?(&callback : ->) : Bool  | 
 | 73 | +    attempts = 0  | 
 | 74 | + | 
 | 75 | +    while true  | 
 | 76 | +      m = @m.get(:relaxed)  | 
 | 77 | + | 
 | 78 | +      if (m & CLOSED) == CLOSED  | 
 | 79 | +        # already closed: abort  | 
 | 80 | +        return false  | 
 | 81 | +      end  | 
 | 82 | + | 
 | 83 | +      # close + increment ref  | 
 | 84 | +      m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)  | 
 | 85 | +      break if success  | 
 | 86 | + | 
 | 87 | +      attempts = Thread.delay(attempts)  | 
 | 88 | +    end  | 
 | 89 | + | 
 | 90 | +    # set the current fiber as the closing fiber (to be resumed by the last ref)  | 
 | 91 | +    @closing = Fiber.current  | 
 | 92 | + | 
 | 93 | +    # decrement the last ref  | 
 | 94 | +    m = @m.sub(REF, :release)  | 
 | 95 | + | 
 | 96 | +    begin  | 
 | 97 | +      yield  | 
 | 98 | +    ensure  | 
 | 99 | +      # wait for the last ref... unless we're the last ref!  | 
 | 100 | +      Fiber.suspend unless (m & MASK) == REF  | 
 | 101 | +    end  | 
 | 102 | + | 
 | 103 | +    @closing = nil  | 
 | 104 | +    true  | 
 | 105 | +  end  | 
 | 106 | + | 
 | 107 | +  # Resets the fdlock back to its pristine state so it can be used again.  | 
 | 108 | +  # Assumes the caller owns the fdlock. This is required by  | 
 | 109 | +  # `TCPSocket#initialize`.  | 
 | 110 | +  def reset : Nil  | 
 | 111 | +    @m.lazy_set(0_u32)  | 
 | 112 | +    @closing = nil  | 
 | 113 | +  end  | 
 | 114 | + | 
 | 115 | +  def closed? : Bool  | 
 | 116 | +    (@m.get(:relaxed) & CLOSED) == CLOSED  | 
 | 117 | +  end  | 
 | 118 | +end  | 
0 commit comments