Skip to content

Commit 5d2fb82

Browse files
committed
Serialize reads and writes in Crystal::FdLock
Serializes reads and writes so we can assume any IO object will only have at most one read op and one write op. The benefits are: 1. it avoids a race condition in the polling event loops: - Fiber 1 then Fiber 2 try to read from fd; - Since fd isn't ready so both are waiting; - When fd becomes ready then Fiber 1 is resumed; - Fiber 1 doesn't read everything and returns; - Fiber 2 won't be resumed because events are edge-triggered; 2. we can simplify the UNIX event loops (epoll, kqueue, io_uring) that are guaranteed to only have at most one reader and one writer at any time.
1 parent ee8c330 commit 5d2fb82

File tree

2 files changed

+300
-8
lines changed

2 files changed

+300
-8
lines changed

spec/std/crystal/fd_lock_spec.cr

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,90 @@ require "crystal/fd_lock"
33
require "wait_group"
44

55
describe Crystal::FdLock do
6+
describe "#read" do
7+
it "acquires read lock" do
8+
lock = Crystal::FdLock.new
9+
called = 0
10+
11+
lock.read { called += 1 }
12+
lock.read { called += 1 }
13+
14+
called.should eq(2)
15+
end
16+
17+
it "acquires exclusive lock" do
18+
lock = Crystal::FdLock.new
19+
increment = 0
20+
21+
WaitGroup.wait do |wg|
22+
10.times do
23+
wg.spawn do
24+
100_000.times do |i|
25+
lock.read do
26+
increment += 1
27+
# Fiber.yield if i % 100 == 1
28+
end
29+
end
30+
end
31+
end
32+
end
33+
34+
increment.should eq(1_000_000)
35+
end
36+
37+
it "raises when closed" do
38+
lock = Crystal::FdLock.new
39+
called = false
40+
41+
lock.try_close? { }
42+
expect_raises(IO::Error, "Closed") { lock.read { called = true; Fiber.yield } }
43+
44+
called.should eq(false)
45+
end
46+
end
47+
48+
describe "#write" do
49+
it "acquires write lock" do
50+
lock = Crystal::FdLock.new
51+
called = 0
52+
53+
lock.write { called += 1 }
54+
lock.write { called += 1 }
55+
56+
called.should eq(2)
57+
end
58+
59+
it "acquires exclusive lock" do
60+
lock = Crystal::FdLock.new
61+
increment = 0
62+
63+
WaitGroup.wait do |wg|
64+
10.times do
65+
wg.spawn do
66+
100_000.times do |i|
67+
lock.write do
68+
increment += 1
69+
# Fiber.yield if i % 100 == 1
70+
end
71+
end
72+
end
73+
end
74+
end
75+
76+
increment.should eq(1_000_000)
77+
end
78+
79+
it "raises when closed" do
80+
lock = Crystal::FdLock.new
81+
called = false
82+
83+
lock.try_close? { }
84+
expect_raises(IO::Error, "Closed") { lock.read { called = true } }
85+
86+
called.should eq(false)
87+
end
88+
end
89+
690
describe "#reference" do
791
it "acquires" do
892
lock = Crystal::FdLock.new
@@ -124,6 +208,68 @@ describe Crystal::FdLock do
124208
raise ex
125209
end
126210
end
211+
212+
it "resumes waiters" do
213+
lock = Crystal::FdLock.new
214+
215+
ready = WaitGroup.new(8)
216+
running = WaitGroup.new
217+
exceptions = Channel(Exception).new(8)
218+
219+
# acquire locks
220+
lock.read do
221+
lock.write do
222+
# spawn concurrent fibers
223+
4.times do |i|
224+
running.spawn do
225+
ready.done
226+
lock.read { }
227+
rescue ex
228+
exceptions.send(ex)
229+
end
230+
231+
running.spawn do
232+
ready.done
233+
lock.write { }
234+
rescue ex
235+
exceptions.send(ex)
236+
end
237+
end
238+
239+
# wait for all the concurrent fibers to be trying to lock
240+
ready.wait
241+
end
242+
end
243+
244+
# close, then wait for the fibers to be resumed (and fail)
245+
lock.try_close? { }.should eq(true)
246+
running.wait
247+
exceptions.close
248+
249+
# fibers should have failed (unlikely: one may succeed to lock)
250+
failed = 0
251+
while ex = exceptions.receive?
252+
failed += 1
253+
ex.should be_a(IO::Error)
254+
ex.message.should eq("Closed")
255+
end
256+
failed.should be > 0
257+
end
258+
end
259+
260+
it "locks read + write + shared reference" do
261+
lock = Crystal::FdLock.new
262+
called = 0
263+
264+
lock.read do
265+
lock.write do
266+
lock.reference do
267+
called += 1
268+
end
269+
end
270+
end
271+
272+
called.should eq(1)
127273
end
128274

