Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,27 @@ def spawn_worker(client, queue)
# It is a fixed size but we can modify the size with some environment variables.
# So it consumes memory 1 MB multiplied a number of workers.
Thread.new(client, queue, nil) do |pubsub, q, prev_err|
loop do
q << pubsub.next_event
prev_err = nil
rescue StandardError => e
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message

q << e
prev_err = e
Thread.handle_interrupt(WORKER_INTERRUPTION_OPTS) do
loop do
q << pubsub.next_event
prev_err = nil
rescue StandardError => e
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message

q << e
prev_err = e
end
end
rescue IOError
# stream closed in another thread
end
end
end

WORKER_INTERRUPTION_OPTS = { IOError => :never }.freeze
BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))

private_constant :BUF_SIZE
private_constant :WORKER_INTERRUPTION_OPTS, :BUF_SIZE

def initialize(router, command_builder)
@router = router
Expand Down
138 changes: 67 additions & 71 deletions test/redis_client/test_cluster.rb
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to use Fiber in these test cases.

Original file line number Diff line number Diff line change
Expand Up @@ -585,53 +585,51 @@ def test_pubsub_with_wrong_command
end

def test_global_pubsub
sub = Fiber.new do |pubsub|
channel = 'my-global-channel'
pubsub.call('SUBSCRIBE', channel)
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
Fiber.yield(channel)
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
pubsub.call('UNSUBSCRIBE')
pubsub.close
end
pubsub = @client.pubsub
channel = 'my-global-channel'

pubsub.call('SUBSCRIBE', channel)
assert_equal(['subscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))

channel = sub.resume(@client.pubsub)
publish_messages { |cli| cli.call('PUBLISH', channel, 'hello global world') }
assert_equal(['message', channel, 'hello global world'], sub.resume)
assert_equal(['message', channel, 'hello global world'], pubsub.next_event(TEST_TIMEOUT_SEC))

pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
ensure
pubsub&.close
end

def test_global_pubsub_without_timeout
sub = Fiber.new do |pubsub|
pubsub.call('SUBSCRIBE', 'my-global-not-published-channel', 'my-global-published-channel')
want = [%w[subscribe my-global-not-published-channel], %w[subscribe my-global-published-channel]]
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
assert_equal(want, got)
Fiber.yield('my-global-published-channel')
Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first)
pubsub.call('UNSUBSCRIBE')
pubsub.close
end
pubsub = @client.pubsub

channel = sub.resume(@client.pubsub)
publish_messages { |cli| cli.call('PUBLISH', channel, 'hello global published world') }
assert_equal(['message', channel, 'hello global published world'], sub.resume)
pubsub.call('SUBSCRIBE', 'my-global-not-published-channel', 'my-global-published-channel')
want = [%w[subscribe my-global-not-published-channel], %w[subscribe my-global-published-channel]]
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
assert_equal(want, got)

publish_messages { |cli| cli.call('PUBLISH', 'my-global-published-channel', 'hello global published world') }
got = collect_messages(pubsub, size: 1, timeout: nil).first
assert_equal(['message', 'my-global-published-channel', 'hello global published world'], got)

pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
ensure
pubsub&.close
end

def test_global_pubsub_with_multiple_channels
sub = Fiber.new do |pubsub|
pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" })
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) }
Fiber.yield
Fiber.yield(collect_messages(pubsub, size: 10))
pubsub.call('UNSUBSCRIBE')
pubsub.close
end
pubsub = @client.pubsub

pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" })
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) }

sub.resume(@client.pubsub)
publish_messages { |cli| cli.pipelined { |pi| 10.times { |i| pi.call('PUBLISH', "g-chan#{i}", i) } } }
got = sub.resume.sort_by { |e| e[1].to_s }
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
10.times { |i| assert_equal(['message', "g-chan#{i}", i.to_s], got[i]) }

