Skip to content

Commit f9aca80

Browse files
committed
Add Crystal::FdLock to count system fd references
This patch implements a reference counted lock to protect IO objects that depend on a reusable system fd (IO::FileDescriptor, File and Socket) to protect them against thread safety issues around close: - Thread 1 wants to read from fd 123; - The OS preempts Thread 1; - Thread 2 closes fd 123; - Thread 2 opens something else and the OS reuses fd 123; - The OS resumes Thread 1; - Thread 1 reads from the reused fd 123!!! The issue arises for any operation that would mutate the fd: write, fchown, ftruncate, setsockopt, ... as they risk affecting a reused fd instead of the expected one. NOTE: The patch provides two implementations of Crystal::FdLock, one is thread safe for MT, while the other is thread unsafe because the interpreter segfaults when using the thread safe implementation, despite the interpreted code being single threaded (it has something to do with the refcount and resuming the closing fiber).
1 parent b8a57ee commit f9aca80

File tree

4 files changed

+294
-0
lines changed

4 files changed

+294
-0
lines changed

spec/std/crystal/fd_lock_spec.cr

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
require "spec"
2+
require "../../support/interpreted.cr"
3+
require "crystal/fd_lock"
4+
require "wait_group"
5+
6+
describe Crystal::FdLock do
7+
describe "#reference" do
8+
it "acquires" do
9+
lock = Crystal::FdLock.new
10+
called = 0
11+
12+
lock.reference { called += 1 }
13+
lock.reference { called += 1 }
14+
15+
called.should eq(2)
16+
end
17+
18+
it "allows reentrancy (side effect)" do
19+
lock = Crystal::FdLock.new
20+
called = 0
21+
22+
lock.reference { called += 1 }
23+
lock.reference do
24+
lock.reference { called += 1 }
25+
end
26+
27+
called.should eq(2)
28+
end
29+
30+
it "acquires shared reference" do
31+
lock = Crystal::FdLock.new
32+
33+
ready = WaitGroup.new(1)
34+
release = Channel(String).new
35+
36+
spawn do
37+
lock.reference do
38+
ready.done
39+
40+
select
41+
when release.send("ok")
42+
when timeout(1.second)
43+
release.send("timeout")
44+
end
45+
end
46+
end
47+
48+
ready.wait
49+
lock.reference { }
50+
51+
release.receive.should eq("ok")
52+
end
53+
54+
it "raises when closed" do
55+
lock = Crystal::FdLock.new
56+
lock.try_close? { }
57+
58+
called = false
59+
expect_raises(IO::Error, "Closed") do
60+
lock.reference { called = true }
61+
end
62+
63+
called.should be_false
64+
end
65+
end
66+
67+
describe "#try_close?" do
68+
it "closes" do
69+
lock = Crystal::FdLock.new
70+
lock.closed?.should be_false
71+
72+
called = false
73+
lock.try_close? { called = true }.should be_true
74+
lock.closed?.should be_true
75+
called.should be_true
76+
end
77+
78+
it "closes once" do
79+
lock = Crystal::FdLock.new
80+
81+
called = 0
82+
83+
WaitGroup.wait do |wg|
84+
10.times do
85+
wg.spawn do
86+
lock.try_close? { called += 1 }
87+
lock.try_close? { called += 1 }
88+
end
89+
end
90+
end
91+
92+
called.should eq(1)
93+
end
94+
95+
# FIXME: the interpreter segfaults while running this spec (NULL pointer)
96+
pending_interpreted "waits for all references to return" do
97+
lock = Crystal::FdLock.new
98+
99+
ready = WaitGroup.new(10)
100+
exceptions = Channel(Exception).new(10)
101+
102+
WaitGroup.wait do |wg|
103+
10.times do
104+
wg.spawn do
105+
begin
106+
lock.reference do
107+
ready.done
108+
Fiber.yield
109+
end
110+
rescue ex
111+
exceptions.send(ex)
112+
end
113+
end
114+
end
115+
116+
ready.wait
117+
118+
called = false
119+
lock.try_close? { called = true }.should be_true
120+
lock.closed?.should be_true
121+
called.should be_true
122+
end
123+
exceptions.close
124+
125+
if ex = exceptions.receive?
126+
raise ex
127+
end
128+
end
129+
end
130+
131+
it "#reset" do
132+
lock = Crystal::FdLock.new
133+
lock.try_close? { }
134+
lock.reset
135+
lock.try_close? { }.should eq(true)
136+
end
137+
end

src/crystal/fd_lock.cr

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{% if flag?(:preview_mt) %}
2+
require "./fd_lock_mt"
3+
{% else %}
4+
require "./fd_lock_no_mt"
5+
{% end %}

