Skip to content

Commit 64871cf

Browse files
authored
Merge pull request #489 from michaelsauter/feature/async-overflow-message
Align buffer overflow erros with sync producer
2 parents 9994566 + 2c52870 commit 64871cf

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

lib/kafka/async_producer.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli
100100
def produce(value, topic:, **options)
101101
ensure_threads_running!
102102

103-
buffer_overflow(topic) if @queue.size >= @max_queue_size
103+
if @queue.size >= @max_queue_size
104+
buffer_overflow topic,
105+
"Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
106+
end
104107

105108
args = [value, **options.merge(topic: topic)]
106109
@queue << [:produce, args]
@@ -148,14 +151,12 @@ def ensure_threads_running!
148151
@timer_thread ||= Thread.new { @timer.run }
149152
end
150153

151-
def buffer_overflow(topic)
154+
def buffer_overflow(topic, message)
152155
@instrumenter.instrument("buffer_overflow.async_producer", {
153156
topic: topic,
154157
})
155158

156-
@logger.error "Cannot produce message to #{topic}, max queue size (#{@max_queue_size}) reached"
157-
158-
raise BufferOverflow
159+
raise BufferOverflow, message
159160
end
160161

161162
class Timer

0 commit comments

Comments
 (0)