Skip to content

Commit 8e91ad7

Browse files
authored
fix: broken behavior for Pub/Sub (#217)
1 parent c9d90d2 commit 8e91ad7

File tree

2 files changed

+110
-41
lines changed

2 files changed

+110
-41
lines changed

lib/redis_client/cluster/pub_sub.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ def close
2525
end
2626

2727
def next_event(timeout = nil)
28+
return if @pubsub_states.empty?
29+
2830
msgs = collect_messages(timeout).compact
29-
return msgs.first if msgs.size == 1
31+
return msgs.first if msgs.size < 2
3032

3133
msgs
3234
end
@@ -43,7 +45,7 @@ def _call(command)
4345
pubsub.call_v(command)
4446
end
4547

46-
def collect_messages(timeout)
48+
def collect_messages(timeout) # rubocop:disable Metrics/AbcSize
4749
@pubsub_states.each_slice(MAX_THREADS).each_with_object([]) do |chuncked_pubsub_states, acc|
4850
threads = chuncked_pubsub_states.map do |_, v|
4951
Thread.new(v) do |pubsub|
@@ -55,7 +57,7 @@ def collect_messages(timeout)
5557

5658
threads.each do |t|
5759
t.join
58-
acc << t[:reply]
60+
acc << t[:reply] unless t[:reply].nil?
5961
end
6062
end
6163
end

test/redis_client/test_cluster.rb

Lines changed: 105 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -181,27 +181,58 @@ def test_pipelined_with_many_commands
181181
results.each_with_index { |got, i| assert_equal(i.to_s, got) }
182182
end
183183

184-
def test_global_pubsub
185-
10.times do |i|
186-
pubsub = @client.pubsub
187-
pubsub.call('SUBSCRIBE', "channel#{i}")
188-
assert_equal(['subscribe', "channel#{i}", 1], pubsub.next_event(0.1))
189-
pubsub.close
190-
end
184+
def test_pubsub_without_subscription
185+
pubsub = @client.pubsub
186+
assert_nil(pubsub.next_event(TEST_TIMEOUT_SEC))
187+
pubsub.close
188+
end
191189

192-
sub = Fiber.new do |client|
193-
channel = 'my-channel'
194-
pubsub = client.pubsub
190+
def test_global_pubsub
191+
sub = Fiber.new do |pubsub|
192+
channel = 'my-global-channel'
195193
pubsub.call('SUBSCRIBE', channel)
196194
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
197195
Fiber.yield(channel)
198196
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
197+
pubsub.call('UNSUBSCRIBE')
199198
pubsub.close
200199
end
201200

202-
channel = sub.resume(@client)
203-
@client.call('PUBLISH', channel, 'hello world')
204-
assert_equal(['message', channel, 'hello world'], sub.resume)
201+
channel = sub.resume(@client.pubsub)
202+
publish_messages do |cli|
203+
cli.call('PUBLISH', channel, 'hello global world')
204+
end
205+
206+
assert_equal(['message', channel, 'hello global world'], sub.resume)
207+
end
208+
209+
def test_global_pubsub_with_multiple_channels
210+
if hiredis_used?
211+
skip('FIXME: SEGV occured if using hiredis driver')
212+
return
213+
end
214+
215+
sub = Fiber.new do |pubsub|
216+
pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" })
217+
assert_equal(
218+
Array.new(10) { |i| ['subscribe', "g-chan#{i}", i + 1] },
219+
collect_messages(pubsub).sort_by { |e| e[1].to_s }
220+
)
221+
Fiber.yield
222+
Fiber.yield(collect_messages(pubsub))
223+
pubsub.call('UNSUBSCRIBE')
224+
pubsub.close
225+
end
226+
227+
sub.resume(@client.pubsub)
228+
publish_messages do |cli|
229+
cli.pipelined { |pi| 10.times { |i| pi.call('PUBLISH', "g-chan#{i}", i) } }
230+
end
231+
232+
assert_equal(
233+
Array.new(10) { |i| ['message', "g-chan#{i}", i.to_s] },
234+
sub.resume.sort_by { |e| e[1].to_s }
235+
)
205236
end
206237

207238
def test_sharded_pubsub
@@ -210,21 +241,22 @@ def test_sharded_pubsub
210241
return
211242
end
212243

213-
10.times do |i|
214-
sub = Fiber.new do |client|
215-
channel = "my-channel-#{i}"
216-
pubsub = client.pubsub
217-
pubsub.call('SSUBSCRIBE', channel)
218-
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
219-
Fiber.yield(channel)
220-
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
221-
pubsub.close
222-
end
244+
sub = Fiber.new do |pubsub|
245+
channel = 'my-sharded-channel'
246+
pubsub.call('SSUBSCRIBE', channel)
247+
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
248+
Fiber.yield(channel)
249+
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
250+
pubsub.call('SUNSUBSCRIBE')
251+
pubsub.close
252+
end
223253

224-
channel = sub.resume(@client)
225-
@client.call('SPUBLISH', channel, "hello world #{i}")
226-
assert_equal(['smessage', channel, "hello world #{i}"], sub.resume)
254+
channel = sub.resume(@client.pubsub)
255+
publish_messages do |cli|
256+
cli.call('SPUBLISH', channel, 'hello sharded world')
227257
end
258+
259+
assert_equal(['smessage', channel, 'hello sharded world'], sub.resume)
228260
end
229261

230262
def test_sharded_pubsub_with_multiple_channels
@@ -233,25 +265,29 @@ def test_sharded_pubsub_with_multiple_channels
233265
return
234266
end
235267

268+
if hiredis_used?
269+
skip('FIXME: SEGV occured if using hiredis driver')
270+
return
271+
end
272+
236273
sub = Fiber.new do |pubsub|
237-
assert_empty(pubsub.next_event(TEST_TIMEOUT_SEC))
238-
pubsub.call('SSUBSCRIBE', 'chan1')
239-
pubsub.call('SSUBSCRIBE', 'chan2')
240-
assert_equal(
241-
[['ssubscribe', 'chan1', 1], ['ssubscribe', 'chan2', 1]],
242-
pubsub.next_event(TEST_TIMEOUT_SEC).sort_by { |e| e[1] }
243-
)
274+
10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") }
275+
got = collect_messages(pubsub).sort_by { |e| e[1].to_s }
276+
10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) }
244277
Fiber.yield
245-
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
278+
Fiber.yield(collect_messages(pubsub))
279+
pubsub.call('SUNSUBSCRIBE')
246280
pubsub.close
247281
end
248282

