Skip to content

Commit 296779f

Browse files
authored
allow key and partition key on new topics (#611)
1 parent 78e4e98 commit 296779f

File tree

3 files changed

+50
-0
lines changed

3 files changed

+50
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Rdkafka Changelog
22

33
## 0.21.1 (Unreleased)
4+
- [Enhancement] Allow for producing to non-existing topics with `key` and `partition_key` present.
45
- [Enhancement] Replace TTL-based partition count cache with a global cache that reuses `librdkafka` statistics data when possible.
56
- [Enhancement] Support producing and consuming of headers with mulitple values (KIP-82).
67
- [Enhancement] Allow native Kafka customization poll time.

lib/rdkafka/producer.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,13 @@ def partition_count(topic)
239239

240240
topic_metadata ? topic_metadata[:partition_count] : -1
241241
end
242+
rescue Rdkafka::RdkafkaError => e
243+
# If the topic does not exist, it will be created or if not allowed another error will be
244+
# raised. We here return -1 so this can happen without early error happening on metadata
245+
# discovery.
246+
return -1 if e.code == :unknown_topic_or_part
247+
248+
raise(e)
242249
end
243250

244251
# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.

spec/rdkafka/producer_spec.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,48 @@ def call(_, handle)
364364
expect(message.key).to eq "key utf8"
365365
end
366366

367+
it "should produce a message to a non-existing topic with key and partition key" do
368+
new_topic = "it-#{SecureRandom.uuid}"
369+
370+
handle = producer.produce(
371+
# Needs to be a new topic each time
372+
topic: new_topic,
373+
payload: "payload",
374+
key: "key",
375+
partition_key: "partition_key",
376+
label: "label"
377+
)
378+
379+
# Should be pending at first
380+
expect(handle.pending?).to be true
381+
expect(handle.label).to eq "label"
382+
383+
# Check delivery handle and report
384+
report = handle.wait(max_wait_timeout: 5)
385+
expect(handle.pending?).to be false
386+
expect(report).not_to be_nil
387+
expect(report.partition).to eq 0
388+
expect(report.offset).to be >= 0
389+
expect(report.label).to eq "label"
390+
391+
# Flush and close producer
392+
producer.flush
393+
producer.close
394+
395+
# Consume message and verify its content
396+
message = wait_for_message(
397+
topic: new_topic,
398+
delivery_report: report,
399+
consumer: consumer
400+
)
401+
expect(message.partition).to eq 0
402+
expect(message.payload).to eq "payload"
403+
expect(message.key).to eq "key"
404+
# Since api.version.request is on by default we will get
405+
# the message creation timestamp if it's not set.
406+
expect(message.timestamp).to be_within(10).of(Time.now)
407+
end
408+
367409
context "timestamp" do
368410
it "should raise a type error if not nil, integer or time" do
369411
expect {

0 commit comments

Comments
 (0)