diff --git a/lib/redis_client/cluster/concurrent_worker/pooled.rb b/lib/redis_client/cluster/concurrent_worker/pooled.rb index 8e92b72..ca29b91 100644 --- a/lib/redis_client/cluster/concurrent_worker/pooled.rb +++ b/lib/redis_client/cluster/concurrent_worker/pooled.rb @@ -11,8 +11,8 @@ module ConcurrentWorker # So it consumes memory 1 MB multiplied a number of workers. class Pooled IO_ERROR_NEVER = { IOError => :never }.freeze - IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze - private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING + IO_ERROR_IMMEDIATE = { IOError => :immediate }.freeze + private_constant :IO_ERROR_NEVER, :IO_ERROR_IMMEDIATE def initialize(size:) raise ArgumentError, "size must be positive: #{size}" unless size.positive? @@ -73,7 +73,7 @@ def spawn_worker Thread.new(@q) do |q| Thread.handle_interrupt(IO_ERROR_NEVER) do loop do - Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) do + Thread.handle_interrupt(IO_ERROR_IMMEDIATE) do q.pop.exec end end diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb index 4cda609..dea2f6d 100644 --- a/lib/redis_client/cluster/pub_sub.rb +++ b/lib/redis_client/cluster/pub_sub.rb @@ -8,8 +8,8 @@ class Cluster class PubSub class State IO_ERROR_NEVER = { IOError => :never }.freeze - IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze - private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING + IO_ERROR_IMMEDIATE = { IOError => :immediate }.freeze + private_constant :IO_ERROR_NEVER, :IO_ERROR_IMMEDIATE def initialize(client, queue) @client = client @@ -45,12 +45,12 @@ def spawn_worker(client, queue) Thread.new(client, queue, nil) do |pubsub, q, prev_err| Thread.handle_interrupt(IO_ERROR_NEVER) do loop do - Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << pubsub.next_event } + Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { q << pubsub.next_event } prev_err = nil rescue StandardError => e next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message - Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << e } + Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { q << e } prev_err = e end end