Skip to content

Commit fd6e376

Browse files
Add happy eyeballs.
1 parent 5233372 commit fd6e376

File tree

2 files changed

+238
-18
lines changed

2 files changed

+238
-18
lines changed

lib/io/endpoint/host_endpoint.rb

Lines changed: 138 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ def inspect
2323
"\#<#{self.class} name=#{nodename.inspect} service=#{service.inspect} family=#{family.inspect} type=#{socktype.inspect} protocol=#{protocol.inspect} flags=#{flags.inspect}>"
2424
end
2525

26-
2726
attr :specification
2827

2928
def hostname
@@ -34,30 +33,126 @@ def service
3433
@specification[1]
3534
end
3635

37-
# Try to connect to the given host by connecting to each address in sequence until a connection is made.
38-
# @yield [Socket] the socket which is being connected, may be invoked more than once
39-
# @return [Socket] the connected socket
40-
# @raise if no connection could complete successfully
41-
def connect(wrapper = self.wrapper, &block)
42-
last_error = nil
36+
# Try to connect ot the given host using the given wrapper.
37+
#
38+
# The implementation uses Happy Eyeballs (RFC 8305) algorithm if it makes sense to do so. This attempts IPv6 and IPv4 connections in parallel, preferring IPv6 but starting IPv4 attempts after a short delay to improve connection speed.
39+
#
40+
# @parameter happy_eyeballs_delay [Float] Delay in seconds before starting IPv4 connections (defaults to @options[:happy_eyeballs_delay] or 0.05)
41+
# @yields {|socket| ...} the socket which is being connected, may be invoked more than once.
42+
# @returns [Socket] the connected socket.
43+
# @raises if no connection could complete successfully.
44+
def connect(wrapper = self.wrapper, happy_eyeballs_delay: nil, &block)
45+
happy_eyeballs_delay ||= @options.fetch(:happy_eyeballs_delay, 0.05)
4346

44-
Addrinfo.foreach(*@specification) do |address|
47+
# Collect all addresses first:
48+
addresses = Addrinfo.foreach(*@specification).to_a
49+
50+
# If only one address, use simple sequential connection:
51+
return connect_sequential(addresses, wrapper, &block) if addresses.size <= 1
52+
53+
# Separate IPv6 and fallback addresses:
54+
ipv6_addresses, fallback_addresses = addresses.partition(&:ipv6?)
55+
56+
# If we only have one protocol family, use sequential connection:
57+
if ipv6_addresses.empty? || fallback_addresses.empty?
58+
return connect_sequential(addresses, wrapper, &block)
59+
end
60+
61+
# Happy Eyeballs: try IPv6 immediately, fallback addresses after delay:
62+
connected_socket = nil
63+
connection_errors = []
64+
pending_count = 0
65+
ipv4_started = false
66+
mutex = Mutex.new
67+
condition_variable = ConditionVariable.new
68+
69+
# Helper to attempt a connection:
70+
attempt_connection = proc do |address|
71+
should_continue = mutex.synchronize do
72+
if connected_socket
73+
false
74+
else
75+
pending_count += 1
76+
true
77+
end
78+
end
79+
next unless should_continue
80+
4581
begin
4682
socket = wrapper.connect(address, **@options)
47-
rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error
48-
# Try again unless if possible, otherwise raise...
49-
else
50-
return socket unless block_given?
51-
52-
begin
53-
return yield(socket)
54-
ensure
55-
socket.close
83+
mutex.synchronize do
84+
if connected_socket
85+
# Another connection succeeded first, close this one:
86+
socket.close
87+
else
88+
connected_socket = socket
89+
condition_variable.broadcast
90+
end
91+
end
92+
rescue => error
93+
mutex.synchronize do
94+
connection_errors << error
95+
pending_count -= 1
96+
condition_variable.broadcast
5697
end
5798
end
5899
end
59100

60-
raise last_error
101+
# Start IPv6 connections immediately:
102+
ipv6_addresses.each do |address|
103+
wrapper.schedule do
104+
attempt_connection.call(address)
105+
end
106+
end
107+
108+
# Start fallback connections after delay:
109+
fallback_delayed = wrapper.schedule do
110+
sleep(happy_eyeballs_delay)
111+
should_start = mutex.synchronize do
112+
if connected_socket
113+
false
114+
else
115+
ipv4_started = true
116+
true
117+
end
118+
end
119+
120+
if should_start
121+
fallback_addresses.each do |address|
122+
wrapper.schedule do
123+
attempt_connection.call(address)
124+
end
125+
end
126+
end
127+
end
128+
129+
# Wait for a successful connection or all failures
130+
mutex.synchronize do
131+
loop do
132+
break if connected_socket
133+
# All connections have completed if:
134+
# - IPv4 connections have started (or were skipped)
135+
# - No pending connections remain
136+
break if ipv4_started && pending_count == 0
137+
condition_variable.wait(mutex)
138+
end
139+
end
140+
141+
# Ensure fallback scheduling completes:
142+
fallback_delayed.join if fallback_delayed.alive?
143+
144+
if connected_socket
145+
return connected_socket unless block_given?
146+
147+
begin
148+
return yield(connected_socket)
149+
ensure
150+
connected_socket.close
151+
end
152+
else
153+
# All connections failed, raise the last error
154+
raise connection_errors.last || IOError.new("Connection failed!")
155+
end
61156
end
62157

