Skip to content

Commit cd3b941

Browse files
Add happy eyeballs.
1 parent c925ad8 commit cd3b941

File tree

2 files changed

+238
-19
lines changed

2 files changed

+238
-19
lines changed

lib/io/endpoint/host_endpoint.rb

Lines changed: 138 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ def inspect
3131
"\#<#{self.class} name=#{nodename.inspect} service=#{service.inspect} family=#{family.inspect} type=#{socktype.inspect} protocol=#{protocol.inspect} flags=#{flags.inspect}>"
3232
end
3333

34-
3534
# @attribute [Array] The host specification array.
3635
attr :specification
3736

@@ -47,31 +46,126 @@ def service
4746
@specification[1]
4847
end
4948

50-
# Try to connect to the given host by connecting to each address in sequence until a connection is made.
51-
# @yields {|socket| ...} If a block is given, yields the connected socket (may be invoked multiple times during connection attempts).
52-
# @parameter socket [Socket] The socket which is being connected.
53-
# @returns [Socket] the connected socket
54-
# @raise if no connection could complete successfully
55-
def connect(wrapper = self.wrapper, &block)
56-
last_error = nil
49+
# Try to connect ot the given host using the given wrapper.
50+
#
51+
# The implementation uses Happy Eyeballs (RFC 8305) algorithm if it makes sense to do so. This attempts IPv6 and IPv4 connections in parallel, preferring IPv6 but starting IPv4 attempts after a short delay to improve connection speed.
52+
#
53+
# @parameter happy_eyeballs_delay [Float] Delay in seconds before starting IPv4 connections (defaults to @options[:happy_eyeballs_delay] or 0.05)
54+
# @yields {|socket| ...} the socket which is being connected, may be invoked more than once.
55+
# @returns [Socket] the connected socket.
56+
# @raises if no connection could complete successfully.
57+
def connect(wrapper = self.wrapper, happy_eyeballs_delay: nil, &block)
58+
happy_eyeballs_delay ||= @options.fetch(:happy_eyeballs_delay, 0.05)
5759

58-
Addrinfo.foreach(*@specification) do |address|
60+
# Collect all addresses first:
61+
addresses = Addrinfo.foreach(*@specification).to_a
62+
63+
# If only one address, use simple sequential connection:
64+
return connect_sequential(addresses, wrapper, &block) if addresses.size <= 1
65+
66+
# Separate IPv6 and fallback addresses:
67+
ipv6_addresses, fallback_addresses = addresses.partition(&:ipv6?)
68+
69+
# If we only have one protocol family, use sequential connection:
70+
if ipv6_addresses.empty? || fallback_addresses.empty?
71+
return connect_sequential(addresses, wrapper, &block)
72+
end
73+
74+
# Happy Eyeballs: try IPv6 immediately, fallback addresses after delay:
75+
connected_socket = nil
76+
connection_errors = []
77+
pending_count = 0
78+
ipv4_started = false
79+
mutex = Mutex.new
80+
condition_variable = ConditionVariable.new
81+
82+
# Helper to attempt a connection:
83+
attempt_connection = proc do |address|
84+
should_continue = mutex.synchronize do
85+
if connected_socket
86+
false
87+
else
88+
pending_count += 1
89+
true
90+
end
91+
end
92+
next unless should_continue
93+
5994
begin
6095
socket = wrapper.connect(address, **@options)
61-
rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error
62-
# Try again unless if possible, otherwise raise...
63-
else
64-
return socket unless block_given?
65-
66-
begin
67-
return yield(socket)
68-
ensure
69-
socket.close
96+
mutex.synchronize do
97+
if connected_socket
98+
# Another connection succeeded first, close this one:
99+
socket.close
100+
else
101+
connected_socket = socket
102+
condition_variable.broadcast
103+
end
104+
end
105+
rescue => error
106+
mutex.synchronize do
107+
connection_errors << error
108+
pending_count -= 1
109+
condition_variable.broadcast
70110
end
71111
end
72112
end
73113

