Skip to content

Commit e766e2c

Browse files
authored
Merge pull request #877 from abicky/support-seed-broker-with-multiple-addresses
Support seed broker hostnames that resolve to multiple addresses
2 parents a56d16b + ef7c21c commit e766e2c

File tree

5 files changed

+86
-23
lines changed

5 files changed

+86
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Changes and additions to the library will be listed here.
66

77
- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
88
- Add support for `murmur2` based partitioning.
9+
- Add `resolve_seed_brokers` option to support seed broker hostnames that resolve to multiple addresses (#877).
910

1011
## 1.3.0
1112

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ require "kafka"
176176
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
177177
```
178178

179+
You can also use a single hostname that resolves to multiple seed broker IP addresses:
180+
181+
```ruby
182+
kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
183+
```
184+
179185
### Producing Messages to Kafka
180186

181187
The simplest way to write a message to a Kafka topic is to call `#deliver_message`:

lib/kafka/client.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,22 @@ class Client
7474
# the SSL certificate and the signing chain of the certificate have the correct domains
7575
# based on the CA certificate
7676
#
77+
# @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers.
78+
# If a broker is resolved to multiple IP addresses, the client tries to connect to each
79+
# of the addresses until it can connect.
80+
#
7781
# @return [Client]
7882
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
7983
ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
8084
ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
8185
sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
8286
sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
83-
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
87+
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true,
88+
resolve_seed_brokers: false)
8489
@logger = TaggedLogger.new(logger)
8590
@instrumenter = Instrumenter.new(client_id: client_id)
8691
@seed_brokers = normalize_seed_brokers(seed_brokers)
92+
@resolve_seed_brokers = resolve_seed_brokers
8793

8894
ssl_context = SslContext.build(
8995
ca_cert_file_path: ssl_ca_cert_file_path,
@@ -809,6 +815,7 @@ def initialize_cluster
809815
seed_brokers: @seed_brokers,
810816
broker_pool: broker_pool,
811817
logger: @logger,
818+
resolve_seed_brokers: @resolve_seed_brokers,
812819
)
813820
end
814821

lib/kafka/cluster.rb

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "kafka/broker_pool"
4+
require "resolv"
45
require "set"
56

67
module Kafka
@@ -18,14 +19,16 @@ class Cluster
1819
# @param seed_brokers [Array<URI>]
1920
# @param broker_pool [Kafka::BrokerPool]
2021
# @param logger [Logger]
21-
def initialize(seed_brokers:, broker_pool:, logger:)
22+
# @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize}
23+
def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false)
2224
if seed_brokers.empty?
2325
raise ArgumentError, "At least one seed broker must be configured"
2426
end
2527

2628
@logger = TaggedLogger.new(logger)
2729
@seed_brokers = seed_brokers
2830
@broker_pool = broker_pool
31+
@resolve_seed_brokers = resolve_seed_brokers
2932
@cluster_info = nil
3033
@stale = true
3134

@@ -418,32 +421,35 @@ def get_leader_id(topic, partition)
418421
# @return [Protocol::MetadataResponse] the cluster metadata.
419422
def fetch_cluster_info
420423
errors = []
421-
422424
@seed_brokers.shuffle.each do |node|
423-
@logger.info "Fetching cluster metadata from #{node}"
424-
425-
begin
426-
broker = @broker_pool.connect(node.hostname, node.port)
427-
cluster_info = broker.fetch_metadata(topics: @target_topics)
428-
429-
if cluster_info.brokers.empty?
430-
@logger.error "No brokers in cluster"
431-
else
432-
@logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
433-
434-
@stale = false
435-
436-
return cluster_info
425+
(@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip|
426+
node_info = node.to_s
427+
node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip
428+
@logger.info "Fetching cluster metadata from #{node_info}"
429+
430+
begin
431+
broker = @broker_pool.connect(hostname_or_ip, node.port)
432+
cluster_info = broker.fetch_metadata(topics: @target_topics)
433+
434+
if cluster_info.brokers.empty?
435+
@logger.error "No brokers in cluster"
436+
else
437+
@logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
438+
439+
@stale = false
440+
441+
return cluster_info
442+
end
443+
rescue Error => e
444+
@logger.error "Failed to fetch metadata from #{node_info}: #{e}"
445+
errors << [node_info, e]
446+
ensure
447+
broker.disconnect unless broker.nil?
437448
end
438-
rescue Error => e
439-
@logger.error "Failed to fetch metadata from #{node}: #{e}"
440-
errors << [node, e]
441-
ensure
442-
broker.disconnect unless broker.nil?
443449
end
444450
end
445451

446-
error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n")
452+
error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n")
447453

448454
raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
449455
end

spec/cluster_spec.rb

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,47 @@
103103
}.to raise_exception(ArgumentError)
104104
end
105105
end
106+
107+
describe "#cluster_info" do
108+
let(:cluster) {
109+
Kafka::Cluster.new(
110+
seed_brokers: [URI("kafka://test1:9092")],
111+
broker_pool: broker_pool,
112+
logger: LOGGER,
113+
resolve_seed_brokers: resolve_seed_brokers,
114+
)
115+
}
116+
117+
before do
118+
allow(broker).to receive(:fetch_metadata) { raise Kafka::ConnectionError, "Operation timed out" }
119+
allow(broker).to receive(:disconnect)
120+
end
121+
122+
context "when resolve_seed_brokers is false" do
123+
let(:resolve_seed_brokers) { false }
124+
125+
it "tries the seed broker hostnames as is" do
126+
expect(broker_pool).to receive(:connect).with("test1", 9092) { broker }
127+
expect {
128+
cluster.cluster_info
129+
}.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out})
130+
end
131+
end
132+
133+
context "when resolve_seed_brokers is true" do
134+
let(:resolve_seed_brokers) { true }
135+
136+
before do
137+
allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] }
138+
end
139+
140+
it "tries all the resolved IP addresses" do
141+
expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker }
142+
expect(broker_pool).to receive(:connect).with("::1", 9092) { broker }
143+
expect {
144+
cluster.cluster_info
145+
}.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out})
146+
end
147+
end
148+
end
106149
end

0 commit comments

Comments
 (0)