Skip to content

Commit f9c31dc

Browse files
committed
Serialize reads and writes in Crystal::FdLock
Serializes reads and writes so we can assume any IO object will have at most one read op and one write op at a time. The benefits are: 1. it avoids a race condition in the polling event loops: - Fiber 1 and then Fiber 2 try to read from the fd; - since the fd isn't ready, both are waiting; - when the fd becomes ready, Fiber 1 is resumed; - Fiber 1 doesn't read everything and returns; - Fiber 2 won't be resumed because events are edge-triggered; 2. we can simplify the UNIX event loops (epoll, kqueue, io_uring), which are now guaranteed to have at most one reader and one writer at any time.
1 parent a4bb233 commit f9c31dc

File tree

2 files changed

+291
-8
lines changed

2 files changed

+291
-8
lines changed

spec/std/crystal/fd_lock_spec.cr

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,90 @@ require "crystal/fd_lock"
33
require "wait_group"
44

55
describe Crystal::FdLock do
6+
describe "#read" do
7+
it "acquires read lock" do
8+
lock = Crystal::FdLock.new
9+
called = 0
10+
11+
lock.read { called += 1 }
12+
lock.read { called += 1 }
13+
14+
called.should eq(2)
15+
end
16+
17+
it "acquires exclusive lock" do
18+
lock = Crystal::FdLock.new
19+
increment = 0
20+
21+
WaitGroup.wait do |wg|
22+
10.times do
23+
wg.spawn do
24+
100_000.times do |i|
25+
lock.read do
26+
increment += 1
27+
# Fiber.yield if i % 100 == 1
28+
end
29+
end
30+
end
31+
end
32+
end
33+
34+
increment.should eq(1_000_000)
35+
end
36+
37+
it "raises when closed" do
38+
lock = Crystal::FdLock.new
39+
called = false
40+
41+
lock.try_close? { }
42+
expect_raises(IO::Error, "Closed") { lock.read { called = true; Fiber.yield } }
43+
44+
called.should eq(false)
45+
end
46+
end
47+
48+
describe "#write" do
49+
it "acquires write lock" do
50+
lock = Crystal::FdLock.new
51+
called = 0
52+
53+
lock.write { called += 1 }
54+
lock.write { called += 1 }
55+
56+
called.should eq(2)
57+
end
58+
59+
it "acquires exclusive lock" do
60+
lock = Crystal::FdLock.new
61+
increment = 0
62+
63+
WaitGroup.wait do |wg|
64+
10.times do
65+
wg.spawn do
66+
100_000.times do |i|
67+
lock.write do
68+
increment += 1
69+
# Fiber.yield if i % 100 == 1
70+
end
71+
end
72+
end
73+
end
74+
end
75+
76+
increment.should eq(1_000_000)
77+
end
78+
79+
it "raises when closed" do
  lock = Crystal::FdLock.new
  called = false

  lock.try_close? { }
  # must exercise #write here: the sibling spec in `describe "#read"` already
  # covers #read; using lock.read was a copy-paste mistake that left the
  # write path's closed-raise behavior untested
  expect_raises(IO::Error, "Closed") { lock.write { called = true } }

  called.should eq(false)
end
89+
690
describe "#reference" do
791
it "acquires" do
892
lock = Crystal::FdLock.new
@@ -124,6 +208,68 @@ describe Crystal::FdLock do
124208
raise ex
125209
end
126210
end
211+
212+
it "resumes waiters" do
213+
lock = Crystal::FdLock.new
214+
215+
ready = WaitGroup.new(8)
216+
running = WaitGroup.new
217+
exceptions = Channel(Exception).new(8)
218+
219+
# acquire locks
220+
lock.read do
221+
lock.write do
222+
# spawn concurrent fibers
223+
4.times do |i|
224+
running.spawn do
225+
ready.done
226+
lock.read { }
227+
rescue ex
228+
exceptions.send(ex)
229+
end
230+
231+
running.spawn do
232+
ready.done
233+
lock.write { }
234+
rescue ex
235+
exceptions.send(ex)
236+
end
237+
end
238+
239+
# wait for all the concurrent fibers to be trying to lock
240+
ready.wait
241+
end
242+
end
243+
244+
# close, then wait for the fibers to be resumed (and fail)
245+
lock.try_close? { }.should eq(true)
246+
running.wait
247+
exceptions.close
248+
249+
# fibers should have failed (unlikely: one may succeed in locking)
250+
failed = 0
251+
while ex = exceptions.receive?
252+
failed += 1
253+
ex.should be_a(IO::Error)
254+
ex.message.should eq("Closed")
255+
end
256+
failed.should be > 0
257+
end
258+
end
259+
260+
it "locks read + write + shared reference" do
261+
lock = Crystal::FdLock.new
262+
called = 0
263+
264+
lock.read do
265+
lock.write do
266+
lock.reference do
267+
called += 1
268+
end
269+
end
270+
end
271+
272+
called.should eq(1)
127273
end
128274