pubsub.call('UNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
ensure
pubsub&.close
end

def test_sharded_pubsub
Expand All @@ -640,19 +638,18 @@ def test_sharded_pubsub
return
end

sub = Fiber.new do |pubsub|
channel = 'my-sharded-channel'
pubsub.call('SSUBSCRIBE', channel)
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))
Fiber.yield(channel)
Fiber.yield(pubsub.next_event(TEST_TIMEOUT_SEC))
pubsub.call('SUNSUBSCRIBE')
pubsub.close
end
pubsub = @client.pubsub
channel = 'my-sharded-channel'

pubsub.call('SSUBSCRIBE', channel)
assert_equal(['ssubscribe', channel, 1], pubsub.next_event(TEST_TIMEOUT_SEC))

channel = sub.resume(@client.pubsub)
publish_messages { |cli| cli.call('SPUBLISH', channel, 'hello sharded world') }
assert_equal(['smessage', channel, 'hello sharded world'], sub.resume)
assert_equal(['smessage', channel, 'hello sharded world'], pubsub.next_event(TEST_TIMEOUT_SEC))

pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
ensure
pubsub&.close
end

def test_sharded_pubsub_without_timeout
Expand All @@ -661,21 +658,21 @@ def test_sharded_pubsub_without_timeout
return
end

sub = Fiber.new do |pubsub|
pubsub.call('SSUBSCRIBE', 'my-sharded-not-published-channel')
pubsub.call('SSUBSCRIBE', 'my-sharded-published-channel')
want = [%w[ssubscribe my-sharded-not-published-channel], %w[ssubscribe my-sharded-published-channel]]
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
assert_equal(want, got)
Fiber.yield('my-sharded-published-channel')
Fiber.yield(collect_messages(pubsub, size: 1, timeout: nil).first)
pubsub.call('SUNSUBSCRIBE')
pubsub.close
end
pubsub = @client.pubsub

channel = sub.resume(@client.pubsub)
publish_messages { |cli| cli.call('SPUBLISH', channel, 'hello sharded published world') }
assert_equal(['smessage', channel, 'hello sharded published world'], sub.resume)
pubsub.call('SSUBSCRIBE', 'my-sharded-not-published-channel')
pubsub.call('SSUBSCRIBE', 'my-sharded-published-channel')
want = [%w[ssubscribe my-sharded-not-published-channel], %w[ssubscribe my-sharded-published-channel]]
got = collect_messages(pubsub, size: 2, timeout: nil).map { |e| e.take(2) }.sort_by { |e| e[1].to_s }
assert_equal(want, got)

publish_messages { |cli| cli.call('SPUBLISH', 'my-sharded-published-channel', 'hello sharded published world') }
got = collect_messages(pubsub, size: 1, timeout: nil).first
assert_equal(['smessage', 'my-sharded-published-channel', 'hello sharded published world'], got)

pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
ensure
pubsub&.close
end

def test_sharded_pubsub_with_multiple_channels
Expand All @@ -684,20 +681,19 @@ def test_sharded_pubsub_with_multiple_channels
return
end

sub = Fiber.new do |pubsub|
10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") }
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) }
Fiber.yield
Fiber.yield(collect_messages(pubsub, size: 10))
pubsub.call('SUNSUBSCRIBE')
pubsub.close
end
pubsub = @client.pubsub

10.times { |i| pubsub.call('SSUBSCRIBE', "s-chan#{i}") }
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
10.times { |i| assert_equal(['ssubscribe', "s-chan#{i}"], got[i].take(2)) }

sub.resume(@client.pubsub)
publish_messages { |cli| cli.pipelined { |pi| 10.times { |i| pi.call('SPUBLISH', "s-chan#{i}", i) } } }
got = sub.resume.sort_by { |e| e[1].to_s }
got = collect_messages(pubsub, size: 10).sort_by { |e| e[1].to_s }
10.times { |i| assert_equal(['smessage', "s-chan#{i}", i.to_s], got[i]) }

pubsub.call('SUNSUBSCRIBE') unless RUBY_ENGINE == 'jruby' # FIXME: too slow in jruby
ensure
pubsub&.close
end

def test_other_pubsub_commands
Expand Down