Skip to content

Commit 66a3f04

Browse files
authored
fix: client should be able to subscribe multiple channels for Pub/Sub (#215)
1 parent b24e528 commit 66a3f04

File tree

2 files changed

+71
-12
lines changed

2 files changed

+71
-12
lines changed

lib/redis_client/cluster/pub_sub.rb

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,61 @@
33
class RedisClient
44
class Cluster
55
class PubSub
6+
MAX_THREADS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
7+
68
def initialize(router, command_builder)
79
@router = router
810
@command_builder = command_builder
9-
@pubsub = nil
11+
@pubsub_states = {}
1012
end
1113

1214
def call(*args, **kwargs)
13-
close
14-
command = @command_builder.generate(args, kwargs)
15-
@pubsub = @router.assign_node(command).pubsub
16-
@pubsub.call_v(command)
15+
_call(@command_builder.generate(args, kwargs))
1716
end
1817

1918
def call_v(command)
20-
close
21-
command = @command_builder.generate(command)
22-
@pubsub = @router.assign_node(command).pubsub
23-
@pubsub.call_v(command)
19+
_call(@command_builder.generate(command))
2420
end
2521

2622
def close
27-
@pubsub&.close
28-
@pubsub = nil
23+
@pubsub_states.each_value(&:close)
24+
@pubsub_states.clear
2925
end
3026

3127
def next_event(timeout = nil)
32-
@pubsub&.next_event(timeout)
28+
msgs = collect_messages(timeout).compact
29+
return msgs.first if msgs.size == 1
30+
31+
msgs
32+
end
33+
34+
private
35+
36+
def _call(command)
37+
node_key = @router.find_node_key(command)
38+
pubsub = if @pubsub_states.key?(node_key)
39+
@pubsub_states[node_key]
40+
else
41+
@pubsub_states[node_key] = @router.find_node(node_key).pubsub
42+
end
43+
pubsub.call_v(command)
44+
end
45+
46+
def collect_messages(timeout)
47+
@pubsub_states.each_slice(MAX_THREADS).each_with_object([]) do |chuncked_pubsub_states, acc|
48+
threads = chuncked_pubsub_states.map do |_, v|
49+
Thread.new(v) do |pubsub|
50+
Thread.current[:reply] = pubsub.next_event(timeout)
51+
rescue StandardError => e
52+
Thread.current[:reply] = e
53+
end
54+
end
55+
56+
threads.each do |t|
57+
t.join
58+
acc << t[:reply]
59+
end
60+
end
3361
end
3462
end
3563
end

test/redis_client/test_cluster.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ def test_global_pubsub
186186
pubsub = @client.pubsub
187187
pubsub.call('SUBSCRIBE', "channel#{i}")
188188
assert_equal(['subscribe', "channel#{i}", 1], pubsub.next_event(0.1))
189+
pubsub.close
189190
end
190191

191192
sub = Fiber.new do |client|
@@ -195,6 +196,7 @@ def test_global_pubsub
195196
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
196197
Fiber.yield(channel)
197198
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
199+
pubsub.close
198200
end
199201

200202
channel = sub.resume(@client)
@@ -216,6 +218,7 @@ def test_sharded_pubsub
216218
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
217219
Fiber.yield(channel)
218220
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
221+
pubsub.close
219222
end
220223

221224
channel = sub.resume(@client)
@@ -224,6 +227,34 @@ def test_sharded_pubsub
224227
end
225228
end
226229

230+
def test_sharded_pubsub_with_multiple_channels
231+
if TEST_REDIS_MAJOR_VERSION < 7
232+
skip('Sharded Pub/Sub is supported by Redis 7+.')
233+
return
234+
end
235+
236+
sub = Fiber.new do |pubsub|
237+
assert_empty(pubsub.next_event(TEST_TIMEOUT_SEC))
238+
pubsub.call('SSUBSCRIBE', 'chan1')
239+
pubsub.call('SSUBSCRIBE', 'chan2')
240+
assert_equal(
241+
[['ssubscribe', 'chan1', 1], ['ssubscribe', 'chan2', 1]],
242+
pubsub.next_event(TEST_TIMEOUT_SEC).sort_by { |e| e[1] }
243+
)
244+
Fiber.yield
245+
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
246+
pubsub.close
247+
end
248+
249+
sub.resume(@client.pubsub)
250+
@client.call('SPUBLISH', 'chan1', 'hello')
251+
@client.call('SPUBLISH', 'chan2', 'world')
252+
assert_equal(
253+
[%w[smessage chan1 hello], %w[smessage chan2 world]],
254+
sub.resume.sort_by { |e| e[1] }
255+
)
256+
end
257+
227258
def test_close
228259
assert_nil(@client.close)
229260
end

0 commit comments

Comments
 (0)