11#  The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
22#  Copyright Google):
33#  https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
4+ # 
5+ #  The internal details (spinlock, designated waker) of the locks are heavily
6+ #  influenced by the nsync library (LICENSE: Apache-2.0, Copyright Google):
7+ #  https://github.com/google/nsync
48
59#  :nodoc:
610# 
711#  Tracks active references over a system file descriptor (fd) and serializes
812#  reads and writes.
913# 
10- #  Every access to the fd that may affect its system state or system buffers must
11- #  acquire a shared lock.
14+ #  Every read on the fd must lock read, every write must lock write and every
15+ #  other operation (fcntl, setsockopt, ...) must acquire a shared lock. There can
16+ #  be at most one reader + one writer + many references (other operations) at the
17+ #  same time.
1218# 
1319#  The fdlock can be closed at any time, but the actual system close will wait
1420#  until there are no more references left. This avoids potential races when a
1521#  thread might try to read a fd that has been closed and has been reused by the
1622#  OS for example.
23+ # 
24+ #  Serializes reads and writes: only one attempt to read (or write) at a time can
25+ #  go through, which avoids situations where 2 readers are waiting, then the
26+ #  first reader is resumed but doesn't consume everything, then the second reader
27+ #  will never be resumed. With this lock, a waiting reader will always be resumed.
28+ # 
29+ #  Lock concepts
30+ # 
31+ #  Spinlock: slow-path for lock/unlock will spin until it acquires the spinlock
32+ #  bit to add/remove waiters; the CPU is relaxed between each attempt.
33+ # 
34+ #  Designated waker: set on unlock to report that a waiter has been scheduler and
35+ #  there's no need to wake another one. It's unset when a waiter acquires or
36+ #  fails to acquire and adds itself again as a waiter. This leads to an
37+ #  impressive performance boost when the lock is contended.
1738struct  Crystal::FdLock 
1839  CLOSED  =  1 _u32  <<  0  #  the fdlock has been closed
19-   REF     =  1 _u32  <<  1  #  the reference counter increment
40+   RLOCK   =  1 _u32  <<  1  #  reader lock
41+   RWAIT   =  1 _u32  <<  2  #  reader wait bit (at least one reader)
42+   RSPIN   =  1 _u32  <<  3  #  reader spinlock (protects @readers)
43+   RWAKER  =  1 _u32  <<  4  #  reader designated waker (a reader is being awoken)
44+   WLOCK   =  1 _u32  <<  5  #  writer lock
45+   WWAIT   =  1 _u32  <<  6  #  writer wait bit (at least one writer)
46+   WSPIN   =  1 _u32  <<  7  #  writer spinlock (protects @writers)
47+   WWAKER  =  1 _u32  <<  8  #  writer designated waker (a writer is being awoken)
48+   REF     =  1 _u32  <<  9  #  the reference counter increment
2049  MASK    =  ~ (REF  -  1 ) #  mask for the reference counter
2150
2251  @m  =  Atomic (UInt32 ).new(0 _u32 )
2352  @closing  : Fiber ?
53+   @readers  =  PointerLinkedList (Fiber ::PointerLinkedListNode ).new
54+   @writers  =  PointerLinkedList (Fiber ::PointerLinkedListNode ).new
55+ 
56+   #  Locks for read and increments the references by one for the duration of the
57+   #  block. Raises if the fdlock is closed while trying to acquire the lock.
58+   def  read (&  : - >  F ) : F  forall  F 
59+     m, success =  @m .compare_and_set(0 _u32 , RLOCK  +  REF , :acquire , :relaxed )
60+     lock_slow(RLOCK , RWAIT , RSPIN , RWAKER , pointerof (@readers )) unless  success
61+ 
62+     begin 
63+       yield 
64+     ensure 
65+       m, success =  @m .compare_and_set(RLOCK  +  REF , 0 _u32 , :release , :relaxed )
66+       m =  unlock_slow(RLOCK , RWAIT , RSPIN , RWAKER , pointerof (@readers )) unless  success
67+       handle_last_ref(m)
68+     end 
69+   end 
70+ 
71+   #  Locks for write and increments the references by one for the duration of the
72+   #  block. Raises if the fdlock is closed while trying to acquire the lock.
73+   def  write (&  : - >  F ) : F  forall  F 
74+     m, success =  @m .compare_and_set(0 _u32 , WLOCK  +  REF , :acquire , :relaxed )
75+     lock_slow(WLOCK , WWAIT , WSPIN , WWAKER , pointerof (@writers )) unless  success
76+ 
77+     begin 
78+       yield 
79+     ensure 
80+       m, success =  @m .compare_and_set(WLOCK  +  REF , 0 _u32 , :release , :relaxed )
81+       m =  unlock_slow(WLOCK , WWAIT , WSPIN , WWAKER , pointerof (@writers )) unless  success
82+       handle_last_ref(m)
83+     end 
84+   end 
85+ 
86+   @[NoInline ]
87+   private  def  lock_slow (xlock , xwait , xspin , xwaker , waiters )
88+     waiter =  Fiber ::PointerLinkedListNode .new(Fiber .current)
89+     attempts =  0 
90+     clear =  0 _u32 
91+ 
92+     while  true 
93+       m =  @m .get(:relaxed )
94+ 
95+       if  (m &  CLOSED ) ==  CLOSED 
96+         #  abort
97+         raise  IO ::Error .new(" Closed"  )
98+       elsif  (m &  xlock) ==  0 _u32 
99+         #  acquire the lock + increment ref
100+         m, success =  @m .compare_and_set(m, ((m |  xlock) +  REF ) &  ~ clear, :acquire , :relaxed )
101+         return  if  success
102+       elsif  (m &  xspin) ==  0 _u32 
103+         #  acquire spinlock + forward declare pending waiter
104+         m, success =  @m .compare_and_set(m, (m |  xspin |  xwait) &  ~ clear, :acquire , :relaxed )
105+         if  success
106+           waiters.value.push(pointerof (waiter))
107+ 
108+           #  release spinlock before suspending the fiber
109+           @m .and (~ xspin, :release )
110+ 
111+           Fiber .suspend
112+ 
113+           #  the designated waker has woken: clear the flag
114+           clear |=  xwaker
115+         end 
116+       end 
117+ 
118+       attempts =  Thread .delay(attempts)
119+     end 
120+   end 
121+ 
122+   @[NoInline ]
123+   private  def  unlock_slow (xlock , xwait , xspin , xwaker , waiters )
124+     attempts =  0 
125+ 
126+     while  true 
127+       m =  @m .get(:relaxed )
128+ 
129+       if  (m &  CLOSED ) ==  CLOSED 
130+         #  decrement ref and abort
131+         m =  @m .sub(REF , :relaxed )
132+         return  m
133+       elsif  (m &  xwait) ==  0 _u32  ||  (m &  xwaker) !=  0 _u32 
134+         #  no waiter, or there is a designated waker (no need to wake another
135+         #  one): unlock & decrement ref
136+         m, success =  @m .compare_and_set(m, (m &  ~ xlock) -  REF , :release , :relaxed )
137+         return  m if  success
138+       elsif  (m &  xspin) ==  0 _u32 
139+         #  there is a waiter and no designated waker: acquire spinlock + declare
140+         #  a designated waker + release lock & decrement ref early
141+         m, success =  @m .compare_and_set(m, ((m |  xspin |  xwaker) &  ~ xlock) -  REF , :acquire_release , :relaxed )
142+         if  success
143+           waiter =  waiters.value.shift?
144+ 
145+           #  clear flags and release spinlock
146+           clear =  xspin
147+           clear |=  xwaker unless  waiter          #  no designated waker
148+           clear |=  xwait if  waiters.value.empty? #  no more waiters
149+           @m .and (~ clear, :release )
150+ 
151+           waiter.value.enqueue if  waiter
152+ 
153+           #  return the m that decremented ref (for #handle_last_ref)
154+           return  m
155+         end 
156+       end 
157+ 
158+       attempts =  Thread .delay(attempts)
159+     end 
160+   end 
24161
25162  #  Borrows a reference for the duration of the block. Raises if the fdlock is
26163  #  closed while trying to borrow.
@@ -58,20 +195,25 @@ struct Crystal::FdLock
58195    end 
59196  end 
60197
61-   #  Closes the fdlock. Blocks for as long as there are references.
198+   #  Closes the fdlock. Wakes waiting readers and writers. Blocks for as long as
199+   #  there are references.
62200  # 
63201  #  The *callback* block must cancel any external waiters (e.g. pending evloop
64202  #  reads or writes).
65203  # 
66-   #  Returns true if the fdlock has been closed: no fiber can acquire a reference
67-   #  anymore, the calling fiber fully owns the fd and can safely close it.
204+   #  Returns true if the fdlock has been closed: no fiber can lock for read,
205+   #  write or acquire a reference anymore, the calling fiber fully owns the fd
206+   #  and can safely close it.
68207  # 
69208  #  Returns false if the fdlock has already been closed: the calling fiber
70209  #  doesn't own the fd and musn't close it, as there might still be active
71210  #  references and another fiber will close anyway.
72211  def  try_close ?(& callback : - > ) : Bool 
73212    attempts =  0 
74213
214+     #  close + increment ref + acquire both spinlocks so we own both @readers and
215+     #  @writers; parallel attempts to acquire a spinlock will fail, notice that
216+     #  the lock is closed, and abort
75217    while  true 
76218      m =  @m .get(:relaxed )
77219
@@ -80,8 +222,7 @@ struct Crystal::FdLock
80222        return  false 
81223      end 
82224
83-       #  close + increment ref
84-       m, success =  @m .compare_and_set(m, (m +  REF ) |  CLOSED , :acquire , :relaxed )
225+       m, success =  @m .compare_and_set(m, (m +  REF ) |  CLOSED  |  RSPIN  |  WSPIN , :acquire , :relaxed )
85226      break  if  success
86227
87228      attempts =  Thread .delay(attempts)
@@ -90,6 +231,11 @@ struct Crystal::FdLock
90231    #  set the current fiber as the closing fiber (to be resumed by the last ref)
91232    @closing  =  Fiber .current
92233
234+     #  resume waiters so they can fail (the fdlock is closed); this is safe
235+     #  because we acquired the spinlocks above:
236+     @readers .consume_each(& .value.enqueue)
237+     @writers .consume_each(& .value.enqueue)
238+ 
93239    #  decrement the last ref
94240    m =  @m .sub(REF , :release )
95241
0 commit comments