129275
it "#reset" do

src/crystal/fd_lock.cr

Lines changed: 145 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,154 @@
11
# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
22
# Copyright Google):
33
# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
4+
#
5+
# The internal details (spinlock, long waiter, designated waker) of the locks
6+
# are heavily influenced by the nsync library (LICENSE: Apache-2.0, Copyright
7+
# Google):
8+
# https://github.com/google/nsync
49

510
# :nodoc:
611
#
712
# Tracks active references over a system file descriptor (fd) and serializes
813
# reads and writes.
914
#
10-
# Every access to the fd that may affect its system state or system buffers must
11-
# acquire a shared lock.
15+
# Every read on the fd must lock read, every write must lock write and every
16+
# other operation (fcntl, setsockopt, ...) must acquire a shared lock. There can
17+
# be at most one reader + one writer + many references (other operations) at the
18+
# same time.
1219
#
1320
# The fdlock can be closed at any time, but the actual system close will wait
1421
# until there are no more references left. This avoids potential races when a
1522
# thread might try to read a fd that has been closed and has been reused by the
1623
# OS for example.
24+
#
25+
# Serializes reads and writes: only one attempt to read (or write) at a time can
26+
# go through, which avoids situations where 2 readers are waiting, then the
27+
# first reader is resumed but doesn't consume everything, then the second reader
28+
# will never be resumed. With this lock, a waiting reader will always be resumed.
1729
struct Crystal::FdLock
1830
CLOSED = 1_u32 << 0 # the fdlock has been closed
19-
REF = 1_u32 << 1 # the reference counter increment
31+
RLOCK = 1_u32 << 1 # reader lock
32+
RWAIT = 1_u32 << 2 # reader wait bit (at least one reader)
33+
RSPIN = 1_u32 << 3 # reader spinlock (protects @readers)
34+
RWAKER = 1_u32 << 4 # reader designated waker (a reader is being awoken)
35+
WLOCK = 1_u32 << 5 # writer lock
36+
WWAIT = 1_u32 << 6 # writer wait bit (at least one writer)
37+
WSPIN = 1_u32 << 7 # writer spinlock (protects @writers)
38+
WWAKER = 1_u32 << 8 # writer designated waker (a writer is being awoken)
39+
REF = 1_u32 << 9 # the reference counter increment
2040
MASK = ~(REF - 1) # mask for the reference counter
2141

