Skip to content

Commit c8255e8

Browse files
authored
fix: pubsub worker stuck in jruby (#438)
1 parent 785be7a commit c8255e8

File tree

2 files changed

+81
-80
lines changed

2 files changed

+81
-80
lines changed

lib/redis_client/cluster/pub_sub.rb

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -35,22 +35,27 @@ def spawn_worker(client, queue)
3535
# It is a fixed size but we can modify the size with some environment variables.
3636
# So it consumes 1 MB of memory multiplied by the number of workers.
3737
Thread.new(client, queue, nil) do |pubsub, q, prev_err|
38-
loop do
39-
q << pubsub.next_event
40-
prev_err = nil
41-
rescue StandardError => e
42-
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message
43-
44-
q << e
45-
prev_err = e
38+
Thread.handle_interrupt(WORKER_INTERRUPTION_OPTS) do
39+
loop do
40+
q << pubsub.next_event
41+
prev_err = nil
42+
rescue StandardError => e
43+
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message
44+
45+
q << e
46+
prev_err = e
47+
end
4648
end
49+
rescue IOError
50+
# stream closed in another thread
4751
end
4852
end
4953
end
5054

55+
WORKER_INTERRUPTION_OPTS = { IOError => :never }.freeze
5156
BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
5257

53-
private_constant :BUF_SIZE
58+
private_constant :WORKER_INTERRUPTION_OPTS, :BUF_SIZE
5459

5560
def initialize(router, command_builder)
5661
@router = router

test/redis_client/test_cluster.rb

Lines changed: 67 additions & 71 deletions
Original file line number | Diff line number | Diff line change
@@ -585,53 +585,51 @@ def test_pubsub_with_wrong_command
585585
end
586586

587587
def test_global_pubsub
588-
sub = Fiber.new do |pubsub|
589-
channel = 'my-global-channel'
590-
pubsub.call('SUBSCRIBE', channel)
591-
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
592-
Fiber.yield(channel)
593-
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
594-
pubsub.call('UNSUBSCRIBE')
595-
pubsub.close
596-
end
588+
pubsub = @client.pubsub
589+
channel = 'my-global-channel'
590+
591+
pubsub.call('SUBSCRIBE', channel)
592+
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
597593

598-
channel = sub.resume(@client.pubsub)
599594
publish_messages { |cli| cli.call('PUBLISH', channel, 'hello global world') }
600-
assert_equal(['message', channel, 'hello global world'], sub.resume)
595+
assert_equal(['message', channel, 'hello global world'], pubsub.next_event(TEST_TIMEOUT_SEC))
596+
597+
pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
598+
ensure
599+
pubsub&.close
601600
end
602601

603602
def test_global_pubsub_without_timeout
604-
sub = Fiber.new do |pubsub|
605-
pubsub.call('SUBSCRIBE', 'my-global-not-published-channel', 'my-global-published-channel')
606-
want = [%w[subscribe my-global-not-published-channel], %w[subscribe my-global-published-channel]]
607-
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
608-
assert_equal(want, got)
609-
Fiber.yield('my-global-published-channel')
610-
Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first)
611-
pubsub.call('UNSUBSCRIBE')
612-
pubsub.close
613-
end
603+
pubsub = @client.pubsub
614604

615-
channel = sub.resume(@client.pubsub)
616-
publish_messages { |cli| cli.call('PUBLISH', channel, 'hello global published world') }
617-
assert_equal(['message', channel, 'hello global published world'], sub.resume)
605+
pubsub.call('SUBSCRIBE', 'my-global-not-published-channel', 'my-global-published-channel')
606+
want = [%w[subscribe my-global-not-published-channel], %w[subscribe my-global-published-channel]]
607+
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
608+
assert_equal(want, got)
609+
610+
publish_messages { |cli| cli.call('PUBLISH', 'my-global-published-channel', 'hello global published world') }
611+
got = collect_messages(pubsub, size: 1, timeout: nil).first
612+
assert_equal(['message', 'my-global-published-channel', 'hello global published world'], got)
613+
614+
pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
615+
ensure
616+
pubsub&.close
618617
end
619618

620619
def test_global_pubsub_with_multiple_channels
621-
sub = Fiber.new do |pubsub|
622-
pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" })
623-
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
624-
10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) }
625-
Fiber.yield
626-
Fiber.yield(collect_messages(pubsub, size: 10))
627-
pubsub.call('UNSUBSCRIBE')
628-
pubsub.close
629-
end
620+
pubsub = @client.pubsub
621+
622+
pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" })
623+
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
624+
10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) }
630625

