Skip to content

Commit 6691301

Browse files
authored
Merge pull request #1131 from casperisfine/unsubscribe-lock
Refactor pubusb to allow subscribing and unsubscribing from another thread
2 parents 4f8ff4e + 3df7192 commit 6691301

File tree

8 files changed

+122
-46
lines changed

8 files changed

+122
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Unreleased
22

3+
- Allow to call `subscribe`, `unsubscribe`, `psubscribe` and `punsubscribe` from a subscribed client. See #1131.
34
- Fix `redis-clustering` gem to specify the dependency on `redis`
45

56
# 5.0.0.beta3

lib/redis.rb

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -165,19 +165,27 @@ def send_blocking_command(command, timeout, &block)
165165
end
166166

167167
def _subscription(method, timeout, channels, block)
168-
if @subscription_client
169-
return @subscription_client.call_v([method] + channels)
170-
end
168+
if block
169+
if @subscription_client
170+
raise SubscriptionError, "This client is already subscribed"
171+
end
171172

172-
begin
173-
@subscription_client = SubscribedClient.new(@client.pubsub)
174-
if timeout > 0
175-
@subscription_client.send(method, timeout, *channels, &block)
176-
else
177-
@subscription_client.send(method, *channels, &block)
173+
begin
174+
@subscription_client = SubscribedClient.new(@client.pubsub)
175+
if timeout > 0
176+
@subscription_client.send(method, timeout, *channels, &block)
177+
else
178+
@subscription_client.send(method, *channels, &block)
179+
end
180+
ensure
181+
@subscription_client = nil
178182
end
179-
ensure
180-
@subscription_client = nil
183+
else
184+
unless @subscription_client
185+
raise SubscriptionError, "This client is not subscribed"
186+
end
187+
188+
@subscription_client.call_v([method].concat(channels))
181189
end
182190
end
183191
end

lib/redis/commands/pubsub.rb

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,50 +14,34 @@ def subscribed?
1414

1515
# Listen for messages published to the given channels.
1616
def subscribe(*channels, &block)
17-
synchronize do |_client|
18-
_subscription(:subscribe, 0, channels, block)
19-
end
17+
_subscription(:subscribe, 0, channels, block)
2018
end
2119

2220
# Listen for messages published to the given channels. Throw a timeout error
2321
# if there is no messages for a timeout period.
2422
def subscribe_with_timeout(timeout, *channels, &block)
25-
synchronize do |_client|
26-
_subscription(:subscribe_with_timeout, timeout, channels, block)
27-
end
23+
_subscription(:subscribe_with_timeout, timeout, channels, block)
2824
end
2925

3026
# Stop listening for messages posted to the given channels.
3127
def unsubscribe(*channels)
32-
raise "Can't unsubscribe if not subscribed." unless subscribed?
33-
34-
synchronize do |_client|
35-
_subscription(:unsubscribe, 0, channels, nil)
36-
end
28+
_subscription(:unsubscribe, 0, channels, nil)
3729
end
3830

3931
# Listen for messages published to channels matching the given patterns.
4032
def psubscribe(*channels, &block)
41-
synchronize do |_client|
42-
_subscription(:psubscribe, 0, channels, block)
43-
end
33+
_subscription(:psubscribe, 0, channels, block)
4434
end
4535

4636
# Listen for messages published to channels matching the given patterns.
4737
# Throw a timeout error if there is no messages for a timeout period.
4838
def psubscribe_with_timeout(timeout, *channels, &block)
49-
synchronize do |_client|
50-
_subscription(:psubscribe_with_timeout, timeout, channels, block)
51-
end
39+
_subscription(:psubscribe_with_timeout, timeout, channels, block)
5240
end
5341

5442
# Stop listening for messages posted to channels matching the given patterns.
5543
def punsubscribe(*channels)
56-
raise "Can't unsubscribe if not subscribed." unless subscribed?
57-
58-
synchronize do |_client|
59-
_subscription(:punsubscribe, 0, channels, nil)
60-
end
44+
_subscription(:punsubscribe, 0, channels, nil)
6145
end
6246

6347
# Inspect the state of the Pub/Sub subsystem.

lib/redis/distributed.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ def subscribe(channel, *channels, &block)
904904

905905
# Stop listening for messages posted to the given channels.
906906
def unsubscribe(*channels)
907-
raise "Can't unsubscribe if not subscribed." unless subscribed?
907+
raise SubscriptionError, "Can't unsubscribe if not subscribed." unless subscribed?
908908

909909
@subscribed_node.unsubscribe(*channels)
910910
end

lib/redis/errors.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,7 @@ class InheritedError < BaseConnectionError
5252
# Raised when client options are invalid.
5353
class InvalidClientOptionError < BaseError
5454
end
55+
56+
class SubscriptionError < BaseError
57+
end
5558
end

lib/redis/subscribe.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ class Redis
44
class SubscribedClient
55
def initialize(client)
66
@client = client
7+
@write_monitor = Monitor.new
78
end
89

910
def call_v(command)
10-
@client.call_v(command)
11+
@write_monitor.synchronize do
12+
@client.call_v(command)
13+
end
1114
end
1215

