Skip to content

Commit 51b69e9

Browse files
authored
Merge pull request #855 from cstyles/async-producer-improvements
Various AsyncProducer improvements
2 parents 25e24f9 + d9f9363 commit 51b69e9

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

lib/kafka/async_producer.rb

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ module Kafka
5959
# producer.shutdown
6060
#
6161
class AsyncProducer
62-
THREAD_MUTEX = Mutex.new
63-
6462
# Initializes a new AsyncProducer.
6563
#
6664
# @param sync_producer [Kafka::Producer] the synchronous producer that should
@@ -94,6 +92,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli
9492

9593
# The timer will no-op if the delivery interval is zero.
9694
@timer = Timer.new(queue: @queue, interval: delivery_interval)
95+
96+
@thread_mutex = Mutex.new
9797
end
9898

9999
# Produces a message to the specified topic.
@@ -131,6 +131,8 @@ def produce(value, topic:, **options)
131131
# @see Kafka::Producer#deliver_messages
132132
# @return [nil]
133133
def deliver_messages
134+
ensure_threads_running!
135+
134136
@queue << [:deliver_messages, nil]
135137

136138
nil
@@ -142,6 +144,8 @@ def deliver_messages
142144
# @see Kafka::Producer#shutdown
143145
# @return [nil]
144146
def shutdown
147+
ensure_threads_running!
148+
145149
@timer_thread && @timer_thread.exit
146150
@queue << [:shutdown, nil]
147151
@worker_thread && @worker_thread.join
@@ -152,17 +156,22 @@ def shutdown
152156
private
153157

154158
def ensure_threads_running!
155-
THREAD_MUTEX.synchronize do
156-
@worker_thread = nil unless @worker_thread && @worker_thread.alive?
157-
@worker_thread ||= Thread.new { @worker.run }
158-
end
159+
return if worker_thread_alive? && timer_thread_alive?
159160

160-
THREAD_MUTEX.synchronize do
161-
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
162-
@timer_thread ||= Thread.new { @timer.run }
161+
@thread_mutex.synchronize do
162+
@worker_thread = Thread.new { @worker.run } unless worker_thread_alive?
163+
@timer_thread = Thread.new { @timer.run } unless timer_thread_alive?
163164
end
164165
end
165166

167+
def worker_thread_alive?
168+
!!@worker_thread && @worker_thread.alive?
169+
end
170+
171+
def timer_thread_alive?
172+
!!@timer_thread && @timer_thread.alive?
173+
end
174+
166175
def buffer_overflow(topic, message)
167176
@instrumenter.instrument("buffer_overflow.async_producer", {
168177
topic: topic,

0 commit comments

Comments
 (0)