
Commit 141c510

use failed partition count result shorter than the cache (#404)

1 parent: c662adf

2 files changed: +17 −8 lines

CHANGELOG.md (3 additions, 1 deletion)

```diff
@@ -6,10 +6,12 @@
 - [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld)
 - [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld)
 - [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld)
-- [Enhancement] Skip intermediate array creation on delivery report callback execution (one per message).
+- [Enhancement] Skip intermediate array creation on delivery report callback execution (one per message) (mensfeld).
+- [Enhancement] Report `-1` instead of `nil` in case of `partition_count` failure (mensfeld).
 - [Fix] Fix return type on `#rd_kafka_poll` (mensfeld)
 - [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld)
 - [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)
+- [Fix] Partition cache caches invalid nil result for `PARTITIONS_COUNT_TTL` (mensfeld)
 - [Change] Rename `matching_acl_pattern_type` to `matching_acl_resource_pattern_type` to align the whole API (mensfeld)

 ## 0.15.0 (2023-12-03)
```

lib/rdkafka/producer.rb (14 additions, 7 deletions)

```diff
@@ -40,10 +40,16 @@ def initialize(native_kafka, partitioner_name)
           topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
         end

-        cache[topic] = [
-          monotonic_now,
-          topic_metadata ? topic_metadata[:partition_count] : nil
-        ]
+        partition_count = topic_metadata ? topic_metadata[:partition_count] : -1
+
+        # This approach caches a failed fetch only for 5 seconds. This makes sure that we do
+        # not cache the failure for too long, but it also "buys" us a bit of time in case there
+        # are issues in the cluster, so we won't overload it with consecutive requests
+        cache[topic] = if partition_count.positive?
+          [monotonic_now, partition_count]
+        else
+          [monotonic_now - PARTITIONS_COUNT_TTL + 5, partition_count]
+        end
       end
     end
@@ -137,14 +143,15 @@ def purge
     # Partition count for a given topic.
     #
     # @param topic [String] The topic name.
-    # @return [Integer] partition count for a given topic
+    # @return [Integer] partition count for a given topic or `-1` if it could not be obtained.
     #
     # @note If 'allow.auto.create.topics' is set to true in the broker, the topic will be
     #   auto-created after returning nil.
     #
     # @note We cache the partition count for a given topic for given time.
     #   This prevents us in case someone uses `partition_key` from querying for the count with
-    #   each message. Instead we query once every 30 seconds at most
+    #   each message. Instead we query once every 30 seconds at most if we have a valid partition
+    #   count or every 5 seconds in case we were not able to obtain the number of partitions
     def partition_count(topic)
       closed_producer_check(__method__)
@@ -194,7 +201,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
       if partition_key
         partition_count = partition_count(topic)
         # If the topic is not present, set to -1
-        partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count
+        partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count.positive?
       end

       # If partition is nil, use -1 to let librdkafka set the partition randomly or
```
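The core trick in this commit is to cache a failed metadata lookup with a backdated timestamp, so it expires after a few seconds instead of the full `PARTITIONS_COUNT_TTL`. The following is a minimal, standalone Ruby sketch of that pattern (the constant values, `store`, and `fresh?` helpers are illustrative, not the library's actual API):

```ruby
PARTITIONS_COUNT_TTL = 30.0 # seconds a successful partition count stays cached
FAILURE_TTL = 5.0           # seconds a failed (-1) result stays cached

def monotonic_now
  # Monotonic clock, immune to wall-clock adjustments
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def store(cache, topic, partition_count)
  cache[topic] =
    if partition_count.positive?
      [monotonic_now, partition_count]
    else
      # Backdate the entry so it is considered stale after FAILURE_TTL,
      # not after the full PARTITIONS_COUNT_TTL
      [monotonic_now - PARTITIONS_COUNT_TTL + FAILURE_TTL, partition_count]
    end
end

def fresh?(cache, topic)
  cached_at, _count = cache[topic]
  !cached_at.nil? && (monotonic_now - cached_at) < PARTITIONS_COUNT_TTL
end

cache = {}
store(cache, 'ok_topic', 8)   # success: stays fresh for ~30 seconds
store(cache, 'bad_topic', -1) # failure: stays fresh for only ~5 seconds

puts fresh?(cache, 'ok_topic')  # => true
puts fresh?(cache, 'bad_topic') # => true (only for the first ~5 seconds)
```

Using a single freshness check with two effective TTLs keeps the read path simple: callers never need to know whether an entry was a success or a failure, they just check staleness. The `-1` sentinel also means downstream code (like the `partition_count.positive?` guard in `produce`) always receives an Integer rather than `nil`.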