631-
sub.resume(@client.pubsub)
632626
publish_messages { |cli| cli.pipelined { |pi| 10.times { |i| pi.call('PUBLISH', "g-chan#{i}", i) } } }
633-
got = sub.resume.sort_by { |e| e[1].to_s }
627+
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
634628
10.times { |i| assert_equal(['message', "g-chan#{i}", i.to_s], got[i]) }
629+
630+
pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
631+
ensure
632+
pubsub&.close
635633
end
636634

637635
def test_sharded_pubsub
@@ -640,19 +638,18 @@ def test_sharded_pubsub
640638
return
641639
end
642640

643-
sub = Fiber.new do |pubsub|
644-
channel = 'my-sharded-channel'
645-
pubsub.call('SSUBSCRIBE', channel)
646-
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
647-
Fiber.yield(channel)
648-
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
649-
pubsub.call('SUNSUBSCRIBE')
650-
pubsub.close
651-
end
641+
pubsub = @client.pubsub
642+
channel = 'my-sharded-channel'
643+
644+
pubsub.call('SSUBSCRIBE', channel)
645+
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
652646

653-
channel = sub.resume(@client.pubsub)
654647
publish_messages { |cli| cli.call('SPUBLISH', channel, 'hello sharded world') }
655-
assert_equal(['smessage', channel, 'hello sharded world'], sub.resume)
648+
assert_equal(['smessage', channel, 'hello sharded world'], pubsub.next_event(TEST_TIMEOUT_SEC))
649+
650+
pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
651+
ensure
652+
pubsub&.close
656653
end
657654

658655
def test_sharded_pubsub_without_timeout
@@ -661,21 +658,21 @@ def test_sharded_pubsub_without_timeout
661658
return
662659
end
663660

664-
sub = Fiber.new do |pubsub|
665-
pubsub.call('SSUBSCRIBE', 'my-sharded-not-published-channel')
666-
pubsub.call('SSUBSCRIBE', 'my-sharded-published-channel')
667-
want = [%w[ssubscribe my-sharded-not-published-channel], %w[ssubscribe my-sharded-published-channel]]
668-
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
669-
assert_equal(want, got)
670-
Fiber.yield('my-sharded-published-channel')
671-
Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first)
672-
pubsub.call('SUNSUBSCRIBE')
673-
pubsub.close
674-
end
661+
pubsub = @client.pubsub
675662

676-
channel = sub.resume(@client.pubsub)
677-
publish_messages { |cli| cli.call('SPUBLISH', channel, 'hello sharded published world') }
678-
assert_equal(['smessage', channel, 'hello sharded published world'], sub.resume)
663+
pubsub.call('SSUBSCRIBE', 'my-sharded-not-published-channel')
664+
pubsub.call('SSUBSCRIBE', 'my-sharded-published-channel')
665+
want = [%w[ssubscribe my-sharded-not-published-channel], %w[ssubscribe my-sharded-published-channel]]
666+
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
667+
assert_equal(want, got)
668+
669+
publish_messages { |cli| cli.call('SPUBLISH', 'my-sharded-published-channel', 'hello sharded published world') }
670+
got = collect_messages(pubsub, size: 1, timeout: nil).first
671+
assert_equal(['smessage', 'my-sharded-published-channel', 'hello sharded published world'], got)
672+
673+
pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
674+
ensure
675+
pubsub&.close
679676
end
680677

681678
def test_sharded_pubsub_with_multiple_channels
@@ -684,20 +681,19 @@ def test_sharded_pubsub_with_multiple_channels
684681
return
685682
end
686683

687-
sub = Fiber.new do |pubsub|
688-
10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") }
689-
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
690-
10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) }
691-
Fiber.yield
692-
Fiber.yield(collect_messages(pubsub, size: 10))
693-
pubsub.call('SUNSUBSCRIBE')
694-
pubsub.close
695-
end
684+
pubsub = @client.pubsub
685+
686+
10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") }
687+
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
688+
10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) }
696689

697-
sub.resume(@client.pubsub)
698690
publish_messages { |cli| cli.pipelined { |pi| 10.times { |i| pi.call('SPUBLISH', "s-chan#{i}", i) } } }
699-
got = sub.resume.sort_by { |e| e[1].to_s }
691+
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
700692
10.times { |i| assert_equal(['smessage', "s-chan#{i}", i.to_s], got[i]) }
693+
694+
pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
695+
ensure
696+
pubsub&.close
701697
end
702698

703699
def test_other_pubsub_commands

0 commit comments

Comments
 (0)