249283
sub.resume(@client.pubsub)
250-
@client.call('SPUBLISH', 'chan1', 'hello')
251-
@client.call('SPUBLISH', 'chan2', 'world')
284+
publish_messages do |cli|
285+
cli.pipelined { |pi| 10.times { |i| pi.call('SPUBLISH', "s-chan#{i}", i) } }
286+
end
287+
252288
assert_equal(
253-
[%w[smessage chan1 hello], %w[smessage chan2 world]],
254-
sub.resume.sort_by { |e| e[1] }
289+
Array.new(10) { |i| ['smessage', "s-chan#{i}", i.to_s] },
290+
sub.resume.sort_by { |e| e[1].to_s }
255291
)
256292
end
257293

@@ -355,6 +391,37 @@ def wait_for_replication
355391
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
356392
@client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
357393
end
394+
395+
def collect_messages(pubsub, max_attempts: 30, timeout: 1.0)
396+
messages = []
397+
attempts = 0
398+
loop do
399+
attempts += 1
400+
break if attempts > max_attempts
401+
402+
reply = pubsub.next_event(timeout)
403+
break if reply.nil?
404+
405+
if reply.first.is_a?(Array)
406+
messages += reply
407+
else
408+
messages << reply
409+
end
410+
end
411+
412+
messages
413+
end
414+
415+
def publish_messages
416+
client = new_test_client
417+
yield client
418+
client.close
419+
end
420+
421+
def hiredis_used?
422+
::RedisClient.const_defined?(:HiredisConnection) &&
423+
::RedisClient.default_driver == ::RedisClient::HiredisConnection
424+
end
358425
end
359426

360427
class PrimaryOnly < TestingWrapper

0 commit comments

Comments
 (0)