Skip to content

Commit 933d36f

Browse files
committed
Ensure we don't hang in Socket#accept due to spurious readiness.
It's possible, especially on dual stack, to have issues where a server may become readable, but by the time accept is called, the connection is gone. This can cause a deadlock between the semaphore and the accept call, which can hang indefinitely.
1 parent a6b9bd9 commit 933d36f

File tree

2 files changed

+54
-14
lines changed

2 files changed

+54
-14
lines changed

examples/limited/config.ru

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ run do |env|
2020
# There is no guarantee that there is a connection or that the connection has a token:
2121
token = limited_semaphore_token(request)
2222

23-
Console.info(self, "Sleeping 10 seconds", token: token)
24-
2523
if env["PATH_INFO"] == "/fast"
2624
if token
2725
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
@@ -30,10 +28,10 @@ run do |env|
3028
end
3129

3230
# Simulated "fast / non-blocking" request:
33-
sleep(1)
31+
sleep(0.01)
3432
else
3533
# Simulated "slow / blocking" request:
36-
sleep(10)
34+
sleep(0.1)
3735
end
3836

3937
[200, {}, ["Hello World"]]

examples/limited/limited.rb

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# frozen_string_literal: true
2+
13
module Limited
24
# Thread local storage for the semaphore (per-worker):
35
Thread.attr_accessor :limited_semaphore
@@ -12,21 +14,33 @@ def self.instance
1214
# Create a new semaphore with the given limit.
1315
def initialize(limit = 1)
1416
@queue = Thread::Queue.new
17+
18+
Console.info(self, "Initializing queue...", limit: limit)
1519
limit.times{release}
1620
end
1721

1822
# Release the semaphore.
1923
def release
24+
Console.info(self, "Releasing semaphore...")
2025
@queue.push(true)
2126
end
2227

2328
# Acquire the semaphore. May block until the semaphore is available.
2429
def acquire
30+
Console.info(self, "Acquiring semaphore...")
2531
@queue.pop
2632

2733
return Token.new(self)
2834
end
2935

36+
def try_acquire
37+
if @queue.empty?
38+
return nil
39+
else
40+
return acquire
41+
end
42+
end
43+
3044
# A token that can be used to release the semaphore once and once only.
3145
class Token
3246
def initialize(semaphore)
@@ -44,19 +58,44 @@ def release
4458

4559
# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
4660
class Wrapper < IO::Endpoint::Wrapper
47-
def socket_accept(server)
61+
# Wait for an inbound connection to be ready to be accepted.
62+
def wait_for_inbound_connection(server)
4863
semaphore = Semaphore.instance
4964

5065
# Wait until there is a connection ready to be accepted:
51-
server.wait_readable
52-
53-
# Acquire the semaphore:
54-
Console.info(self, "Acquiring semaphore...")
55-
token = semaphore.acquire
66+
while true
67+
server.wait_readable
5668

57-
# Accept the connection:
58-
socket, address = super
59-
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)
69+
# Acquire the semaphore:
70+
Console.debug(self, "Acquiring semaphore...")
71+
if token = semaphore.try_acquire
72+
Console.debug(self, "Acquired semaphore...")
73+
return token
74+
end
75+
end
76+
end
77+
78+
# Once the server is readable and we've acquired the token, we can accept the connection (if it's still there).
79+
def socket_accept_nonblock(server, token)
80+
return server.accept_nonblock
81+
rescue IO::WaitReadable, Errno::EINTR
82+
token.release
83+
return nil
84+
end
85+
86+
# Accept a connection from the server, limited by the per-worker (thread or process) semaphore.
87+
def socket_accept(server)
88+
while true
89+
if token = wait_for_inbound_connection(server)
90+
# In principle, there is a connection ready to be accepted:
91+
socket, address = socket_accept_nonblock(server, token)
92+
93+
if socket
94+
Console.debug(self, "Accepted connection from #{address.inspect}", socket: socket)
95+
break
96+
end
97+
end
98+
end
6099

61100
# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
62101
socket.define_singleton_method :token do
@@ -67,11 +106,14 @@ def socket_accept(server)
67106
socket.define_singleton_method :close do
68107
super()
69108
ensure
70-
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
109+
Console.debug(self, "Releasing connection from #{address.inspect}", socket: socket)
71110
token.release
72111
end
73112

113+
success = true
74114
return socket, address
115+
ensure
116+
token&.release unless success
75117
end
76118
end
77119
end

0 commit comments

Comments
 (0)