diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb index fa9004e..719cc4b 100644 --- a/lib/redis_client/cluster/pub_sub.rb +++ b/lib/redis_client/cluster/pub_sub.rb @@ -7,6 +7,10 @@ class RedisClient class Cluster class PubSub class State + IO_ERROR_NEVER = { IOError => :never }.freeze + IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze + private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING + def initialize(client, queue) @client = client @worker = nil @@ -35,14 +39,14 @@ def spawn_worker(client, queue) # It is a fixed size but we can modify the size with some environment variables. # So it consumes memory 1 MB multiplied a number of workers. Thread.new(client, queue, nil) do |pubsub, q, prev_err| - Thread.handle_interrupt(WORKER_INTERRUPTION_OPTS) do + Thread.handle_interrupt(IO_ERROR_NEVER) do loop do - q << pubsub.next_event + Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << pubsub.next_event } prev_err = nil rescue StandardError => e next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message - q << e + Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << e } prev_err = e end end @@ -52,10 +56,8 @@ def spawn_worker(client, queue) end end - WORKER_INTERRUPTION_OPTS = { IOError => :never }.freeze BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024)) - - private_constant :WORKER_INTERRUPTION_OPTS, :BUF_SIZE + private_constant :BUF_SIZE def initialize(router, command_builder) @router = router