Skip to content

Commit c5742d0

Browse files
committed
Add Crystal::FdLock to count system fd references
This patch implements a reference-counted lock for IO objects that depend on a reusable system fd (IO::FileDescriptor, File and Socket), protecting them against thread-safety issues around close:

- Thread 1 wants to read from fd 123;
- The OS preempts Thread 1;
- Thread 2 closes fd 123;
- Thread 2 opens something else and the OS reuses fd 123;
- The OS resumes Thread 1;
- Thread 1 reads from the reused fd 123!!!

The issue arises for any operation that would mutate the fd (write, fchown, ftruncate, setsockopt, ...), as each of them risks affecting a reused fd instead of the expected one.
1 parent a15d355 commit c5742d0

File tree

4 files changed

+294
-0
lines changed

4 files changed

+294
-0
lines changed

spec/std/crystal/fd_lock_spec.cr

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
require "spec"
2+
require "../../support/interpreted.cr"
3+
require "crystal/fd_lock"
4+
require "wait_group"
5+
6+
# Behavioral specs for Crystal::FdLock: borrowing references, closing while
# references are held, and resetting a closed lock.
describe Crystal::FdLock do
  describe "#reference" do
    it "acquires" do
      fdlock = Crystal::FdLock.new
      count = 0

      # two consecutive borrows must both run their block
      2.times do
        fdlock.reference { count += 1 }
      end

      count.should eq(2)
    end

    it "allows reentrancy (side effect)" do
      fdlock = Crystal::FdLock.new
      count = 0

      fdlock.reference { count += 1 }

      # borrowing again while already holding a reference must not block
      fdlock.reference do
        fdlock.reference { count += 1 }
      end

      count.should eq(2)
    end

    it "acquires shared reference" do
      fdlock = Crystal::FdLock.new

      holder_ready = WaitGroup.new(1)
      outcome = Channel(String).new

      # this fiber keeps its reference until the main fiber receives from
      # `outcome`, or reports a timeout after one second
      spawn do
        fdlock.reference do
          holder_ready.done

          select
          when outcome.send("ok")
          when timeout(1.second)
            outcome.send("timeout")
          end
        end
      end

      holder_ready.wait

      # a second reference must be granted while the first is still held
      fdlock.reference { }

      outcome.receive.should eq("ok")
    end

    it "raises when closed" do
      fdlock = Crystal::FdLock.new
      fdlock.try_close? { }

      ran = false
      expect_raises(IO::Error, "Closed") do
        fdlock.reference { ran = true }
      end

      ran.should be_false
    end
  end

  describe "#try_close?" do
    it "closes" do
      fdlock = Crystal::FdLock.new
      fdlock.closed?.should be_false

      ran = false
      result = fdlock.try_close? { ran = true }
      result.should be_true

      fdlock.closed?.should be_true
      ran.should be_true
    end

    it "closes once" do
      fdlock = Crystal::FdLock.new

      closes = 0

      # 10 fibers, 2 attempts each: only a single attempt may run its block
      WaitGroup.wait do |wg|
        10.times do
          wg.spawn do
            2.times do
              fdlock.try_close? { closes += 1 }
            end
          end
        end
      end

      closes.should eq(1)
    end

    # FIXME: the interpreter segfaults while running this spec (NULL pointer)
    pending_interpreted "waits for all references to return" do
      fdlock = Crystal::FdLock.new

      holders_ready = WaitGroup.new(10)
      failures = Channel(Exception).new(10)

      WaitGroup.wait do |wg|
        10.times do
          wg.spawn do
            begin
              # hold the reference across a yield so the close below happens
              # while references are still active
              fdlock.reference do
                holders_ready.done
                Fiber.yield
              end
            rescue ex
              failures.send(ex)
            end
          end
        end

        holders_ready.wait

        ran = false
        result = fdlock.try_close? { ran = true }
        result.should be_true

        fdlock.closed?.should be_true
        ran.should be_true
      end
      failures.close

      # re-raise the first exception collected from the spawned fibers, if any
      failures.receive?.try { |ex| raise ex }
    end
  end

  it "#reset" do
    fdlock = Crystal::FdLock.new
    fdlock.try_close? { }
    fdlock.reset

    # after a reset the lock can be closed again
    fdlock.try_close? { }.should eq(true)
  end
end

src/crystal/fd_lock.cr

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{% if flag?(:preview_mt) %}
2+
require "./fd_lock_mt"
3+
{% else %}
4+
require "./fd_lock_no_mt"
5+
{% end %}

