Skip to content

Commit 15cd6c0

Browse files
committed
Ensure we don't hang in Socket#accept due to spurious readiness.
It's possible, especially on dual stack, to have issues where a server may become readable, but by the time accept is called, the connection is gone. This can cause a deadlock between the semaphore and the accept call, which can hang indefinitely.
1 parent a6b9bd9 commit 15cd6c0

File tree

2 files changed

+53
-14
lines changed

2 files changed

+53
-14
lines changed

examples/limited/config.ru

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ run do |env|
2020
# There is no guarantee that there is a connection or that the connection has a token:
2121
token = limited_semaphore_token(request)
2222

23-
Console.info(self, "Sleeping 10 seconds", token: token)
24-
2523
if env["PATH_INFO"] == "/fast"
2624
if token
2725
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
@@ -30,10 +28,10 @@ run do |env|
3028
end
3129

3230
# Simulated "fast / non-blocking" request:
33-
sleep(1)
31+
sleep(0.01)
3432
else
3533
# Simulated "slow / blocking" request:
36-
sleep(10)
34+
sleep(0.1)
3735
end
3836

3937
[200, {}, ["Hello World"]]

examples/limited/limited.rb

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# frozen_string_literal: true
12
module Limited
23
# Thread local storage for the semaphore (per-worker):
34
Thread.attr_accessor :limited_semaphore
@@ -12,21 +13,33 @@ def self.instance
1213
# Create a new semaphore with the given limit.
1314
def initialize(limit = 1)
1415
@queue = Thread::Queue.new
16+
17+
Console.info(self, "Initializing queue...", limit: limit)
1518
limit.times{release}
1619
end
1720

1821
# Release the semaphore.
1922
def release
23+
Console.info(self, "Releasing semaphore...")
2024
@queue.push(true)
2125
end
2226

2327
# Acquire the semaphore. May block until the semaphore is available.
2428
def acquire
29+
Console.info(self, "Acquiring semaphore...")
2530
@queue.pop
2631

2732
return Token.new(self)
2833
end
2934

35+
def try_acquire
36+
if @queue.empty?
37+
return nil
38+
else
39+
return acquire
40+
end
41+
end
42+
3043
# A token that can be used to release the semaphore once and once only.
3144
class Token
3245
def initialize(semaphore)
@@ -44,19 +57,44 @@ def release
4457

4558
# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
4659
class Wrapper < IO::Endpoint::Wrapper
47-
def socket_accept(server)
60+
# Wait for an inbound connection to be ready to be accepted.
61+
def wait_for_inbound_connection(server)
4862
semaphore = Semaphore.instance
4963

5064
# Wait until there is a connection ready to be accepted:
51-
server.wait_readable
52-
53-
# Acquire the semaphore:
54-
Console.info(self, "Acquiring semaphore...")
55-
token = semaphore.acquire
65+
while true
66+
server.wait_readable
5667

57-
# Accept the connection:
58-
socket, address = super
59-
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)
68+
# Acquire the semaphore:
69+
Console.debug(self, "Acquiring semaphore...")
70+
if token = semaphore.try_acquire
71+
Console.debug(self, "Acquired semaphore...")
72+
return token
73+
end
74+
end
75+
end
76+
77+
# Once the server is readable and we've acquired the token, we can accept the connection (if it's still there).
78+
def socket_accept_nonblock(server, token)
79+
return server.accept_nonblock
80+
rescue IO::WaitReadable, Errno::EINTR
81+
token.release
82+
return nil
83+
end
84+
85+
# Accept a connection from the server, limited by the per-worker (thread or process) semaphore.
86+
def socket_accept(server)
87+
while true
88+
if token = wait_for_inbound_connection(server)
89+
# In principle, there is a connection ready to be accepted:
90+
socket, address = socket_accept_nonblock(server, token)
91+
92+
if socket
93+
Console.debug(self, "Accepted connection from #{address.inspect}", socket: socket)
94+
break
95+
end
96+
end
97+
end
6098

6199
# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
62100
socket.define_singleton_method :token do
@@ -67,11 +105,14 @@ def socket_accept(server)
67105
socket.define_singleton_method :close do
68106
super()
69107
ensure
70-
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
108+
Console.debug(self, "Releasing connection from #{address.inspect}", socket: socket)
71109
token.release
72110
end
73111

112+
success = true
74113
return socket, address
114+
ensure
115+
token&.release unless success
75116
end
76117
end
77118
end

0 commit comments

Comments
 (0)