Commit ec945bf

Ian Driver authored and committed
Fix #2969: Add timeout to establish_connection to prevent infinite loop
- Add timeout check in establish_connection method using send_timeout
- Prevents infinite loop when connection drops during handshake protocol
- Disables problematic nodes and logs timeout warnings
- Add test case to verify timeout functionality works correctly

Fixes issue where logs stop being flushed when handshake gets stuck in unstable network environments with proxy components.

Signed-off-by: Ian Driver <[email protected]>
1 parent d146812 commit ec945bf

2 files changed: +33 -0 lines changed

lib/fluent/plugin/out_forward.rb

Lines changed: 10 additions & 0 deletions
@@ -620,7 +620,17 @@ def verify_connection
       end
 
       def establish_connection(sock, ri)
+        start_time = Fluent::Clock.now
+        timeout = @sender.send_timeout
+
         while ri.state != :established
+          # Check for timeout to prevent infinite loop
+          if Fluent::Clock.now - start_time > timeout
+            @log.warn "handshake timeout after #{timeout}s", host: @host, port: @port
+            disable!
+            break
+          end
+
           begin
             # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
             # We need rewrite around here using new socket/server plugin helper.
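
The deadline check in the hunk above can be illustrated outside fluentd with a minimal, standalone sketch. The method name `wait_until_established` and its `poll_interval` parameter are hypothetical and do not appear in the commit. The sketch calls Ruby's monotonic clock directly as a stand-in for `Fluent::Clock.now`, and a block stands in for the `ri.state != :established` condition.

```ruby
# Hypothetical helper illustrating a deadline-bounded polling loop.
# This is not fluentd code; the names are assumptions made for this sketch.
def wait_until_established(timeout:, poll_interval: 0.01)
  # A monotonic clock is unaffected by wall-clock adjustments.
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    # The block stands in for "has the handshake reached :established?".
    return :established if yield

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    # Same comparison as in the commit: give up once the elapsed time exceeds the timeout.
    return :timed_out if elapsed > timeout

    sleep poll_interval
  end
end

# A peer that never completes the handshake hits the deadline instead of looping forever.
stuck_result = wait_until_established(timeout: 0.05) { false }
ready_result = wait_until_established(timeout: 1) { true }
puts stuck_result # prints "timed_out"
puts ready_result # prints "established"
```

Unlike the commit, which breaks out of the loop and disables the node, this sketch returns a status symbol so the caller can decide how to react.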

test/plugin/test_out_forward.rb

Lines changed: 23 additions & 0 deletions
@@ -1406,4 +1406,27 @@ def plugin_id_for_test?
     assert_equal 0, @d.instance.healthy_nodes_count
     assert_equal 0, @d.instance.registered_nodes_count
   end
+
+  test 'establish_connection_timeout' do
+    @d = d = create_driver(%[
+      send_timeout 1
+      <server>
+        host #{TARGET_HOST}
+        port #{@target_port}
+      </server>
+    ])
+
+    node = d.instance.nodes.first
+    mock_sock = flexmock('socket')
+    mock_sock.should_receive(:read_nonblock).with(anything).and_return('').at_least.once
+
+    ri = Fluent::Plugin::ForwardOutput::ConnectionManager::RequestInfo.new(:helo)
+
+    assert_true node.available?
+    node.establish_connection(mock_sock, ri)
+    assert_false node.available?
+
+    logs = d.logs
+    assert{ logs.any?{|log| log.include?('handshake timeout after 1s') } }
+  end
 end
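
The idea behind the test above (feed the handshake loop a socket that never returns data, then check that the node is disabled and a warning is recorded) can be sketched in plain Ruby without flexmock or the fluentd test driver. `SilentSocket` and `SketchNode` are hypothetical classes written for this illustration. They only approximate the behaviour shown in the diff and are not fluentd's implementation; the read length of 512 and the 0.01s sleep are likewise assumptions.

```ruby
# Hypothetical stub: a socket whose reads never yield handshake data.
class SilentSocket
  attr_reader :reads

  def initialize
    @reads = 0
  end

  def read_nonblock(_maxlen)
    @reads += 1
    ''
  end
end

# Hypothetical, simplified node; it mirrors only the timeout logic of the diff.
class SketchNode
  attr_reader :warnings

  def initialize(send_timeout:)
    @send_timeout = send_timeout
    @available = true
    @warnings = []
  end

  def available?
    @available
  end

  def establish_connection(sock)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    loop do
      # Deadline check, analogous to the one added in the commit.
      if Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time > @send_timeout
        @warnings << "handshake timeout after #{@send_timeout}s"
        @available = false # stands in for disable!
        break
      end

      buf = sock.read_nonblock(512)
      if buf.empty?
        sleep 0.01 # nothing received yet; poll again
        next
      end
      break # a real handshake would process buf here
    end
  end
end

sock = SilentSocket.new
node = SketchNode.new(send_timeout: 0.1)
node.establish_connection(sock)
puts node.available?     # prints "false"
puts node.warnings.first # prints "handshake timeout after 0.1s"
```

A short timeout keeps the sketch fast; the real test sets `send_timeout 1`, so it waits for the loop to give up after roughly one second.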
