diff --git a/lib/io/endpoint/host_endpoint.rb b/lib/io/endpoint/host_endpoint.rb index 96fcd4f..102a2fe 100644 --- a/lib/io/endpoint/host_endpoint.rb +++ b/lib/io/endpoint/host_endpoint.rb @@ -31,7 +31,6 @@ def inspect "\#<#{self.class} name=#{nodename.inspect} service=#{service.inspect} family=#{family.inspect} type=#{socktype.inspect} protocol=#{protocol.inspect} flags=#{flags.inspect}>" end - # @attribute [Array] The host specification array. attr :specification @@ -47,31 +46,126 @@ def service @specification[1] end - # Try to connect to the given host by connecting to each address in sequence until a connection is made. - # @yields {|socket| ...} If a block is given, yields the connected socket (may be invoked multiple times during connection attempts). - # @parameter socket [Socket] The socket which is being connected. - # @returns [Socket] the connected socket - # @raise if no connection could complete successfully - def connect(wrapper = self.wrapper, &block) - last_error = nil + # Try to connect ot the given host using the given wrapper. + # + # The implementation uses Happy Eyeballs (RFC 8305) algorithm if it makes sense to do so. This attempts IPv6 and IPv4 connections in parallel, preferring IPv6 but starting IPv4 attempts after a short delay to improve connection speed. + # + # @parameter happy_eyeballs_delay [Float] Delay in seconds before starting IPv4 connections (defaults to @options[:happy_eyeballs_delay] or 0.05) + # @yields {|socket| ...} the socket which is being connected, may be invoked more than once. + # @returns [Socket] the connected socket. + # @raises if no connection could complete successfully. + def connect(wrapper = self.wrapper, happy_eyeballs_delay: nil, &block) + happy_eyeballs_delay ||= @options.fetch(:happy_eyeballs_delay, 0.05) - Addrinfo.foreach(*@specification) do |address| + # Collect all addresses first: + addresses = Addrinfo.foreach(*@specification).to_a + + # If only one address, use simple sequential connection: + return connect_sequential(addresses, wrapper, &block) if addresses.size <= 1 + + # Separate IPv6 and fallback addresses: + ipv6_addresses, fallback_addresses = addresses.partition(&:ipv6?) + + # If we only have one protocol family, use sequential connection: + if ipv6_addresses.empty? || fallback_addresses.empty? + return connect_sequential(addresses, wrapper, &block) + end + + # Happy Eyeballs: try IPv6 immediately, fallback addresses after delay: + connected_socket = nil + connection_errors = [] + pending_count = 0 + ipv4_started = false + mutex = Mutex.new + condition_variable = ConditionVariable.new + + # Helper to attempt a connection: + attempt_connection = proc do |address| + should_continue = mutex.synchronize do + if connected_socket + false + else + pending_count += 1 + true + end + end + next unless should_continue + begin socket = wrapper.connect(address, **@options) - rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error - # Try again unless if possible, otherwise raise... - else - return socket unless block_given? - - begin - return yield(socket) - ensure - socket.close + mutex.synchronize do + if connected_socket + # Another connection succeeded first, close this one: + socket.close + else + connected_socket = socket + condition_variable.broadcast + end + end + rescue => error + mutex.synchronize do + connection_errors << error + pending_count -= 1 + condition_variable.broadcast end end end - raise last_error + # Start IPv6 connections immediately: + ipv6_addresses.each do |address| + wrapper.schedule do + attempt_connection.call(address) + end + end + + # Start fallback connections after delay: + fallback_delayed = wrapper.schedule do + sleep(happy_eyeballs_delay) + should_start = mutex.synchronize do + if connected_socket + false + else + ipv4_started = true + true + end + end + + if should_start + fallback_addresses.each do |address| + wrapper.schedule do + attempt_connection.call(address) + end + end + end + end + + # Wait for a successful connection or all failures + mutex.synchronize do + loop do + break if connected_socket + # All connections have completed if: + # - IPv4 connections have started (or were skipped) + # - No pending connections remain + break if ipv4_started && pending_count == 0 + condition_variable.wait(mutex) + end + end + + # Ensure fallback scheduling completes: + fallback_delayed.join if fallback_delayed.alive? + + if connected_socket + return connected_socket unless block_given? + + begin + return yield(connected_socket) + ensure + connected_socket.close + end + else + # All connections failed, raise the last error + raise connection_errors.last || IOError.new("Connection failed!") + end end # Invokes the given block for every address which can be bound to. @@ -93,6 +187,31 @@ def each yield AddressEndpoint.new(address, **@options) end end + + private + + # Sequential connection fallback for single address or single protocol family + def connect_sequential(addresses, wrapper, &block) + last_error = nil + + addresses.each do |address| + begin + socket = wrapper.connect(address, **@options) + rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EAGAIN => last_error + # Try next address + else + return socket unless block_given? + + begin + return yield(socket) + ensure + socket.close + end + end + end + + raise last_error + end end # @parameter arguments nodename, service, family, socktype, protocol, flags. `socktype` will be set to Socket::SOCK_STREAM. diff --git a/test/io/endpoint/host_endpoint.rb b/test/io/endpoint/host_endpoint.rb index 2e7c0ae..5079791 100644 --- a/test/io/endpoint/host_endpoint.rb +++ b/test/io/endpoint/host_endpoint.rb @@ -47,6 +47,106 @@ expect(endpoint.inspect).to be == "#" end end + + with "Happy Eyeballs" do + it "can connect using Happy Eyeballs algorithm" do + bound = endpoint.bound + + bound.bind do |server| + expect(server).to be_a(Socket) + + thread = Thread.new do + peer, address = server.accept + peer.close + end + + # Wait for server to be ready + Thread.pass until thread.status == "sleep" + + server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM]) + + client = server_endpoint.connect + expect(client).to be_a(Socket) + + # Wait for the connection to be closed + client.wait_readable + client.close + + thread.join + end + ensure + bound&.close + end + + it "raises error when all connections fail" do + # Try to connect to a port that's definitely not listening + endpoint = subject.new(["localhost", 65535, nil, ::Socket::SOCK_STREAM]) + + expect do + endpoint.connect + end.to raise_exception(Errno::ECONNREFUSED) + end + + it "respects happy_eyeballs_delay option" do + bound = endpoint.bound + + bound.bind do |server| + expect(server).to be_a(Socket) + + thread = Thread.new do + peer, address = server.accept + peer.close + end + + Thread.pass until thread.status == "sleep" + + server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.1) + + start_time = Time.now + client = server_endpoint.connect + elapsed = Time.now - start_time + + # Connection should succeed quickly (before the delay) + expect(elapsed).to be < 0.1 + expect(client).to be_a(Socket) + + client.close + thread.join + end + ensure + bound&.close + end + + it "can override happy_eyeballs_delay per connection" do + bound = endpoint.bound + + bound.bind do |server| + expect(server).to be_a(Socket) + + thread = Thread.new do + peer, address = server.accept + peer.close + end + + Thread.pass until thread.status == "sleep" + + server_endpoint = subject.new(["localhost", server.local_address.ip_port, nil, ::Socket::SOCK_STREAM], happy_eyeballs_delay: 0.2) + + start_time = Time.now + client = server_endpoint.connect(happy_eyeballs_delay: 0.01) + elapsed = Time.now - start_time + + # Connection should succeed quickly (using the override delay) + expect(elapsed).to be < 0.1 + expect(client).to be_a(Socket) + + client.close + thread.join + end + ensure + bound&.close + end + end end describe IO::Endpoint do