129275
it "#reset" do

src/crystal/fd_lock.cr

Lines changed: 154 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,163 @@
11
# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
22
# Copyright Google):
33
# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
4+
#
5+
# The internal details (spinlock, designated waker) of the locks are heavily
6+
# influenced by the nsync library (LICENSE: Apache-2.0, Copyright Google):
7+
# https://github.com/google/nsync
48

59
# :nodoc:
610
#
711
# Tracks active references over a system file descriptor (fd) and serializes
812
# reads and writes.
913
#
10-
# Every access to the fd that may affect its system state or system buffers must
11-
# acquire a shared lock.
14+
# Every read on the fd must lock read, every write must lock write and every
15+
# other operation (fcntl, setsockopt, ...) must acquire a shared lock. There can
16+
# be at most one reader + one writer + many references (other operations) at the
17+
# same time.
1218
#
1319
# The fdlock can be closed at any time, but the actual system close will wait
1420
# until there are no more references left. This avoids potential races when a
1521
# thread might try to read a fd that has been closed and has been reused by the
1622
# OS for example.
23+
#
24+
# Serializes reads and writes: only one attempt to read (or write) at a time can
25+
# go through, which avoids situations where 2 readers are waiting, then the
26+
# first reader is resumed but doesn't consume everything, then the second reader
27+
# will never be resumed. With this lock, a waiting reader will always be resumed.
28+
#
29+
# Lock concepts
30+
#
31+
# Spinlock: slow-path for lock/unlock will spin until it acquires the spinlock
32+
# bit to add/remove waiters; the CPU is relaxed between each attempt.
33+
#
34+
# Designated waker: set on unlock to report that a waiter has been scheduler and
35+
# there's no need to wake another one. It's unset when a waiter acquires or
36+
# fails to acquire and adds itself again as a waiter. This leads to an
37+
# impressive performance boost when the lock is contended.
1738
struct Crystal::FdLock
1839
CLOSED = 1_u32 << 0 # the fdlock has been closed
19-
REF = 1_u32 << 1 # the reference counter increment
40+
RLOCK = 1_u32 << 1 # reader lock
41+
RWAIT = 1_u32 << 2 # reader wait bit (at least one reader)
42+
RSPIN = 1_u32 << 3 # reader spinlock (protects @readers)
43+
RWAKER = 1_u32 << 4 # reader designated waker (a reader is being awoken)
44+
WLOCK = 1_u32 << 5 # writer lock
45+
WWAIT = 1_u32 << 6 # writer wait bit (at least one writer)
46+
WSPIN = 1_u32 << 7 # writer spinlock (protects @writers)
47+
WWAKER = 1_u32 << 8 # writer designated waker (a writer is being awoken)
48+
REF = 1_u32 << 9 # the reference counter increment
2049
MASK = ~(REF - 1) # mask for the reference counter
2150