63158
# Invokes the given block for every address which can be bound to.
@@ -77,6 +172,31 @@ def each
77172
yield AddressEndpoint.new(address, **@options)
78173
end
79174
end
175+
176+
private
177+
178+
# Sequential connection fallback for single address or single protocol family
179+
def connect_sequential(addresses, wrapper, &block)
180+
last_error = nil
181+
182+
addresses.each do |address|
183+
begin
184+
socket = wrapper.connect(address, **@options)
185+
rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error
186+
# Try next address
187+
else
188+
return socket unless block_given?
189+
190+
begin
191+
return yield(socket)
192+
ensure
193+
socket.close
194+
end
195+
end
196+
end
197+
198+
raise last_error
199+
end
80200
end
81201

82202
# @param arguments nodename, service, family, socktype, protocol, flags. `socktype` will be set to Socket::SOCK_STREAM.

test/io/endpoint/host_endpoint.rb

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,106 @@
4747
expect(endpoint.inspect).to be == "#<IO::Endpoint::HostEndpoint name=\"localhost\" service=0 family=nil type=1 protocol=nil flags=nil>"
4848
end
4949
end
50+
51+
with "Happy Eyeballs" do
52+
it "can connect using Happy Eyeballs algorithm" do
53+
bound = endpoint.bound
54+
55+
bound.bind do |server|
56+
expect(server).to be_a(Socket)
57+
58+
thread = Thread.new do
59+
peer, address = server.accept
60+
peer.close
61+
end
62+
63+
# Wait for server to be ready
64+
Thread.pass until thread.status == "sleep"
65+
66+
server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM])
67+
68+
client = server_endpoint.connect
69+
expect(client).to be_a(Socket)
70+
71+
# Wait for the connection to be closed
72+
client.wait_readable
73+
client.close
74+
75+
thread.join
76+
end
77+
ensure
78+
bound&.close
79+
end
80+
81+
it "raises error when all connections fail" do
82+
# Try to connect to a port that's definitely not listening
83+
endpoint = subject.new(["localhost", 65535, nil, ::Socket::SOCK_STREAM])
84+
85+
expect do
86+
endpoint.connect
87+
end.to raise_exception(Errno::ECONNREFUSED)
88+
end
89+
90+
it "respects happy_eyeballs_delay option" do
91+
bound = endpoint.bound
92+
93+
bound.bind do |server|
94+
expect(server).to be_a(Socket)
95+
96+
thread = Thread.new do
97+
peer, address = server.accept
98+
peer.close
99+
end
100+
101+
Thread.pass until thread.status == "sleep"
102+
103+
server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.1)
104+
105+
start_time = Time.now
106+
client = server_endpoint.connect
107+
elapsed = Time.now - start_time
108+
109+
# Connection should succeed quickly (before the delay)
110+
expect(elapsed).to be < 0.1
111+
expect(client).to be_a(Socket)
112+
113+
client.close
114+
thread.join
115+
end
116+
ensure
117+
bound&.close
118+
end
119+
120+
it "can override happy_eyeballs_delay per connection" do
121+
bound = endpoint.bound
122+
123+
bound.bind do |server|
124+
expect(server).to be_a(Socket)
125+
126+
thread = Thread.new do
127+
peer, address = server.accept
128+
peer.close
129+
end
130+
131+
Thread.pass until thread.status == "sleep"
132+
133+
server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.2)
134+
135+
start_time = Time.now
136+
client = server_endpoint.connect(happy_eyeballs_delay: 0.01)
137+
elapsed = Time.now - start_time
138+
139+
# Connection should succeed quickly (using the override delay)
140+
expect(elapsed).to be < 0.1
141+
expect(client).to be_a(Socket)
142+
143+
client.close
144+
thread.join
145+
end
146+
ensure
147+
bound&.close
148+
end
149+
end
50150
end
51151

52152
describe IO::Endpoint do

0 commit comments

Comments
 (0)