Skip to content

Commit 8543134

Browse files
authored
fix: ensure recoverability from cluster state changes (#379)
1 parent 1967399 commit 8543134

File tree

14 files changed

+399
-174
lines changed

14 files changed

+399
-174
lines changed

.github/workflows/test.yaml

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,22 @@ jobs:
3333
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
3434
- {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'}
3535
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
36-
- {task: test_cluster_broken, redis: '7.2', restart: 'no', startup: '6'}
37-
- {task: test_cluster_broken, redis: '6.2', restart: 'no', startup: '6'}
38-
- {task: test_cluster_scale, redis: '7.2', compose: compose.scale.yaml, startup: '8'}
39-
- {task: test_cluster_scale, redis: '6.2', compose: compose.scale.yaml, startup: '8'}
4036
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
37+
- {task: test_cluster_broken, restart: 'no', startup: '6'}
4138
- {redis: '7.0', ruby: '3.1'}
4239
- {redis: '6.2', ruby: '3.0'}
4340
- {redis: '5.0', ruby: '2.7'}
44-
- {task: test_cluster_state, redis: '8', pattern: 'PrimaryOnly', compose: compose.valkey.yaml, replica: '2', startup: '9'}
45-
- {task: test_cluster_state, redis: '8', pattern: 'Pooled', compose: compose.valkey.yaml, replica: '2', startup: '9'}
46-
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, replica: '2', startup: '9'}
47-
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, replica: '2', startup: '9'}
48-
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, replica: '2', startup: '9'}
41+
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}
42+
- {task: test_cluster_scale, pattern: 'Pipeline', compose: compose.scale.yaml, startup: '8'}
43+
- {task: test_cluster_scale, pattern: 'Transaction', compose: compose.scale.yaml, startup: '8'}
44+
- {task: test_cluster_scale, pattern: 'PubSub', compose: compose.scale.yaml, startup: '8'}
4945
- {ruby: 'jruby'}
5046
- {ruby: 'truffleruby'}
47+
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
48+
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
49+
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
50+
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
51+
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
5152
env:
5253
REDIS_VERSION: ${{ matrix.redis || '7.2' }}
5354
DOCKER_COMPOSE_FILE: ${{ matrix.compose || 'compose.yaml' }}

.rubocop.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ Metrics/AbcSize:
1414
Exclude:
1515
- 'test/**/*'
1616

17+
Metrics/CyclomaticComplexity:
18+
Exclude:
19+
- 'test/**/*'
20+
21+
Metrics/PerceivedComplexity:
22+
Exclude:
23+
- 'test/**/*'
24+
1725
Metrics/ClassLength:
1826
Max: 500
1927

lib/redis_client/cluster/errors.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class ErrorCollection < ::RedisClient::Error
3232
def initialize(errors)
3333
@errors = {}
3434
if !errors.is_a?(Hash) || errors.empty?
35-
super('')
35+
super(errors.to_s)
3636
return
3737
end
3838

lib/redis_client/cluster/optimistic_locking.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def initialize(router)
1111
@asking = false
1212
end
1313

14-
def watch(keys)
14+
def watch(keys) # rubocop:disable Metrics/AbcSize
1515
slot = find_slot(keys)
1616
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
1717

@@ -32,7 +32,13 @@ def watch(keys)
3232
c.call('UNWATCH')
3333
raise
3434
end
35+
rescue ::RedisClient::CommandError => e
36+
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
37+
raise
3538
end
39+
rescue ::RedisClient::ConnectionError
40+
@router.renew_cluster_state
41+
raise
3642
end
3743
end
3844
end

lib/redis_client/cluster/pipeline.rb

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
5555
results = Array.new(commands.size)
5656
@pending_reads += size
5757
write_multi(commands)
58-
redirection_indices = nil
59-
first_exception = nil
58+
redirection_indices = stale_cluster_state = first_exception = nil
6059

6160
size.times do |index|
6261
timeout = timeouts && timeouts[index]
@@ -73,18 +72,31 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
7372
elsif exception
7473
first_exception ||= result
7574
end
75+
76+
stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served')
7677
end
7778

7879
results[index] = result
7980
end
8081

81-
raise first_exception if exception && first_exception
82-
return results if redirection_indices.nil?
82+
if redirection_indices
83+
err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
84+
err.replies = results
85+
err.indices = redirection_indices
86+
err.first_exception = first_exception
87+
raise err
88+
end
89+
90+
if stale_cluster_state
91+
err = ::RedisClient::Cluster::Pipeline::StaleClusterState.new
92+
err.replies = results
93+
err.first_exception = first_exception
94+
raise err
95+
end
96+
97+
raise first_exception if first_exception
8398

84-
err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
85-
err.replies = results
86-
err.indices = redirection_indices
87-
raise err
99+
results
88100
end
89101
end
90102

@@ -98,8 +110,12 @@ def ensure_connected_cluster_scoped(retryable: true, &block)
98110

99111
ReplySizeError = Class.new(::RedisClient::Error)
100112

113+
class StaleClusterState < ::RedisClient::Error
114+
attr_accessor :replies, :first_exception
115+
end
116+
101117
class RedirectionNeeded < ::RedisClient::Error
102-
attr_accessor :replies, :indices
118+
attr_accessor :replies, :indices, :first_exception
103119
end
104120

