Skip to content

Commit 8b3bf9c

Browse files
authored
Make decrement thread-safe for any Ruby impl (#267)
1 parent 40824ec commit 8b3bf9c

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

lib/rdkafka/native_kafka.rb

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,20 @@ def initialize(inner, run_polling_thread:)
1010
@access_mutex = Mutex.new
1111
# Lock around internal polling
1212
@poll_mutex = Mutex.new
13+
# Lock around decrementing the operations in progress counter
14+
# We have two mutexes - one for increment (`@access_mutex`) and one for decrement mutex
15+
# because they serve different purposes:
16+
#
17+
# - `@access_mutex` allows us to lock the execution and make sure that any operation within
18+
# the `#synchronize` is the only one running and that there are no other running
19+
# operations.
20+
# - `@decrement_mutex` ensures, that our decrement operation is thread-safe for any Ruby
21+
# implementation.
22+
#
23+
# We do not use the same mutex, because it could create a deadlock when an already
24+
# incremented operation cannot decrement because `@access_lock` is now owned by a different
25+
# thread in a synchronized mode and the synchronized mode is waiting on the decrement.
26+
@decrement_mutex = Mutex.new
1327
# counter for operations in progress using inner
1428
@operations_in_progress = 0
1529

@@ -45,7 +59,7 @@ def with_inner
4559

4660
@inner.nil? ? raise(ClosedInnerError) : yield(@inner)
4761
ensure
48-
@operations_in_progress -= 1
62+
@decrement_mutex.synchronize { @operations_in_progress -= 1 }
4963
end
5064

5165
def synchronize(&block)

0 commit comments

Comments
 (0)