Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ class RedisClient
class Cluster
class PubSub
class State
IO_ERROR_NEVER = { IOError => :never }.freeze
IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze
private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING

def initialize(client, queue)
@client = client
@worker = nil
Expand Down Expand Up @@ -35,14 +39,14 @@ def spawn_worker(client, queue)
# It is a fixed size but we can modify the size with some environment variables.
# So it consumes memory 1 MB multiplied a number of workers.
Thread.new(client, queue, nil) do |pubsub, q, prev_err|
Thread.handle_interrupt(WORKER_INTERRUPTION_OPTS) do
Thread.handle_interrupt(IO_ERROR_NEVER) do
loop do
q << pubsub.next_event
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << pubsub.next_event }
prev_err = nil
rescue StandardError => e
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message

q << e
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << e }
prev_err = e
end
end
Expand All @@ -52,10 +56,8 @@ def spawn_worker(client, queue)
end
end

WORKER_INTERRUPTION_OPTS = { IOError => :never }.freeze
BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))

private_constant :WORKER_INTERRUPTION_OPTS, :BUF_SIZE
private_constant :BUF_SIZE

def initialize(router, command_builder)
@router = router
Expand Down