Skip to content

Commit 76f6f23

Browse files
authored
Merge branch 'master' into support-custom-assignment-strategy
2 parents 0f7a9e9 + 51b69e9 commit 76f6f23

14 files changed

+88
-41
lines changed

.ruby-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.5.1
1+
2.7.1

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ Changes and additions to the library will be listed here.
66

77
- Support custom assignment strategy (#846).
88

9+
## 1.2.0
10+
11+
- Add producer consumer interceptors (#837).
12+
- Add support for configuring the client partitioner (#848).
13+
914
## 1.1.0
1015

1116
- Extra sanity checking when marking offsets as processed (#824).

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,26 @@ partition = PartitioningScheme.assign(partitions, event)
350350
producer.produce(event, topic: "events", partition: partition)
351351
```
352352

353+
Another option is to configure a custom client partitioner that implements `#call(partition_count, message)` and uses the same partitioning scheme as the other clients. For example:
354+
355+
```ruby
356+
class CustomPartitioner
357+
def call(partition_count, message)
358+
...
359+
end
360+
end
361+
362+
partitioner = CustomPartitioner.new
363+
Kafka.new(partitioner: partitioner, ...)
364+
```
365+
366+
Or, simply create a Proc that handles the partitioning logic, instead of having to add a new class. For example:
367+
368+
```ruby
369+
partitioner = -> (partition_count, message) { ... }
370+
Kafka.new(partitioner: partitioner, ...)
371+
```
372+
353373
#### Buffering and Error Handling
354374

355375
The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer.

lib/kafka/async_producer.rb

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ module Kafka
5959
# producer.shutdown
6060
#
6161
class AsyncProducer
62-
THREAD_MUTEX = Mutex.new
63-
6462
# Initializes a new AsyncProducer.
6563
#
6664
# @param sync_producer [Kafka::Producer] the synchronous producer that should
@@ -94,6 +92,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli
9492

9593
# The timer will no-op if the delivery interval is zero.
9694
@timer = Timer.new(queue: @queue, interval: delivery_interval)
95+
96+
@thread_mutex = Mutex.new
9797
end
9898

9999
# Produces a message to the specified topic.
@@ -131,6 +131,8 @@ def produce(value, topic:, **options)
131131
# @see Kafka::Producer#deliver_messages
132132
# @return [nil]
133133
def deliver_messages
134+
ensure_threads_running!
135+
134136
@queue << [:deliver_messages, nil]
135137

136138
nil
@@ -142,6 +144,8 @@ def deliver_messages
142144
# @see Kafka::Producer#shutdown
143145
# @return [nil]
144146
def shutdown
147+
ensure_threads_running!
148+
145149
@timer_thread && @timer_thread.exit
146150
@queue << [:shutdown, nil]
147151
@worker_thread && @worker_thread.join
@@ -152,17 +156,22 @@ def shutdown
152156
private
153157

154158
def ensure_threads_running!
155-
THREAD_MUTEX.synchronize do
156-
@worker_thread = nil unless @worker_thread && @worker_thread.alive?
157-
@worker_thread ||= Thread.new { @worker.run }
158-
end
159+
return if worker_thread_alive? && timer_thread_alive?
159160

160-
THREAD_MUTEX.synchronize do
161-
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
162-
@timer_thread ||= Thread.new { @timer.run }
161+
@thread_mutex.synchronize do
162+
@worker_thread = Thread.new { @worker.run } unless worker_thread_alive?
163+
@timer_thread = Thread.new { @timer.run } unless timer_thread_alive?
163164
end
164165
end
165166

167+
def worker_thread_alive?
168+
!!@worker_thread && @worker_thread.alive?
169+
end
170+
171+
def timer_thread_alive?
172+
!!@timer_thread && @timer_thread.alive?
173+
end
174+
166175
def buffer_overflow(topic, message)
167176
@instrumenter.instrument("buffer_overflow.async_producer", {
168177
topic: topic,
@@ -246,10 +255,10 @@ def run
246255

247256
private
248257

249-
def produce(*args)
258+
def produce(value, **kwargs)
250259
retries = 0
251260
begin
252-
@producer.produce(*args)
261+
@producer.produce(value, **kwargs)
253262
rescue BufferOverflow => e
254263
deliver_messages
255264
if @max_retries == -1

lib/kafka/client.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,15 @@ class Client
6262
#
6363
# @param sasl_over_ssl [Boolean] whether to enforce SSL with SASL
6464
#
65+
# @param ssl_ca_certs_from_system [Boolean] whether to use the CA certs from the
66+
# system's default certificate store.
67+
#
68+
# @param partitioner [Partitioner, nil] the partitioner that should be used by the client.
69+
#
6570
# @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that
6671
# implements method token. See {Sasl::OAuth#initialize}
6772
#
68-
# @param verify_hostname [Boolean, true] whether to verify that the host serving
73+
# @param ssl_verify_hostname [Boolean, true] whether to verify that the host serving
6974
# the SSL certificate and the signing chain of the certificate have the correct domains
7075
# based on the CA certificate
7176
#
@@ -75,7 +80,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
7580
ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
7681
sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
7782
sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
78-
sasl_over_ssl: true, ssl_ca_certs_from_system: false, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
83+
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
7984
@logger = TaggedLogger.new(logger)
8085
@instrumenter = Instrumenter.new(client_id: client_id)
8186
@seed_brokers = normalize_seed_brokers(seed_brokers)
@@ -119,6 +124,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
119124
)
120125

121126
@cluster = initialize_cluster
127+
@partitioner = partitioner || Partitioner.new
122128
end
123129

124130
# Delivers a single message to the Kafka cluster.
@@ -157,7 +163,7 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
157163

158164
if partition.nil?
159165
partition_count = @cluster.partitions_for(topic).count
160-
partition = Partitioner.partition_for_key(partition_count, message)
166+
partition = @partitioner.call(partition_count, message)
161167
end
162168

163169
buffer = MessageBuffer.new
@@ -295,6 +301,7 @@ def producer(
295301
retry_backoff: retry_backoff,
296302
max_buffer_size: max_buffer_size,
297303
max_buffer_bytesize: max_buffer_bytesize,
304+
partitioner: @partitioner,
298305
interceptors: interceptors
299306
)
300307
end

lib/kafka/datadog.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ class StatsdSubscriber < ActiveSupport::Subscriber
9696
private
9797

9898
%w[increment histogram count timing gauge].each do |type|
99-
define_method(type) do |*args|
100-
emit(type, *args)
99+
define_method(type) do |*args, **kwargs|
100+
emit(type, *args, **kwargs)
101101
end
102102
end
103103

lib/kafka/partitioner.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class Partitioner
1919
# @param message [Kafka::PendingMessage] the message that should be assigned
2020
# a partition.
2121
# @return [Integer] the partition number.
22-
def self.partition_for_key(partition_count, message)
22+
def call(partition_count, message)
2323
raise ArgumentError if partition_count == 0
2424

2525
# If no explicit partition key is specified we use the message key instead.

lib/kafka/producer.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ class Producer
131131
class AbortTransaction < StandardError; end
132132

133133
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
134-
required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: [])
134+
required_acks:, max_retries:, retry_backoff:, max_buffer_size:,
135+
max_buffer_bytesize:, partitioner:, interceptors: [])
135136
@cluster = cluster
136137
@transaction_manager = transaction_manager
137138
@logger = TaggedLogger.new(logger)
@@ -143,6 +144,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
143144
@max_buffer_size = max_buffer_size
144145
@max_buffer_bytesize = max_buffer_bytesize
145146
@compressor = compressor
147+
@partitioner = partitioner
146148
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)
147149

148150
# The set of topics that are produced to.
@@ -458,7 +460,7 @@ def assign_partitions!
458460

459461
if partition.nil?
460462
partition_count = @cluster.partitions_for(message.topic).count
461-
partition = Partitioner.partition_for_key(partition_count, message)
463+
partition = @partitioner.call(partition_count, message)
462464
end
463465

464466
@buffer.write(

lib/kafka/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module Kafka
4-
VERSION = "1.1.0"
4+
VERSION = "1.2.0"
55
end

spec/client_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
context "when sasl_over_ssl is unspecified" do
2424
it "raises ArgumentError due to missing SSL config" do
2525
expect {
26-
described_class.new(client_opts)
26+
described_class.new(**client_opts)
2727
}.to raise_error(ArgumentError, /SASL authentication requires that SSL is configured/)
2828
end
2929
end
@@ -33,7 +33,7 @@
3333

3434
it "raises ArgumentError due to missing SSL config" do
3535
expect {
36-
described_class.new(client_opts)
36+
described_class.new(**client_opts)
3737
}.to raise_error(ArgumentError, /SASL authentication requires that SSL is configured/)
3838
end
3939
end
@@ -42,14 +42,14 @@
4242
before { client_opts.update(sasl_over_ssl: false) }
4343

4444
it "creates a new Kafka::Client object" do
45-
expect { described_class.new(client_opts) }.to_not raise_exception
45+
expect { described_class.new(**client_opts) }.to_not raise_exception
4646
end
4747
end
4848
end
4949
end
5050

5151
describe "#deliver_message" do
52-
subject(:client) { described_class.new(client_opts) }
52+
subject(:client) { described_class.new(**client_opts) }
5353

5454
it "requires `topic` to be a String" do
5555
expect {

0 commit comments

Comments
 (0)