Skip to content

Commit 6f25837

Browse files
committed
Add Crystal::FdLock
Implements a reference-counted lock to eventually protect IO objects that depend on a reusable system file descriptor (Socket, File and IO::FileDescriptor) against thread safety issues around close:
- Thread 1 wants to read from fd 123;
- The OS preempts Thread 1;
- Thread 2 closes fd 123;
- Thread 2 opens something else and the OS reuses fd 123;
- The OS resumes Thread 1;
- Thread 1 reads from the *reused* fd 123!

The same issue arises with any mutable operation on the fd: write, fchown, ftruncate, setsockopt, ...

It also serializes reads and writes, so we can assume that any IO object has at most one read op and one write op at a time. This will simplify resuming pending waiters when we close an IO (especially across event loop instances) and avoids a race condition in the polling event loops:
- Fiber 1 then Fiber 2 try to read from fd;
- fd isn't ready, so both are waiting;
- When fd becomes ready, Fiber 1 is resumed;
- Fiber 1 doesn't read everything and returns;
- Fiber 2 won't be resumed because events are edge-triggered.

NOTE: there are two implementations, a thread-safe one and a simpler thread-unsafe one, because the interpreter segfaults when using the thread-safe one, despite the interpreted code being single-threaded (it must have something to do with the refcount or with resuming the closing fiber).
1 parent 4728534 commit 6f25837

File tree

4 files changed

+617
-0
lines changed

4 files changed

+617
-0
lines changed

