From 670e762a55bbe7a93c653c8b2b8095c8787bdfdb Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Tue, 23 Sep 2025 08:41:17 +0900 Subject: [PATCH] fix: interruption handling in worker thread --- .../cluster/concurrent_worker/pooled.rb | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/redis_client/cluster/concurrent_worker/pooled.rb b/lib/redis_client/cluster/concurrent_worker/pooled.rb index 8677cff..c814391 100644 --- a/lib/redis_client/cluster/concurrent_worker/pooled.rb +++ b/lib/redis_client/cluster/concurrent_worker/pooled.rb @@ -10,6 +10,10 @@ module ConcurrentWorker # It is a fixed size but we can modify the size with some environment variables. # So it consumes memory 1 MB multiplied a number of workers. class Pooled + IO_ERROR_NEVER = { IOError => :never }.freeze + IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze + private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING + def initialize(size:) raise ArgumentError, "size must be positive: #{size}" unless size.positive? @@ -65,7 +69,15 @@ def ensure_workers def spawn_worker Thread.new(@q) do |q| - loop { q.pop.exec } + Thread.handle_interrupt(IO_ERROR_NEVER) do + loop do + Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) do + q.pop.exec + end + end + end + rescue IOError + # stream closed in another thread end end end