Skip to content

Commit 877b33e

Browse files
committed
Ensure we don't hang in Socket#accept due to spurious readiness.
It's possible, especially on dual stack, to have issues where a server may become readable, but by the time accept is called, the connection is gone. This can cause a deadlock between the semaphore and the accept call, which can hang indefinitely.
1 parent a6b9bd9 commit 877b33e

File tree

2 files changed

+54
-15
lines changed

2 files changed

+54
-15
lines changed

examples/limited/config.ru

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ run do |env|
2020
# There is no guarantee that there is a connection or that the connection has a token:
2121
token = limited_semaphore_token(request)
2222

23-
Console.info(self, "Sleeping 10 seconds", token: token)
24-
2523
if env["PATH_INFO"] == "/fast"
2624
if token
2725
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
@@ -30,10 +28,10 @@ run do |env|
3028
end
3129

3230
# Simulated "fast / non-blocking" request:
33-
sleep(1)
31+
sleep(0.01)
3432
else
3533
# Simulated "slow / blocking" request:
36-
sleep(10)
34+
sleep(0.1)
3735
end
3836

3937
[200, {}, ["Hello World"]]

examples/limited/limited.rb

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# frozen_string_literal: true
2+
13
module Limited
24
# Thread local storage for the semaphore (per-worker):
35
Thread.attr_accessor :limited_semaphore
@@ -12,21 +14,34 @@ def self.instance
1214
# Create a new semaphore with the given limit.
1315
def initialize(limit = 1)
1416
@queue = Thread::Queue.new
17+
18+
Console.debug(self, "Initializing queue...", limit: limit)
1519
limit.times{release}
1620
end
1721

1822
# Release the semaphore.
1923
def release
24+
Console.debug(self, "Releasing semaphore...")
2025
@queue.push(true)
2126
end
2227

2328
# Acquire the semaphore. May block until the semaphore is available.
2429
def acquire
30+
Console.debug(self, "Acquiring semaphore...")
2531
@queue.pop
26-
32+
Console.debug(self, "Acquired semaphore...")
33+
2734
return Token.new(self)
2835
end
2936

37+
def try_acquire
38+
if @queue.empty?
39+
return nil
40+
else
41+
return acquire
42+
end
43+
end
44+
3045
# A token that can be used to release the semaphore once and once only.
3146
class Token
3247
def initialize(semaphore)
@@ -44,19 +59,42 @@ def release
4459

4560
# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
4661
class Wrapper < IO::Endpoint::Wrapper
47-
def socket_accept(server)
62+
# Wait for an inbound connection to be ready to be accepted.
63+
def wait_for_inbound_connection(server)
4864
semaphore = Semaphore.instance
4965

5066
# Wait until there is a connection ready to be accepted:
51-
server.wait_readable
67+
while true
68+
server.wait_readable
5269

53-
# Acquire the semaphore:
54-
Console.info(self, "Acquiring semaphore...")
55-
token = semaphore.acquire
56-
57-
# Accept the connection:
58-
socket, address = super
59-
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)
70+
# Acquire the semaphore:
71+
if token = semaphore.try_acquire
72+
return token
73+
end
74+
end
75+
end
76+
77+
# Once the server is readable and we've acquired the token, we can accept the connection (if it's still there).
78+
def socket_accept_nonblock(server, token)
79+
return server.accept_nonblock
80+
rescue IO::WaitReadable, Errno::EINTR
81+
token.release
82+
return nil
83+
end
84+
85+
# Accept a connection from the server, limited by the per-worker (thread or process) semaphore.
86+
def socket_accept(server)
87+
while true
88+
if token = wait_for_inbound_connection(server)
89+
# In principle, there is a connection ready to be accepted:
90+
socket, address = socket_accept_nonblock(server, token)
91+
92+
if socket
93+
Console.debug(self, "Accepted connection from #{address.inspect}", socket: socket)
94+
break
95+
end
96+
end
97+
end
6098

6199
# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
62100
socket.define_singleton_method :token do
@@ -67,11 +105,14 @@ def socket_accept(server)
67105
socket.define_singleton_method :close do
68106
super()
69107
ensure
70-
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
108+
Console.debug(self, "Releasing connection from #{address.inspect}", socket: socket)
71109
token.release
72110
end
73111

112+
success = true
74113
return socket, address
114+
ensure
115+
token&.release unless success
75116
end
76117
end
77118
end

0 commit comments

Comments
 (0)