74-
raise last_error
114+
# Start IPv6 connections immediately:
115+
ipv6_addresses.each do |address|
116+
wrapper.schedule do
117+
attempt_connection.call(address)
118+
end
119+
end
120+
121+
# Start fallback connections after delay:
122+
fallback_delayed = wrapper.schedule do
123+
sleep(happy_eyeballs_delay)
124+
should_start = mutex.synchronize do
125+
if connected_socket
126+
false
127+
else
128+
ipv4_started = true
129+
true
130+
end
131+
end
132+
133+
if should_start
134+
fallback_addresses.each do |address|
135+
wrapper.schedule do
136+
attempt_connection.call(address)
137+
end
138+
end
139+
end
140+
end
141+
142+
# Wait for a successful connection or all failures
143+
mutex.synchronize do
144+
loop do
145+
break if connected_socket
146+
# All connections have completed if:
147+
# - IPv4 connections have started (or were skipped)
148+
# - No pending connections remain
149+
break if ipv4_started && pending_count == 0
150+
condition_variable.wait(mutex)
151+
end
152+
end
153+
154+
# Ensure fallback scheduling completes:
155+
fallback_delayed.join if fallback_delayed.alive?
156+
157+
if connected_socket
158+
return connected_socket unless block_given?
159+
160+
begin
161+
return yield(connected_socket)
162+
ensure
163+
connected_socket.close
164+
end
165+
else
166+
# All connections failed, raise the last error
167+
raise connection_errors.last || IOError.new("Connection failed!")
168+
end
75169
end
76170

77171
# Invokes the given block for every address which can be bound to.
@@ -93,6 +187,31 @@ def each
93187
yield AddressEndpoint.new(address, **@options)
94188
end
95189
end
190+
191+
private
192+
193+
# Sequential connection fallback for single address or single protocol family
194+
def connect_sequential(addresses, wrapper, &block)
195+
last_error = nil
196+
197+
addresses.each do |address|
198+
begin
199+
socket = wrapper.connect(address, **@options)
200+
rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error
201+
# Try next address
202+
else
203+
return socket unless block_given?
204+
205+
begin
206+
return yield(socket)
207+
ensure
208+
socket.close
209+
end
210+
end
211+
end
212+
213+
raise last_error
214+
end
96215
end
97216

98217
# @parameter arguments nodename, service, family, socktype, protocol, flags. `socktype` will be set to Socket::SOCK_STREAM.

test/io/endpoint/host_endpoint.rb

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,106 @@
4747
expect(endpoint.inspect).to be == "#<IO::Endpoint::HostEndpoint name=\"localhost\" service=0 family=nil type=1 protocol=nil flags=nil>"
4848
end
4949
end
50+
51+
with "Happy Eyeballs" do
52+
it "can connect using Happy Eyeballs algorithm" do
53+
bound = endpoint.bound
54+
55+
bound.bind do |server|
56+
expect(server).to be_a(Socket)
57+
58+
thread = Thread.new do
59+
peer, address = server.accept
60+
peer.close
61+
end
62+
63+
# Wait for server to be ready
64+
Thread.pass until thread.status == "sleep"
65+
66+
server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM])
67+
68+
client = server_endpoint.connect
69+
expect(client).to be_a(Socket)
70+
71+
# Wait for the connection to be closed
72+
client.wait_readable
73+
client.close
74+
75+
thread.join
76+
end
77+
ensure
78+
bound&.close
79+
end
80+
81+
it "raises error when all connections fail" do
82+
# Try to connect to a port that's definitely not listening
83+
endpoint = subject.new(["localhost", 65535, nil, ::Socket::SOCK_STREAM])
84+
85+
expect do
86+
endpoint.connect
87+
end.to raise_exception(Errno::ECONNREFUSED)
88+
end
89+
90+
it "respects happy_eyeballs_delay option" do
91+
bound = endpoint.bound
92+
93+
bound.bind do |server|
94+
expect(server).to be_a(Socket)
95+
96+
thread = Thread.new do
97+
peer, address = server.accept
98+
peer.close
99+
end
100+
101+
Thread.pass until thread.status == "sleep"
102+
103+
server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.1)
104+
105+
start_time = Time.now
106+
client = server_endpoint.connect
107+
elapsed = Time.now - start_time
108+
109+
# Connection should succeed quickly (before the delay)
110+
expect(elapsed).to be < 0.1
111+
expect(client).to be_a(Socket)
112+
113+
client.close
114+
thread.join
115+
end
116+
ensure
117+
bound&.close
118+
end
119+
120+
it "can override happy_eyeballs_delay per connection" do
121+
bound = endpoint.bound
122+
123+
bound.bind do |server|
124+
expect(server).to be_a(Socket)
125+
126+
thread = Thread.new do
127+
peer, address = server.accept
128+
peer.close
129+
end
130+
131+
Thread.pass until thread.status == "sleep"
132+
133+
server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.2)
134+
135+
start_time = Time.now
136+
client = server_endpoint.connect(happy_eyeballs_delay: 0.01)
137+
elapsed = Time.now - start_time
138+
139+
# Connection should succeed quickly (using the override delay)
140+
expect(elapsed).to be < 0.1
141+
expect(client).to be_a(Socket)
142+
143+
client.close
144+
thread.join
145+
end
146+
ensure
147+
bound&.close
148+
end
149+
end
50150
end
51151

52152
describe IO::Endpoint do

0 commit comments

Comments
 (0)