105121
def initialize(router, command_builder, concurrent_worker, exception:, seed: Random.new_seed)
@@ -166,14 +182,18 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
166182
end
167183
end
168184

169-
all_replies = errors = required_redirections = nil
185+
all_replies = errors = required_redirections = cluster_state_errors = nil
170186

171187
work_group.each do |node_key, v|
172188
case v
173189
when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
174190
required_redirections ||= {}
175191
required_redirections[node_key] = v
192+
when ::RedisClient::Cluster::Pipeline::StaleClusterState
193+
cluster_state_errors ||= {}
194+
cluster_state_errors[node_key] = v
176195
when StandardError
196+
cluster_state_errors ||= {} if v.is_a?(::RedisClient::ConnectionError)
177197
errors ||= {}
178198
errors[node_key] = v
179199
else
@@ -183,15 +203,25 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
183203
end
184204

185205
work_group.close
206+
@router.renew_cluster_state if cluster_state_errors
186207
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?
187208

188209
required_redirections&.each do |node_key, v|
210+
raise v.first_exception if v.first_exception
211+
189212
all_replies ||= Array.new(@size)
190213
pipeline = @pipelines[node_key]
191214
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
192215
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
193216
end
194217

218+
cluster_state_errors&.each do |node_key, v|
219+
raise v.first_exception if v.first_exception
220+
221+
all_replies ||= Array.new(@size)
222+
@pipelines[node_key].outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
223+
end
224+
195225
all_replies
196226
end
197227

lib/redis_client/cluster/pub_sub.rb

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ def ensure_worker
2424
def close
2525
@worker.exit if @worker&.alive?
2626
@client.close
27+
rescue ::RedisClient::ConnectionError
28+
# ignore
2729
end
2830

2931
private
@@ -51,27 +53,33 @@ def initialize(router, command_builder)
5153
@command_builder = command_builder
5254
@queue = SizedQueue.new(BUF_SIZE)
5355
@state_dict = {}
56+
@commands = []
5457
end
5558

5659
def call(*args, **kwargs)
57-
_call(@command_builder.generate(args, kwargs))
60+
command = @command_builder.generate(args, kwargs)
61+
_call(command)
62+
@commands << command
5863
nil
5964
end
6065

6166
def call_v(command)
62-
_call(@command_builder.generate(command))
67+
command = @command_builder.generate(command)
68+
_call(command)
69+
@commands << command
6370
nil
6471
end
6572

6673
def close
6774
@state_dict.each_value(&:close)
6875
@state_dict.clear
76+
@commands.clear
6977
@queue.clear
7078
@queue.close
7179
nil
7280
end
7381

74-
def next_event(timeout = nil)
82+
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
7583
@state_dict.each_value(&:ensure_worker)
7684
max_duration = calc_max_duration(timeout)
7785
starting = obtain_current_time
@@ -80,6 +88,13 @@ def next_event(timeout = nil)
8088
break if max_duration > 0 && obtain_current_time - starting > max_duration
8189

8290
case event = @queue.pop(true)
91+
when ::RedisClient::CommandError
92+
if event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
93+
@router.renew_cluster_state
94+
break start_over
95+
end
96+
97+
raise event
8398
when StandardError then raise event
8499
when Array then break event
85100
end
@@ -99,13 +114,26 @@ def _call(command)
99114
end
100115
end
101116

102-
def call_to_single_state(command)
117+
def call_to_single_state(command, retry_count: 1)
103118
node_key = @router.find_node_key(command)
104-
try_call(node_key, command)
119+
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
120+
@state_dict[node_key].call(command)
121+
rescue ::RedisClient::ConnectionError
122+
@state_dict[node_key].close
123+
@state_dict.delete(node_key)
124+
@router.renew_cluster_state
125+
retry_count -= 1
126+
retry_count >= 0 ? retry : raise
105127
end
106128

107129
def call_to_all_states(command)
108-
@state_dict.each_value { |s| s.call(command) }
130+
@state_dict.each do |node_key, state|
131+
state.call(command)
132+
rescue ::RedisClient::ConnectionError
133+
@state_dict[node_key].close
134+
@state_dict.delete(node_key)
135+
@router.renew_cluster_state
136+
end
109137
end
110138

111139
def call_for_sharded_states(command)
@@ -116,31 +144,20 @@ def call_for_sharded_states(command)
116144
end
117145
end
118146

119-
def try_call(node_key, command, retry_count: 1)
120-
add_state(node_key).call(command)
121-
rescue ::RedisClient::CommandError => e
122-
raise if !e.message.start_with?('MOVED') || retry_count <= 0
123-
124-
# for sharded pub/sub
125-
node_key = e.message.split[2]
126-
retry_count -= 1
127-
retry
128-
end
129-
130-
def add_state(node_key)
131-
return @state_dict[node_key] if @state_dict.key?(node_key)
132-
133-
state = State.new(@router.find_node(node_key).pubsub, @queue)
134-
@state_dict[node_key] = state
135-
end
136-
137147
def obtain_current_time
138148
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
139149
end
140150

141151
def calc_max_duration(timeout)
142152
timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
143153
end
154+
155+
def start_over
156+
@state_dict.each_value(&:close)
157+
@state_dict.clear
158+
@commands.each { |command| _call(command) }
159+
nil
160+
end
144161
end
145162
end
146163
end

0 commit comments

Comments
 (0)