src/crystal/fd_lock_mt.cr

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
2+
# Copyright Google):
3+
# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
4+
5+
# :nodoc:
#
# Tracks active references over a system file descriptor (fd) so that closing
# can wait until nobody is using the fd anymore.
#
# Every access to the fd that may affect its system state or system buffers must
# borrow a reference (see `#reference`) for the duration of the access.
#
# The fdlock can be closed at any time, but the actual system close will wait
# until there are no more references left. This avoids potential races when a
# thread might try to read a fd that has been closed and has been reused by the
# OS for example.
#
# The whole state lives in a single atomic `UInt32`: bit 0 is the `CLOSED` flag
# and the remaining bits hold the reference counter, counted in `REF` steps.
#
# NOTE(review): the code below only counts references and coordinates close; it
# doesn't serialize reads and writes by itself.
#
# FIXME: the interpreter segfaults when interpreted code uses this type; for now
# it uses the thread unsafe alternative (fd_lock_no_mt).
struct Crystal::FdLock
  CLOSED = 1_u32 << 0 # the fdlock has been closed
  REF    = 1_u32 << 1 # the reference counter increment
  MASK   = ~(REF - 1) # mask for the reference counter

  @m = Atomic(UInt32).new(0_u32)

  # The fiber blocked in `#try_close?`, to be resumed by the last reference.
  @closing : Fiber?

  # Borrows a reference for the duration of the block and returns the value of
  # the block. Raises `IO::Error` if the fdlock is closed while trying to
  # borrow.
  def reference(& : -> F) : F forall F
    # fast path: not closed and no other reference
    m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed)
    increment_slow(m) unless success

    begin
      yield
    ensure
      # The decrement must acquire in addition to release: when this is the
      # last reference after a close, `handle_last_ref` reads `@closing`, a
      # plain ivar that the closing fiber wrote right before its own release
      # decrement. A release-only decrement wouldn't order that read after the
      # write on weakly ordered CPUs.
      m = @m.sub(REF, :acquire_release)
      handle_last_ref(m)
    end
  end

  # Slow path of `#reference`: retries the increment from the last observed
  # value *m* until it succeeds. Raises `IO::Error` once the fdlock is seen
  # closed.
  private def increment_slow(m)
    while true
      if (m & CLOSED) == CLOSED
        raise IO::Error.new("Closed")
      end
      m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed)
      break if success
    end
  end

  # Resumes the closing fiber when *m* (the value right before the decrement)
  # shows that the fdlock is closed and that the caller held the last
  # reference.
  private def handle_last_ref(m)
    return unless (m & CLOSED) == CLOSED # is closed?
    return unless (m & MASK) == REF      # was the last ref?

    # the last ref after close is responsible to resume the closing fiber
    @closing.not_nil!("BUG: expected a closing fiber to resume.").enqueue
  end

  # Closes the fdlock. Blocks for as long as there are references.
  #
  # The *callback* block must cancel any external waiters (e.g. pending evloop
  # reads or writes).
  #
  # Returns true if the fdlock has been closed: no fiber can acquire a reference
  # anymore, the calling fiber fully owns the fd and can safely close it.
  #
  # Returns false if the fdlock has already been closed: the calling fiber
  # doesn't own the fd and mustn't close it, as there might still be active
  # references and another fiber will close anyway.
  def try_close?(&callback : ->) : Bool
    attempts = 0

    while true
      m = @m.get(:relaxed)

      if (m & CLOSED) == CLOSED
        # already closed: abort
        return false
      end

      # close + increment ref
      m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
      break if success

      attempts = Thread.delay(attempts)
    end

    # set the current fiber as the closing fiber (to be resumed by the last ref)
    @closing = Fiber.current

    # decrement the last ref (m is the value right before the decrement)
    m = @m.sub(REF, :release)

    begin
      yield
    ensure
      # wait for the last ref... unless we're the last ref!
      Fiber.suspend unless (m & MASK) == REF
    end

    @closing = nil
    true
  end

  # Resets the fdlock back to its pristine state so it can be used again.
  # Assumes the caller owns the fdlock. This is required by
  # `TCPSocket#initialize`.
  def reset : Nil
    @m.lazy_set(0_u32)
    @closing = nil
  end

  # Returns true once the fdlock has been marked closed.
  def closed? : Bool
    (@m.get(:relaxed) & CLOSED) == CLOSED
  end
end

src/crystal/fd_lock_no_mt.cr

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# :nodoc:
#
# Minimal, thread unsafe stand-in for the thread safe Crystal::FdLock (see
# fd_lock_mt). It only remembers whether the fdlock has been closed: it doesn't
# count references and never waits for references before closing. This is
# mostly needed for the interpreter that happens to segfault with the thread
# safe alternative.
struct Crystal::FdLock
  # Bit set in `@m` once the fdlock has been closed.
  CLOSED = 1_u8 << 0

  @m = 0_u8

  # Returns true once the fdlock has been closed.
  def closed?
    @m.bits_set?(CLOSED)
  end

  # Yields to the block, unless the fdlock is closed in which case it raises
  # `IO::Error` without yielding.
  def reference(&)
    if closed?
      raise IO::Error.new("Closed")
    end

    yield
  end

  # Marks the fdlock as closed, then yields. Returns true when this call closed
  # the fdlock, or false (without yielding) when it was already closed.
  def try_close?(&)
    return false if closed?

    @m |= CLOSED

    yield
    true
  end

  # Puts the fdlock back into its initial, open state.
  def reset : Nil
    @m = 0_u8
  end
end

0 commit comments

Comments
 (0)