Skip to content

Commit 9663bf5

Browse files
authored
Lock native only on closing and track usage (#264)
* Lock native only on closing and track usage
* fix desc
1 parent 9d3160f commit 9663bf5

File tree

4 files changed

+72
-23
lines changed

4 files changed

+72
-23
lines changed

lib/rdkafka/consumer.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ def finalizer
2727
def close
2828
return if closed?
2929
ObjectSpace.undefine_finalizer(self)
30-
@native_kafka.with_inner do |inner|
30+
31+
@native_kafka.synchronize do |inner|
3132
Rdkafka::Bindings.rd_kafka_consumer_close(inner)
3233
end
34+
3335
@native_kafka.close
3436
end
3537

lib/rdkafka/error.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,10 @@ def initialize(method)
9292
super("Illegal call to #{method.to_s} on a closed admin")
9393
end
9494
end
95+
96+
class ClosedInnerError < BaseError
97+
def initialize
98+
super("Illegal call to a closed inner librdkafka instance")
99+
end
100+
end
95101
end

lib/rdkafka/native_kafka.rb

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ def initialize(inner, run_polling_thread:)
1010
@access_mutex = Mutex.new
1111
# Lock around internal polling
1212
@poll_mutex = Mutex.new
13+
# counter for operations in progress using inner
14+
@operations_in_progress = 0
1315

1416
if run_polling_thread
1517
# Start thread to poll client for delivery callbacks,
@@ -35,10 +37,26 @@ def initialize(inner, run_polling_thread:)
3537
end
3638

3739
def with_inner
38-
return if @inner.nil?
40+
if @access_mutex.owned?
41+
@operations_in_progress += 1
42+
else
43+
@access_mutex.synchronize { @operations_in_progress += 1 }
44+
end
45+
46+
@inner.nil? ? raise(ClosedInnerError) : yield(@inner)
47+
ensure
48+
@operations_in_progress -= 1
49+
end
3950

51+
def synchronize(&block)
4052
@access_mutex.synchronize do
41-
yield @inner
53+
# Wait for any commands using the inner to finish
54+
# This can take a while on blocking operations like polling, but it is essential not to proceed
55+
# with certain types of operations like resource destruction, as it can cause the process
56+
# to hang or crash
57+
sleep(0.01) until @operations_in_progress.zero?
58+
59+
with_inner(&block)
4260
end
4361
end
4462

@@ -53,31 +71,31 @@ def closed?
5371
def close(object_id=nil)
5472
return if closed?
5573

56-
@access_mutex.lock
74+
synchronize do
75+
# Indicate to the outside world that we are closing
76+
@closing = true
5777

58-
# Indicate to the outside world that we are closing
59-
@closing = true
78+
if @polling_thread
79+
# Indicate to polling thread that we're closing
80+
@polling_thread[:closing] = true
6081

61-
if @polling_thread
62-
# Indicate to polling thread that we're closing
63-
@polling_thread[:closing] = true
64-
65-
# Wait for the polling thread to finish up,
66-
# this can be aborted in practice if this
67-
# code runs from a finalizer.
68-
@polling_thread.join
69-
end
82+
# Wait for the polling thread to finish up,
83+
# this can be aborted in practice if this
84+
# code runs from a finalizer.
85+
@polling_thread.join
86+
end
7087

71-
# Destroy the client after locking both mutexes
72-
@poll_mutex.lock
88+
# Destroy the client after locking both mutexes
89+
@poll_mutex.lock
7390

74-
# This check prevents a race condition, where we would enter the close in two threads
75-
# and after unlocking the primary one that hold the lock but finished, ours would be unlocked
76-
# and would continue to run, trying to destroy inner twice
77-
return unless @inner
91+
# This check prevents a race condition, where we would enter the close in two threads
92+
# and, after the primary one that held the lock finished and unlocked it, ours would be unlocked
93+
# and would continue to run, trying to destroy inner twice
94+
return unless @inner
7895

79-
Rdkafka::Bindings.rd_kafka_destroy(@inner)
80-
@inner = nil
96+
Rdkafka::Bindings.rd_kafka_destroy(@inner)
97+
@inner = nil
98+
end
8199
end
82100
end
83101
end

spec/rdkafka/consumer_spec.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,29 @@ def send_one_message(val)
286286
consumer.poll(100)
287287
}.to raise_error(Rdkafka::ClosedConsumerError, /poll/)
288288
end
289+
290+
context 'when there are outgoing operations in other threads' do
291+
it 'should wait and not crash' do
292+
times = []
293+
294+
# Run a long running poll
295+
thread = Thread.new do
296+
times << Time.now
297+
consumer.subscribe("empty_test_topic")
298+
times << Time.now
299+
consumer.poll(1_000)
300+
times << Time.now
301+
end
302+
303+
# Make sure it starts before we close
304+
sleep(0.1)
305+
consumer.close
306+
close_time = Time.now
307+
thread.join
308+
309+
times.each { |op_time| expect(op_time).to be < close_time }
310+
end
311+
end
289312
end
290313

291314
describe "#commit, #committed and #store_offset" do

0 commit comments

Comments
 (0)