Skip to content

Commit f978c98

Browse files
authored
fix: revise rough interruption handling to fine grain for thread (#440)
1 parent 4e4af95 commit f978c98

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

lib/redis_client/cluster/pub_sub.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ class RedisClient
77
class Cluster
88
class PubSub
99
class State
10+
IO_ERROR_NEVER = { IOError => :never }.freeze
11+
IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze
12+
private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING
13+
1014
def initialize(client, queue)
1115
@client = client
1216
@worker = nil
@@ -35,14 +39,14 @@ def spawn_worker(client, queue)
3539
# It is a fixed size but we can modify the size with some environment variables.
3640
# So it consumes memory 1 MB multiplied a number of workers.
3741
Thread.new(client, queue, nil) do |pubsub, q, prev_err|
38-
Thread.handle_interrupt(WORKER_INTERRUPTION_OPTS) do
42+
Thread.handle_interrupt(IO_ERROR_NEVER) do
3943
loop do
40-
q << pubsub.next_event
44+
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << pubsub.next_event }
4145
prev_err = nil
4246
rescue StandardError => e
4347
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message
4448

45-
q << e
49+
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << e }
4650
prev_err = e
4751
end
4852
end
@@ -52,10 +56,8 @@ def spawn_worker(client, queue)
5256
end
5357
end
5458

55-
WORKER_INTERRUPTION_OPTS = { IOError => :never }.freeze
5659
BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
57-
58-
private_constant :WORKER_INTERRUPTION_OPTS, :BUF_SIZE
60+
private_constant :BUF_SIZE
5961

6062
def initialize(router, command_builder)
6163
@router = router

0 commit comments

Comments
 (0)