Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 138 additions & 19 deletions lib/io/endpoint/host_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def inspect
"\#<#{self.class} name=#{nodename.inspect} service=#{service.inspect} family=#{family.inspect} type=#{socktype.inspect} protocol=#{protocol.inspect} flags=#{flags.inspect}>"
end


# @attribute [Array] The host specification array.
attr :specification

Expand All @@ -47,31 +46,126 @@ def service
@specification[1]
end

# Try to connect to the given host by connecting to each address in sequence until a connection is made.
# @yields {|socket| ...} If a block is given, yields the connected socket (may be invoked multiple times during connection attempts).
# @parameter socket [Socket] The socket which is being connected.
# @returns [Socket] the connected socket
# @raise if no connection could complete successfully
def connect(wrapper = self.wrapper, &block)
last_error = nil
# Try to connect ot the given host using the given wrapper.
#
# The implementation uses Happy Eyeballs (RFC 8305) algorithm if it makes sense to do so. This attempts IPv6 and IPv4 connections in parallel, preferring IPv6 but starting IPv4 attempts after a short delay to improve connection speed.
#
# @parameter happy_eyeballs_delay [Float] Delay in seconds before starting IPv4 connections (defaults to @options[:happy_eyeballs_delay] or 0.05)
# @yields {|socket| ...} the socket which is being connected, may be invoked more than once.
# @returns [Socket] the connected socket.
# @raises if no connection could complete successfully.
def connect(wrapper = self.wrapper, happy_eyeballs_delay: nil, &block)
happy_eyeballs_delay ||= @options.fetch(:happy_eyeballs_delay, 0.05)

Addrinfo.foreach(*@specification) do |address|
# Collect all addresses first:
addresses = Addrinfo.foreach(*@specification).to_a

# If only one address, use simple sequential connection:
return connect_sequential(addresses, wrapper, &block) if addresses.size <= 1

# Separate IPv6 and fallback addresses:
ipv6_addresses, fallback_addresses = addresses.partition(&:ipv6?)

# If we only have one protocol family, use sequential connection:
if ipv6_addresses.empty? || fallback_addresses.empty?
return connect_sequential(addresses, wrapper, &block)
end

# Happy Eyeballs: try IPv6 immediately, fallback addresses after delay:
connected_socket = nil
connection_errors = []
pending_count = 0
ipv4_started = false
mutex = Mutex.new
condition_variable = ConditionVariable.new

# Helper to attempt a connection:
attempt_connection = proc do |address|
should_continue = mutex.synchronize do
if connected_socket
false
else
pending_count += 1
true
end
end
next unless should_continue

begin
socket = wrapper.connect(address, **@options)
rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error
# Try again unless if possible, otherwise raise...
else
return socket unless block_given?

begin
return yield(socket)
ensure
socket.close
mutex.synchronize do
if connected_socket
# Another connection succeeded first, close this one:
socket.close
else
connected_socket = socket
condition_variable.broadcast
end
end
rescue => error
mutex.synchronize do
connection_errors << error
pending_count -= 1
condition_variable.broadcast
end
end
end

raise last_error
# Start IPv6 connections immediately:
ipv6_addresses.each do |address|
wrapper.schedule do
attempt_connection.call(address)
end
end

# Start fallback connections after delay:
fallback_delayed = wrapper.schedule do
sleep(happy_eyeballs_delay)
should_start = mutex.synchronize do
if connected_socket
false
else
ipv4_started = true
true
end
end

if should_start
fallback_addresses.each do |address|
wrapper.schedule do
attempt_connection.call(address)
end
end
end
end

# Wait for a successful connection or all failures
mutex.synchronize do
loop do
break if connected_socket
# All connections have completed if:
# - IPv4 connections have started (or were skipped)
# - No pending connections remain
break if ipv4_started && pending_count == 0
condition_variable.wait(mutex)
end
end

# Ensure fallback scheduling completes:
fallback_delayed.join if fallback_delayed.alive?

if connected_socket
return connected_socket unless block_given?

begin
return yield(connected_socket)
ensure
connected_socket.close
end
else
# All connections failed, raise the last error
raise connection_errors.last || IOError.new("Connection failed!")
end
end

# Invokes the given block for every address which can be bound to.
Expand All @@ -93,6 +187,31 @@ def each
yield AddressEndpoint.new(address, **@options)
end
end

private

# Sequential connection fallback for single address or single protocol family
def connect_sequential(addresses, wrapper, &block)
last_error = nil

addresses.each do |address|
begin
socket = wrapper.connect(address, **@options)
rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error
# Try next address
else
return socket unless block_given?

begin
return yield(socket)
ensure
socket.close
end
end
end

raise last_error
end
end

# @parameter arguments nodename, service, family, socktype, protocol, flags. `socktype` will be set to Socket::SOCK_STREAM.
Expand Down
100 changes: 100 additions & 0 deletions test/io/endpoint/host_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,106 @@
expect(endpoint.inspect).to be == "#<IO::Endpoint::HostEndpoint name=\"localhost\" service=0 family=nil type=1 protocol=nil flags=nil>"
end
end

with "Happy Eyeballs" do
it "can connect using Happy Eyeballs algorithm" do
bound = endpoint.bound

bound.bind do |server|
expect(server).to be_a(Socket)

thread = Thread.new do
peer, address = server.accept
peer.close
end

# Wait for server to be ready
Thread.pass until thread.status == "sleep"

server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM])

client = server_endpoint.connect
expect(client).to be_a(Socket)

# Wait for the connection to be closed
client.wait_readable
client.close

thread.join
end
ensure
bound&.close
end

it "raises error when all connections fail" do
# Try to connect to a port that's definitely not listening
endpoint = subject.new(["localhost", 65535, nil, ::Socket::SOCK_STREAM])

expect do
endpoint.connect
end.to raise_exception(Errno::ECONNREFUSED)
end

it "respects happy_eyeballs_delay option" do
bound = endpoint.bound

bound.bind do |server|
expect(server).to be_a(Socket)

thread = Thread.new do
peer, address = server.accept
peer.close
end

Thread.pass until thread.status == "sleep"

server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.1)

start_time = Time.now
client = server_endpoint.connect
elapsed = Time.now - start_time

# Connection should succeed quickly (before the delay)
expect(elapsed).to be < 0.1
expect(client).to be_a(Socket)

client.close
thread.join
end
ensure
bound&.close
end

it "can override happy_eyeballs_delay per connection" do
bound = endpoint.bound

bound.bind do |server|
expect(server).to be_a(Socket)

thread = Thread.new do
peer, address = server.accept
peer.close
end

Thread.pass until thread.status == "sleep"

server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.2)

start_time = Time.now
client = server_endpoint.connect(happy_eyeballs_delay: 0.01)
elapsed = Time.now - start_time

# Connection should succeed quickly (using the override delay)
expect(elapsed).to be < 0.1
expect(client).to be_a(Socket)

client.close
thread.join
end
ensure
bound&.close
end
end
end

describe IO::Endpoint do
Expand Down
Loading