2242
@m = Atomic(UInt32).new(0_u32)
2343
@closing : Fiber?
44+
@readers = PointerLinkedList(Fiber::PointerLinkedListNode).new
45+
@writers = PointerLinkedList(Fiber::PointerLinkedListNode).new
46+
47+
# Locks for read and increments the references by one for the duration of the
48+
# block. Raises if the fdlock is closed while trying to acquire the lock.
49+
def read(& : -> F) : F forall F
50+
m, success = @m.compare_and_set(0_u32, RLOCK + REF, :acquire, :relaxed)
51+
lock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success
52+
53+
begin
54+
yield
55+
ensure
56+
m, success = @m.compare_and_set(RLOCK + REF, 0_u32, :release, :relaxed)
57+
m = unlock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success
58+
handle_last_ref(m)
59+
end
60+
end
61+
62+
# Locks for write and increments the references by one for the duration of the
63+
# block. Raises if the fdlock is closed while trying to acquire the lock.
64+
def write(& : -> F) : F forall F
65+
m, success = @m.compare_and_set(0_u32, WLOCK + REF, :acquire, :relaxed)
66+
lock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success
67+
68+
begin
69+
yield
70+
ensure
71+
m, success = @m.compare_and_set(WLOCK + REF, 0_u32, :release, :relaxed)
72+
m = unlock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success
73+
handle_last_ref(m)
74+
end
75+
end
76+
77+
@[NoInline]
78+
private def lock_slow(xlock, xwait, xspin, xwaker, waiters)
79+
waiter = Fiber::PointerLinkedListNode.new(Fiber.current)
80+
attempts = 0
81+
clear = 0_u32
82+
83+
while true
84+
m = @m.get(:relaxed)
85+
86+
if (m & CLOSED) == CLOSED
87+
# abort
88+
raise IO::Error.new("Closed")
89+
elsif (m & xlock) == 0_u32
90+
# acquire the lock + increment ref
91+
m, success = @m.compare_and_set(m, ((m | xlock) + REF) & ~clear, :acquire, :relaxed)
92+
return if success
93+
elsif (m & xspin) == 0_u32
94+
# acquire spinlock + forward declare pending waiter
95+
m, success = @m.compare_and_set(m, (m | xspin | xwait) & ~clear, :acquire, :relaxed)
96+
if success
97+
waiters.value.push(pointerof(waiter))
98+
99+
# release spinlock before suspending the fiber
100+
@m.and(~xspin, :release)
101+
102+
Fiber.suspend
103+
104+
# the designated waker has woken: clear the flag
105+
clear |= xwaker
106+
end
107+
end
108+
109+
attempts = Thread.delay(attempts)
110+
end
111+
end
112+
113+
@[NoInline]
114+
private def unlock_slow(xlock, xwait, xspin, xwaker, waiters)
115+
attempts = 0
116+
117+
while true
118+
m = @m.get(:relaxed)
119+
120+
if (m & CLOSED) == CLOSED
121+
# decrement ref and abort
122+
m = @m.sub(REF, :relaxed)
123+
return m
124+
elsif (m & xwait) == 0_u32 || (m & xwaker) != 0_u32
125+
# no waiter, or there is a designated waker (no need to wake another
126+
# one): unlock & decrement ref
127+
m, success = @m.compare_and_set(m, (m & ~xlock) - REF, :release, :relaxed)
128+
return m if success
129+
elsif (m & xspin) == 0_u32
130+
# there is a waiter and no designated waker: acquire spinlock + declare
131+
# a designated waker + release lock & decrement ref early
132+
m, success = @m.compare_and_set(m, ((m | xspin | xwaker) & ~xlock) - REF, :acquire_release, :relaxed)
133+
if success
134+
waiter = waiters.value.shift?
135+
136+
# clear flags and release spinlock
137+
clear = xspin
138+
clear |= xwaker unless waiter # no designated waker
139+
clear |= xwait if waiters.value.empty? # no more waiters
140+
@m.and(~clear, :release)
141+
142+
waiter.value.enqueue if waiter
143+
144+
# return the m that decremented ref (for #handle_last_ref)
145+
return m
146+
end
147+
end
148+
149+
attempts = Thread.delay(attempts)
150+
end
151+
end
24152

25153
# Borrows a reference for the duration of the block. Raises if the fdlock is
26154
# closed while trying to borrow.
@@ -58,20 +186,25 @@ struct Crystal::FdLock
58186
end
59187
end
60188

61-
# Closes the fdlock. Blocks for as long as there are references.
189+
# Closes the fdlock. Wakes waiting readers and writers. Blocks for as long as
190+
# there are references.
62191
#
63192
# The *callback* block must cancel any external waiters (e.g. pending evloop
64193
# reads or writes).
65194
#
66-
# Returns true if the fdlock has been closed: no fiber can acquire a reference
67-
# anymore, the calling fiber fully owns the fd and can safely close it.
195+
# Returns true if the fdlock has been closed: no fiber can lock for read,
196+
# write or acquire a reference anymore, the calling fiber fully owns the fd
197+
# and can safely close it.
68198
#
69199
# Returns false if the fdlock has already been closed: the calling fiber
70200
doesn't own the fd and mustn't close it, as there might still be active
71201
# references and another fiber will close anyway.
72202
def try_close?(&callback : ->) : Bool
73203
attempts = 0
74204

205+
# close + increment ref + acquire both spinlocks so we own both @readers and
206+
# @writers; parallel attempts to acquire a spinlock will fail, notice that
207+
# the lock is closed, and abort
75208
while true
76209
m = @m.get(:relaxed)
77210

@@ -80,8 +213,7 @@ struct Crystal::FdLock
80213
return false
81214
end
82215

83-
# close + increment ref
84-
m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
216+
m, success = @m.compare_and_set(m, (m + REF) | CLOSED | RSPIN | WSPIN, :acquire, :relaxed)
85217
break if success
86218

87219
attempts = Thread.delay(attempts)
@@ -90,6 +222,11 @@ struct Crystal::FdLock
90222
# set the current fiber as the closing fiber (to be resumed by the last ref)
91223
@closing = Fiber.current
92224

225+
# resume waiters so they can fail (the fdlock is closed); this is safe
226+
# because we acquired the spinlocks above:
227+
@readers.consume_each(&.value.enqueue)
228+
@writers.consume_each(&.value.enqueue)
229+
93230
# decrement the last ref
94231
m = @m.sub(REF, :release)
95232

0 commit comments

Comments
 (0)