diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb index b4cfd61..fa9004e 100644 --- a/lib/redis_client/cluster/pub_sub.rb +++ b/lib/redis_client/cluster/pub_sub.rb @@ -35,22 +35,27 @@ def spawn_worker(client, queue) # It is a fixed size but we can modify the size with some environment variables. # So it consumes memory 1 MB multiplied a number of workers. Thread.new(client, queue, nil) do |pubsub, q, prev_err| - loop do - q << pubsub.next_event - prev_err = nil - rescue StandardError => e - next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message - - q << e - prev_err = e + Thread.handle_interrupt(WORKER_INTERRUPTION_OPTS) do + loop do + q << pubsub.next_event + prev_err = nil + rescue StandardError => e + next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message + + q << e + prev_err = e + end end + rescue IOError + # stream closed in another thread end end end + WORKER_INTERRUPTION_OPTS = { IOError => :never }.freeze BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024)) - private_constant :BUF_SIZE + private_constant :WORKER_INTERRUPTION_OPTS, :BUF_SIZE def initialize(router, command_builder) @router = router diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 62f9a31..5072e10 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -585,53 +585,51 @@ def test_pubsub_with_wrong_command end def test_global_pubsub - sub = Fiber.new do |pubsub| - channel = 'my-global-channel' - pubsub.call('SUBSCRIBE', channel) - assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC)) - Fiber.yield(channel) - Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC)) - pubsub.call('UNSUBSCRIBE') - pubsub.close - end + pubsub = @client.pubsub + channel = 'my-global-channel' + + pubsub.call('SUBSCRIBE', channel) + assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC)) - channel = sub.resume(@client.pubsub) publish_messages { |cli| cli.call('PUBLISH', channel, 'hello global world') } - assert_equal(['message', channel, 'hello global world'], sub.resume) + assert_equal(['message', channel, 'hello global world'], pubsub.next_event(TEST_TIMEOUT_SEC)) + + pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby + ensure + pubsub&.close end def test_global_pubsub_without_timeout - sub = Fiber.new do |pubsub| - pubsub.call('SUBSCRIBE', 'my-global-not-published-channel', 'my-global-published-channel') - want = [%w[subscribe my-global-not-published-channel], %w[subscribe my-global-published-channel]] - got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s } - assert_equal(want, got) - Fiber.yield('my-global-published-channel') - Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first) - pubsub.call('UNSUBSCRIBE') - pubsub.close - end + pubsub = @client.pubsub - channel = sub.resume(@client.pubsub) - publish_messages { |cli| cli.call('PUBLISH', channel, 'hello global published world') } - assert_equal(['message', channel, 'hello global published world'], sub.resume) + pubsub.call('SUBSCRIBE', 'my-global-not-published-channel', 'my-global-published-channel') + want = [%w[subscribe my-global-not-published-channel], %w[subscribe my-global-published-channel]] + got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s } + assert_equal(want, got) + + publish_messages { |cli| cli.call('PUBLISH', 'my-global-published-channel', 'hello global published world') } + got = collect_messages(pubsub, size: 1, timeout: nil).first + assert_equal(['message', 'my-global-published-channel', 'hello global published world'], got) + + pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby + ensure + pubsub&.close end def test_global_pubsub_with_multiple_channels - sub = Fiber.new do |pubsub| - pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" }) - got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s } - 10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) } - Fiber.yield - Fiber.yield(collect_messages(pubsub, size: 10)) - pubsub.call('UNSUBSCRIBE') - pubsub.close - end + pubsub = @client.pubsub + + pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" }) + got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s } + 10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) } - sub.resume(@client.pubsub) publish_messages { |cli| cli.pipelined { |pi| 10.times { |i| pi.call('PUBLISH', "g-chan#{i}", i) } } } - got = sub.resume.sort_by { |e| e[1].to_s } + got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s } 10.times { |i| assert_equal(['message', "g-chan#{i}", i.to_s], got[i]) } + + pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby + ensure + pubsub&.close end def test_sharded_pubsub @@ -640,19 +638,18 @@ def test_sharded_pubsub return end - sub = Fiber.new do |pubsub| - channel = 'my-sharded-channel' - pubsub.call('SSUBSCRIBE', channel) - assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC)) - Fiber.yield(channel) - Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC)) - pubsub.call('SUNSUBSCRIBE') - pubsub.close - end + pubsub = @client.pubsub + channel = 'my-sharded-channel' + + pubsub.call('SSUBSCRIBE', channel) + assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC)) - channel = sub.resume(@client.pubsub) publish_messages { |cli| cli.call('SPUBLISH', channel, 'hello sharded world') } - assert_equal(['smessage', channel, 'hello sharded world'], sub.resume) + assert_equal(['smessage', channel, 'hello sharded world'], pubsub.next_event(TEST_TIMEOUT_SEC)) + + pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby + ensure + pubsub&.close end def test_sharded_pubsub_without_timeout @@ -661,21 +658,21 @@ def test_sharded_pubsub_without_timeout return end - sub = Fiber.new do |pubsub| - pubsub.call('SSUBSCRIBE', 'my-sharded-not-published-channel') - pubsub.call('SSUBSCRIBE', 'my-sharded-published-channel') - want = [%w[ssubscribe my-sharded-not-published-channel], %w[ssubscribe my-sharded-published-channel]] - got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s } - assert_equal(want, got) - Fiber.yield('my-sharded-published-channel') - Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first) - pubsub.call('SUNSUBSCRIBE') - pubsub.close - end + pubsub = @client.pubsub - channel = sub.resume(@client.pubsub) - publish_messages { |cli| cli.call('SPUBLISH', channel, 'hello sharded published world') } - assert_equal(['smessage', channel, 'hello sharded published world'], sub.resume) + pubsub.call('SSUBSCRIBE', 'my-sharded-not-published-channel') + pubsub.call('SSUBSCRIBE', 'my-sharded-published-channel') + want = [%w[ssubscribe my-sharded-not-published-channel], %w[ssubscribe my-sharded-published-channel]] + got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s } + assert_equal(want, got) + + publish_messages { |cli| cli.call('SPUBLISH', 'my-sharded-published-channel', 'hello sharded published world') } + got = collect_messages(pubsub, size: 1, timeout: nil).first + assert_equal(['smessage', 'my-sharded-published-channel', 'hello sharded published world'], got) + + pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby + ensure + pubsub&.close end def test_sharded_pubsub_with_multiple_channels @@ -684,20 +681,19 @@ def test_sharded_pubsub_with_multiple_channels return end - sub = Fiber.new do |pubsub| - 10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") } - got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s } - 10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) } - Fiber.yield - Fiber.yield(collect_messages(pubsub, size: 10)) - pubsub.call('SUNSUBSCRIBE') - pubsub.close - end + pubsub = @client.pubsub + + 10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") } + got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s } + 10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) } - sub.resume(@client.pubsub) publish_messages { |cli| cli.pipelined { |pi| 10.times { |i| pi.call('SPUBLISH', "s-chan#{i}", i) } } } - got = sub.resume.sort_by { |e| e[1].to_s } + got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s } 10.times { |i| assert_equal(['smessage', "s-chan#{i}", i.to_s], got[i]) } + + pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby + ensure + pubsub&.close end def test_other_pubsub_commands