spec/std/crystal/fd_lock_spec.cr

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
require "spec"
2+
require "../../support/interpreted.cr"
3+
require "crystal/fd_lock"
4+
require "wait_group"
5+
6+
# Specs for Crystal::FdLock, covering #read, #write, #reference, #try_close?
# and #reset.
describe Crystal::FdLock do
  describe "#read" do
    it "acquires read lock" do
      lock = Crystal::FdLock.new
      called = 0

      # the lock must be released after each block so it can be taken again
      lock.read { called += 1 }
      lock.read { called += 1 }

      called.should eq(2)
    end

    it "acquires exclusive lock" do
      lock = Crystal::FdLock.new
      increment = 0

      # 10 fibers each bump the shared counter 100_000 times while holding the
      # read lock; the final count is only exact if no increment was lost
      WaitGroup.wait do |wg|
        10.times do
          wg.spawn do
            100_000.times do |i|
              lock.read do
                increment += 1
                # Fiber.yield if i % 100 == 1
              end
            end
          end
        end
      end

      increment.should eq(1_000_000)
    end

    it "raises when closed" do
      lock = Crystal::FdLock.new
      called = false

      lock.try_close? { }
      expect_raises(IO::Error, "Closed") { lock.read { called = true; Fiber.yield } }

      # the block must never have been entered
      called.should eq(false)
    end
  end

  describe "#write" do
    it "acquires write lock" do
      lock = Crystal::FdLock.new
      called = 0

      # the lock must be released after each block so it can be taken again
      lock.write { called += 1 }
      lock.write { called += 1 }

      called.should eq(2)
    end

    it "acquires exclusive lock" do
      lock = Crystal::FdLock.new
      increment = 0

      # 10 fibers each bump the shared counter 100_000 times while holding the
      # write lock; the final count is only exact if no increment was lost
      WaitGroup.wait do |wg|
        10.times do
          wg.spawn do
            100_000.times do |i|
              lock.write do
                increment += 1
                # Fiber.yield if i % 100 == 1
              end
            end
          end
        end
      end

      increment.should eq(1_000_000)
    end

    it "raises when closed" do
      lock = Crystal::FdLock.new
      called = false

      lock.try_close? { }
      # exercise #write here (not #read) so that the closed check of the write
      # path is the one being verified
      expect_raises(IO::Error, "Closed") { lock.write { called = true } }

      # the block must never have been entered
      called.should eq(false)
    end
  end

  describe "#reference" do
    it "acquires" do
      lock = Crystal::FdLock.new
      called = 0

      lock.reference { called += 1 }
      lock.reference { called += 1 }

      called.should eq(2)
    end

    it "allows reentrancy (side effect)" do
      lock = Crystal::FdLock.new
      called = 0

      lock.reference { called += 1 }
      # taking a reference while already holding one must not block
      lock.reference do
        lock.reference { called += 1 }
      end

      called.should eq(2)
    end

    it "acquires shared reference" do
      lock = Crystal::FdLock.new

      ready = WaitGroup.new(1)
      release = Channel(String).new

      spawn do
        lock.reference do
          ready.done

          # keep holding the reference until the main fiber receives, or give
          # up after one second and report the timeout instead
          select
          when release.send("ok")
          when timeout(1.second)
            release.send("timeout")
          end
        end
      end

      # the main fiber must be able to take its own reference while the
      # spawned fiber still holds one; only then can it receive "ok" before the
      # timeout fires
      ready.wait
      lock.reference { }

      release.receive.should eq("ok")
    end

    it "raises when closed" do
      lock = Crystal::FdLock.new
      lock.try_close? { }

      called = false
      expect_raises(IO::Error, "Closed") do
        lock.reference { called = true }
      end

      # the block must never have been entered
      called.should be_false
    end
  end

  describe "#try_close?" do
    it "closes" do
      lock = Crystal::FdLock.new
      lock.closed?.should be_false

      called = false
      lock.try_close? { called = true }.should be_true
      lock.closed?.should be_true
      called.should be_true
    end

    it "closes once" do
      lock = Crystal::FdLock.new

      called = 0

      # 20 close attempts in total (2 per fiber): the block must run for
      # exactly one of them
      WaitGroup.wait do |wg|
        10.times do
          wg.spawn do
            lock.try_close? { called += 1 }
            lock.try_close? { called += 1 }
          end
        end
      end

      called.should eq(1)
    end

    # FIXME: the interpreter segfaults while running this spec (NULL pointer)
    pending_interpreted "waits for all references to return" do
      lock = Crystal::FdLock.new

      ready = WaitGroup.new(10)
      exceptions = Channel(Exception).new(10)

      WaitGroup.wait do |wg|
        10.times do
          wg.spawn do
            begin
              lock.reference do
                ready.done
                # yield while still holding the reference so that the close
                # below happens with references outstanding
                Fiber.yield
              end
            rescue ex
              exceptions.send(ex)
            end
          end
        end

        # all 10 fibers are now inside their reference block
        ready.wait

        called = false
        lock.try_close? { called = true }.should be_true
        lock.closed?.should be_true
        called.should be_true
      end
      exceptions.close

      # no fiber is expected to have failed: re-raise the first error, if any
      if ex = exceptions.receive?
        raise ex
      end
    end

    it "resumes waiters" do
      lock = Crystal::FdLock.new

      ready = WaitGroup.new(8)
      running = WaitGroup.new
      exceptions = Channel(Exception).new(8)

      # acquire locks
      lock.read do
        lock.write do
          # spawn concurrent fibers (4 readers + 4 writers = 8)
          4.times do
            running.spawn do
              ready.done
              lock.read { }
            rescue ex
              exceptions.send(ex)
            end

            running.spawn do
              ready.done
              lock.write { }
            rescue ex
              exceptions.send(ex)
            end
          end

          # wait for all the concurrent fibers to be trying to lock
          ready.wait
        end
      end

      # close, then wait for the fibers to be resumed (and fail)
      lock.try_close? { }.should eq(true)
      running.wait
      exceptions.close

      # fibers should have failed (unlikely: one may succeed to lock)
      failed = 0
      while ex = exceptions.receive?
        failed += 1
        ex.should be_a(IO::Error)
        ex.message.should eq("Closed")
      end
      failed.should be > 0
    end
  end

  it "locks read + write + shared reference" do
    lock = Crystal::FdLock.new
    called = 0

    # the read lock, the write lock and a reference can all be held together
    lock.read do
      lock.write do
        lock.reference do
          called += 1
        end
      end
    end

    called.should eq(1)
  end

  it "#reset" do
    lock = Crystal::FdLock.new
    lock.try_close? { }
    lock.reset
    # after a reset the lock can be closed again
    lock.try_close? { }.should eq(true)
  end
end

src/crystal/fd_lock.cr

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{% if flag?(:preview_mt) %}
2+
require "./fd_lock_mt"
3+
{% else %}
4+
require "./fd_lock_no_mt"
5+
{% end %}

0 commit comments

Comments
 (0)