@@ -20,6 +20,20 @@ class Node
20
20
IGNORE_GENERIC_CONFIG_KEYS = %i[ url host port path ] . freeze
21
21
22
22
ReloadNeeded = Class . new ( ::RedisClient ::Error )
23
+ Info = Struct . new (
24
+ 'RedisNode' ,
25
+ :id , :node_key , :role , :primary_id , :ping_sent ,
26
+ :pong_recv , :config_epoch , :link_state , :slots ,
27
+ keyword_init : true
28
+ ) do
29
+ def primary?
30
+ role == 'master'
31
+ end
32
+
33
+ def replica?
34
+ role == 'slave'
35
+ end
36
+ end
23
37
24
38
class Config < ::RedisClient ::Config
25
39
def initialize ( scale_read : false , **kwargs )
@@ -48,7 +62,7 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
48
62
Thread . pass
49
63
Thread . current . thread_variable_set ( :index , i )
50
64
reply = cli . call ( 'CLUSTER' , 'NODES' )
51
- Thread . current . thread_variable_set ( :info , parse_node_info ( reply ) )
65
+ Thread . current . thread_variable_set ( :info , parse_cluster_node_reply ( reply ) )
52
66
rescue StandardError => e
53
67
Thread . current . thread_variable_set ( :error , e )
54
68
ensure
@@ -70,10 +84,11 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
70
84
71
85
raise ::RedisClient ::Cluster ::InitialSetupError , errors if node_info_list . nil?
72
86
73
- grouped = node_info_list . compact . group_by do |rows |
74
- rows . sort_by { |row | row [ :id ] }
75
- . map { |r | "#{ r [ :id ] } #{ r [ :node_key ] } #{ r [ :role ] } #{ r [ :primary_id ] } #{ r [ :config_epoch ] } " }
76
- . join
87
+ grouped = node_info_list . compact . group_by do |info_list |
88
+ info_list
89
+ . sort_by ( &:id )
90
+ . map { |i | "#{ i . id } #{ i . node_key } #{ i . role } #{ i . primary_id } #{ i . config_epoch } " }
91
+ . join
77
92
end
78
93
79
94
grouped . max_by { |_ , v | v . size } [ 1 ] . first
@@ -83,8 +98,8 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
83
98
84
99
# @see https://redis.io/commands/cluster-nodes/
85
100
# @see https://github.com/redis/redis/blob/78960ad57b8a5e6af743d789ed8fd767e37d42b8/src/cluster.c#L4660-L4683
86
- def parse_node_info ( info ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
87
- rows = info . split ( "\n " ) . map ( &:split )
101
+ def parse_cluster_node_reply ( reply ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
102
+ rows = reply . split ( "\n " ) . map ( &:split )
88
103
rows . each { |arr | arr [ 2 ] = arr [ 2 ] . split ( ',' ) }
89
104
rows . select! { |arr | arr [ 7 ] == 'connected' && ( arr [ 2 ] & %w[ fail? fail handshake noaddr noflags ] ) . empty? }
90
105
rows . each do |arr |
@@ -99,23 +114,25 @@ def parse_node_info(info) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticC
99
114
end
100
115
101
116
rows . map do |arr |
102
- { id : arr [ 0 ] , node_key : arr [ 1 ] , role : arr [ 2 ] , primary_id : arr [ 3 ] , ping_sent : arr [ 4 ] ,
103
- pong_recv : arr [ 5 ] , config_epoch : arr [ 6 ] , link_state : arr [ 7 ] , slots : arr [ 8 ] }
117
+ ::RedisClient ::Cluster ::Node ::Info . new (
118
+ id : arr [ 0 ] , node_key : arr [ 1 ] , role : arr [ 2 ] , primary_id : arr [ 3 ] , ping_sent : arr [ 4 ] ,
119
+ pong_recv : arr [ 5 ] , config_epoch : arr [ 6 ] , link_state : arr [ 7 ] , slots : arr [ 8 ]
120
+ )
104
121
end
105
122
end
106
123
end
107
124
108
125
def initialize (
109
126
options ,
110
- node_info : [ ] ,
127
+ node_info_list : [ ] ,
111
128
with_replica : false ,
112
129
replica_affinity : :random ,
113
130
pool : nil ,
114
131
**kwargs
115
132
)
116
133
117
- @slots = build_slot_node_mappings ( node_info )
118
- @replications = build_replication_mappings ( node_info )
134
+ @slots = build_slot_node_mappings ( node_info_list )
135
+ @replications = build_replication_mappings ( node_info_list )
119
136
@topology = make_topology_class ( with_replica , replica_affinity ) . new ( @replications , options , pool , **kwargs )
120
137
@mutex = Mutex . new
121
138
end
@@ -207,23 +224,23 @@ def make_topology_class(with_replica, replica_affinity)
207
224
end
208
225
end
209
226
210
- def build_slot_node_mappings ( node_info )
227
+ def build_slot_node_mappings ( node_info_list )
211
228
slots = Array . new ( SLOT_SIZE )
212
- node_info . each do |info |
213
- next if info [ : slots] . nil? || info [ : slots] . empty?
229
+ node_info_list . each do |info |
230
+ next if info . slots . nil? || info . slots . empty?
214
231
215
- info [ : slots] . each { |start , last | ( start ..last ) . each { |i | slots [ i ] = info [ : node_key] } }
232
+ info . slots . each { |start , last | ( start ..last ) . each { |i | slots [ i ] = info . node_key } }
216
233
end
217
234
218
235
slots
219
236
end
220
237
221
- def build_replication_mappings ( node_info ) # rubocop:disable Metrics/AbcSize
222
- dict = node_info . to_h { |info | [ info [ :id ] , info ] }
223
- node_info . each_with_object ( Hash . new { |h , k | h [ k ] = [ ] } ) do |info , acc |
224
- primary_info = dict [ info [ : primary_id] ]
225
- acc [ primary_info [ : node_key] ] << info [ : node_key] unless primary_info . nil?
226
- acc [ info [ : node_key] ] if info [ :role ] == 'master' # for the primary which have no replicas
238
+ def build_replication_mappings ( node_info_list ) # rubocop:disable Metrics/AbcSize
239
+ dict = node_info_list . to_h { |info | [ info . id , info ] }
240
+ node_info_list . each_with_object ( Hash . new { |h , k | h [ k ] = [ ] } ) do |info , acc |
241
+ primary_info = dict [ info . primary_id ]
242
+ acc [ primary_info . node_key ] << info . node_key unless primary_info . nil?
243
+ acc [ info . node_key ] if info . primary? # for the primary which have no replicas
227
244
end
228
245
end
229
246
0 commit comments