2251
@m = Atomic(UInt32).new(0_u32)
2352
@closing : Fiber?
53+
@readers = PointerLinkedList(Fiber::PointerLinkedListNode).new
54+
@writers = PointerLinkedList(Fiber::PointerLinkedListNode).new
55+
56+
# Locks for read and increments the references by one for the duration of the
57+
# block. Raises if the fdlock is closed while trying to acquire the lock.
58+
def read(& : -> F) : F forall F
59+
m, success = @m.compare_and_set(0_u32, RLOCK + REF, :acquire, :relaxed)
60+
lock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success
61+
62+
begin
63+
yield
64+
ensure
65+
m, success = @m.compare_and_set(RLOCK + REF, 0_u32, :release, :relaxed)
66+
m = unlock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success
67+
handle_last_ref(m)
68+
end
69+
end
70+
71+
# Locks for write and increments the references by one for the duration of the
72+
# block. Raises if the fdlock is closed while trying to acquire the lock.
73+
def write(& : -> F) : F forall F
74+
m, success = @m.compare_and_set(0_u32, WLOCK + REF, :acquire, :relaxed)
75+
lock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success
76+
77+
begin
78+
yield
79+
ensure
80+
m, success = @m.compare_and_set(WLOCK + REF, 0_u32, :release, :relaxed)
81+
m = unlock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success
82+
handle_last_ref(m)
83+
end
84+
end
85+
86+
@[NoInline]
87+
private def lock_slow(xlock, xwait, xspin, xwaker, waiters)
88+
waiter = Fiber::PointerLinkedListNode.new(Fiber.current)
89+
attempts = 0
90+
clear = 0_u32
91+
92+
while true
93+
m = @m.get(:relaxed)
94+
95+
if (m & CLOSED) == CLOSED
96+
# abort
97+
raise IO::Error.new("Closed")
98+
elsif (m & xlock) == 0_u32
99+
# acquire the lock + increment ref
100+
m, success = @m.compare_and_set(m, ((m | xlock) + REF) & ~clear, :acquire, :relaxed)
101+
return if success
102+
elsif (m & xspin) == 0_u32
103+
# acquire spinlock + forward declare pending waiter
104+
m, success = @m.compare_and_set(m, (m | xspin | xwait) & ~clear, :acquire, :relaxed)
105+
if success
106+
waiters.value.push(pointerof(waiter))
107+
108+
# release spinlock before suspending the fiber
109+
@m.and(~xspin, :release)
110+
111+
Fiber.suspend
112+
113+
# the designated waker has woken: clear the flag
114+
clear |= xwaker
115+
end
116+
end
117+
118+
attempts = Thread.delay(attempts)
119+
end
120+
end
121+
122+
@[NoInline]
123+
private def unlock_slow(xlock, xwait, xspin, xwaker, waiters)
124+
attempts = 0
125+
126+
while true
127+
m = @m.get(:relaxed)
128+
129+
if (m & CLOSED) == CLOSED
130+
# decrement ref and abort
131+
m = @m.sub(REF, :relaxed)
132+
return m
133+
elsif (m & xwait) == 0_u32 || (m & xwaker) != 0_u32
134+
# no waiter, or there is a designated waker (no need to wake another
135+
# one): unlock & decrement ref
136+
m, success = @m.compare_and_set(m, (m & ~xlock) - REF, :release, :relaxed)
137+
return m if success
138+
elsif (m & xspin) == 0_u32
139+
# there is a waiter and no designated waker: acquire spinlock + declare
140+
# a designated waker + release lock & decrement ref early
141+
m, success = @m.compare_and_set(m, ((m | xspin | xwaker) & ~xlock) - REF, :acquire_release, :relaxed)
142+
if success
143+
waiter = waiters.value.shift?
144+
145+
# clear flags and release spinlock
146+
clear = xspin
147+
clear |= xwaker unless waiter # no designated waker
148+
clear |= xwait if waiters.value.empty? # no more waiters
149+
@m.and(~clear, :release)
150+
151+
waiter.value.enqueue if waiter
152+
153+
# return the m that decremented ref (for #handle_last_ref)
154+
return m
155+
end
156+
end
157+
158+
attempts = Thread.delay(attempts)
159+
end
160+
end
24161

25162
# Borrows a reference for the duration of the block. Raises if the fdlock is
26163
# closed while trying to borrow.
@@ -58,20 +195,25 @@ struct Crystal::FdLock
58195
end
59196
end
60197

61-
# Closes the fdlock. Blocks for as long as there are references.
198+
# Closes the fdlock. Wakes waiting readers and writers. Blocks for as long as
199+
# there are references.
62200
#
63201
# The *callback* block must cancel any external waiters (e.g. pending evloop
64202
# reads or writes).
65203
#
66-
# Returns true if the fdlock has been closed: no fiber can acquire a reference
67-
# anymore, the calling fiber fully owns the fd and can safely close it.
204+
# Returns true if the fdlock has been closed: no fiber can lock for read,
205+
# write or acquire a reference anymore, the calling fiber fully owns the fd
206+
# and can safely close it.
68207
#
69208
# Returns false if the fdlock has already been closed: the calling fiber
70209
# doesn't own the fd and musn't close it, as there might still be active
71210
# references and another fiber will close anyway.
72211
def try_close?(&callback : ->) : Bool
73212
attempts = 0
74213

214+
# close + increment ref + acquire both spinlocks so we own both @readers and
215+
# @writers; parallel attempts to acquire a spinlock will fail, notice that
216+
# the lock is closed, and abort
75217
while true
76218
m = @m.get(:relaxed)
77219

@@ -80,8 +222,7 @@ struct Crystal::FdLock
80222
return false
81223
end
82224

83-
# close + increment ref
84-
m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
225+
m, success = @m.compare_and_set(m, (m + REF) | CLOSED | RSPIN | WSPIN, :acquire, :relaxed)
85226
break if success
86227

87228
attempts = Thread.delay(attempts)
@@ -90,6 +231,11 @@ struct Crystal::FdLock
90231
# set the current fiber as the closing fiber (to be resumed by the last ref)
91232
@closing = Fiber.current
92233

234+
# resume waiters so they can fail (the fdlock is closed); this is safe
235+
# because we acquired the spinlocks above:
236+
@readers.consume_each(&.value.enqueue)
237+
@writers.consume_each(&.value.enqueue)
238+
93239
# decrement the last ref
94240
m = @m.sub(REF, :release)
95241

0 commit comments

Comments
 (0)