1316
def subscribe(*channels, &block)
@@ -43,14 +46,16 @@ def close
4346
def subscription(start, stop, channels, block, timeout = 0)
4447
sub = Subscription.new(&block)
4548

46-
@client.call_v([start, *channels])
49+
call_v([start, *channels])
4750
while event = @client.next_event(timeout)
4851
if event.is_a?(::RedisClient::CommandError)
4952
raise Client::ERROR_MAPPING.fetch(event.class), event.message
5053
end
5154

5255
type, *rest = event
53-
sub.callbacks[type].call(*rest)
56+
if callback = sub.callbacks[type]
57+
callback.call(*rest)
58+
end
5459
break if type == stop && rest.last == 0
5560
end
5661
# No need to unsubscribe here. The real client closes the connection
@@ -62,10 +67,7 @@ class Subscription
6267
attr :callbacks
6368

6469
def initialize
65-
@callbacks = Hash.new do |hash, key|
66-
hash[key] = ->(*_) {}
67-
end
68-
70+
@callbacks = {}
6971
yield(self)
7072
end
7173

test/distributed/publish_subscribe_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def test_other_commands_within_a_subscribe
8383
end
8484

8585
def test_subscribe_without_a_block
86-
assert_raises LocalJumpError do
86+
assert_raises Redis::SubscriptionError do
8787
r.subscribe("foo")
8888
end
8989
end

test/redis/publish_subscribe_test.rb

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,19 +205,22 @@ def test_other_commands_within_a_subscribe
205205
end
206206

207207
def test_subscribe_without_a_block
208-
assert_raises LocalJumpError do
208+
error = assert_raises Redis::SubscriptionError do
209209
r.subscribe(channel_name)
210210
end
211+
assert_includes "This client is not subscribed", error.message
211212
end
212213

213214
def test_unsubscribe_without_a_subscribe
214-
assert_raises RuntimeError do
215+
error = assert_raises Redis::SubscriptionError do
215216
r.unsubscribe
216217
end
218+
assert_includes "This client is not subscribed", error.message
217219

218-
assert_raises RuntimeError do
220+
error = assert_raises Redis::SubscriptionError do
219221
r.punsubscribe
220222
end
223+
assert_includes "This client is not subscribed", error.message
221224
end
222225

223226
def test_subscribe_past_a_timeout
@@ -264,12 +267,87 @@ def test_psubscribe_with_timeout
264267
refute received
265268
end
266269

270+
def test_unsubscribe_from_another_thread
271+
@unsubscribed = @subscribed = false
272+
@subscribed_redis = nil
273+
@messages = []
274+
@messages_count = 0
275+
thread = new_thread do |r|
276+
@subscribed_redis = r
277+
r.subscribe(channel_name) do |on|
278+
on.subscribe do |_channel, _total|
279+
@subscribed = true
280+
end
281+
282+
on.message do |channel, message|
283+
@messages << [channel, message]
284+
@messages_count += 1
285+
end
286+
287+
on.unsubscribe do |_channel, _total|
288+
@unsubscribed = true
289+
end
290+
end
291+
end
292+
293+
Thread.pass until @subscribed
294+
295+
redis.publish(channel_name, "test")
296+
Thread.pass until @messages_count == 1
297+
assert_equal [channel_name, "test"], @messages.last
298+
299+
@subscribed_redis.unsubscribe # this shouldn't block
300+
refute_nil thread.join(2)
301+
assert_equal true, @unsubscribed
302+
end
303+
304+
def test_subscribe_from_another_thread
305+
@events = []
306+
@subscribed_redis = nil
307+
thread = new_thread do |r|
308+
r.subscribe(channel_name) do |on|
309+
@subscribed_redis = r
310+
on.subscribe do |channel, _total|
311+
@events << ["subscribed", channel]
312+
end
313+
314+
on.message do |channel, message|
315+
@events << ["message", channel, message]
316+
end
317+
318+
on.unsubscribe do |channel, _total|
319+
@events << ["unsubscribed", channel]
320+
end
321+
end
322+
end
323+
324+
Thread.pass until @subscribed_redis&.subscribed?
325+
326+
redis.publish(channel_name, "test")
327+
@subscribed_redis.subscribe("#{channel_name}:2")
328+
redis.publish("#{channel_name}:2", "test-2")
329+
330+
@subscribed_redis.unsubscribe(channel_name)
331+
@subscribed_redis.unsubscribe # this shouldn't block
332+
333+
refute_nil thread.join(2)
334+
expected = [
335+
["subscribed", channel_name],
336+
["message", channel_name, "test"],
337+
["subscribed", "#{channel_name}:2"],
338+
["message", "#{channel_name}:2", "test-2"],
339+
["unsubscribed", channel_name],
340+
["unsubscribed", "#{channel_name}:2"]
341+
]
342+
assert_equal expected, @events
343+
end
344+
267345
private
268346

269347
def new_thread(&block)
270348
redis = Redis.new(OPTIONS)
271349
thread = Thread.new(redis, &block)
272-
thread.report_on_exception = false
350+
thread.report_on_exception = true
273351
@threads[thread] = redis
274352
thread
275353
end

0 commit comments

Comments
 (0)