src/crystal/fd_lock_mt.cr

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
2+
# Copyright Google):
3+
# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
4+
5+
# :nodoc:
6+
#
7+
# Tracks active references over a system file descriptor (fd) and serializes
8+
# reads and writes.
9+
#
10+
# Every access to the fd that may affect its system state or system buffers must
11+
# acquire a shared lock.
12+
#
13+
# The fdlock can be closed at any time, but the actual system close will wait
14+
# until there are no more references left. This avoids potential races when a
15+
# thread might try to read a fd that has been closed and has been reused by the
16+
# OS for example.
17+
#
18+
# FIXME: the interpreter segfaults when interpreted code uses this type; for now
19+
# it uses the thread unsafe alternative (fd_lock_no_mt).
20+
struct Crystal::FdLock
21+
CLOSED = 1_u32 << 0 # the fdlock has been closed
22+
REF = 1_u32 << 1 # the reference counter increment
23+
MASK = ~(REF - 1) # mask for the reference counter
24+
25+
@m = Atomic(UInt32).new(0_u32)
26+
@closing : Fiber?
27+
28+
# Borrows a reference for the duration of the block. Raises if the fdlock is
29+
# closed while trying to borrow.
30+
def reference(& : -> F) : F forall F
31+
m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed)
32+
increment_slow(m) unless success
33+
34+
begin
35+
yield
36+
ensure
37+
m = @m.sub(REF, :release)
38+
handle_last_ref(m)
39+
end
40+
end
41+
42+
private def increment_slow(m)
43+
while true
44+
if (m & CLOSED) == CLOSED
45+
raise IO::Error.new("Closed")
46+
end
47+
m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed)
48+
break if success
49+
end
50+
end
51+
52+
private def handle_last_ref(m)
53+
return unless (m & CLOSED) == CLOSED # is closed?
54+
return unless (m & MASK) == REF # was the last ref?
55+
56+
# the last ref after close is responsible to resume the closing fiber
57+
@closing.not_nil!("BUG: expected a closing fiber to resume.").enqueue
58+
end
59+
60+
# Closes the fdlock. Blocks for as long as there are references.
61+
#
62+
# The *callback* block must cancel any external waiters (e.g. pending evloop
63+
# reads or writes).
64+
#
65+
# Returns true if the fdlock has been closed: no fiber can acquire a reference
66+
# anymore, the calling fiber fully owns the fd and can safely close it.
67+
#
68+
# Returns false if the fdlock has already been closed: the calling fiber
69+
# doesn't own the fd and musn't close it, as there might still be active
70+
# references and another fiber will close anyway.
71+
def try_close?(&callback : ->) : Bool
72+
attempts = 0
73+
74+
while true
75+
m = @m.get(:relaxed)
76+
77+
if (m & CLOSED) == CLOSED
78+
# already closed: abort
79+
return false
80+
end
81+
82+
# close + increment ref
83+
m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
84+
break if success
85+
86+
attempts = Thread.delay(attempts)
87+
end
88+
89+
# set the current fiber as the closing fiber (to be resumed by the last ref)
90+
@closing = Fiber.current
91+
92+
# decrement the last ref
93+
m = @m.sub(REF, :release)
94+
95+
begin
96+
yield
97+
ensure
98+
# wait for the last ref... unless we're the last ref!
99+
Fiber.suspend unless (m & MASK) == REF
100+
end
101+
102+
@closing = nil
103+
true
104+
end
105+
106+
# Resets the fdlock back to its pristine state so it can be used again.
107+
# Assumes the caller owns the fdlock. This is required by
108+
# `TCPSocket#initialize`.
109+
def reset : Nil
110+
@m.lazy_set(0_u32)
111+
@closing = nil
112+
end
113+
114+
def closed? : Bool
115+
(@m.get(:relaxed) & CLOSED) == CLOSED
116+
end
117+
end

src/crystal/fd_lock_no_mt.cr

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# :nodoc:
2+
#
3+
# Simpler, but thread unsafe, alternative to Crystal::FdLock that only
4+
# serializes reads and writes and otherwise doesn't count references or waits
5+
# for references before closing. This is mostly needed for the interpreter that
6+
# happens to segfault with the thread safe alternative (see fd_lock_mt).
7+
struct Crystal::FdLock
8+
CLOSED = 1_u8 << 0
9+
10+
@m = 0_u8
11+
12+
def reference(&)
13+
raise IO::Error.new("Closed") if closed?
14+
yield
15+
end
16+
17+
def reset : Nil
18+
@m = 0_u8
19+
end
20+
21+
def closed?
22+
(@m & CLOSED) == CLOSED
23+
end
24+
25+
def try_close?(&)
26+
if closed?
27+
false
28+
else
29+
@m |= CLOSED
30+
31+
yield
32+
true
33+
end
34+
end
35+
end

0 commit comments

Comments
 (0)