@@ -16,42 +16,47 @@ class Cluster
16
16
def initialize ( config , pool : nil , concurrency : nil , **kwargs )
17
17
@config = config
18
18
@concurrent_worker = ::RedisClient ::Cluster ::ConcurrentWorker . create ( **( concurrency || { } ) )
19
- @router = ::RedisClient ::Cluster ::Router . new ( config , @concurrent_worker , pool : pool , **kwargs )
20
19
@command_builder = config . command_builder
20
+
21
+ @pool = pool
22
+ @kwargs = kwargs
23
+ @router = nil
24
+ @mutex = Mutex . new
21
25
end
22
26
23
27
def inspect
24
- "#<#{ self . class . name } #{ @router . node_keys . join ( ', ' ) } >"
28
+ node_keys = @router . nil? ? @config . startup_nodes . keys : router . node_keys
29
+ "#<#{ self . class . name } #{ node_keys . join ( ', ' ) } >"
25
30
end
26
31
27
32
def call ( *args , **kwargs , &block )
28
33
command = @command_builder . generate ( args , kwargs )
29
- @ router. send_command ( :call_v , command , &block )
34
+ router . send_command ( :call_v , command , &block )
30
35
end
31
36
32
37
def call_v ( command , &block )
33
38
command = @command_builder . generate ( command )
34
- @ router. send_command ( :call_v , command , &block )
39
+ router . send_command ( :call_v , command , &block )
35
40
end
36
41
37
42
def call_once ( *args , **kwargs , &block )
38
43
command = @command_builder . generate ( args , kwargs )
39
- @ router. send_command ( :call_once_v , command , &block )
44
+ router . send_command ( :call_once_v , command , &block )
40
45
end
41
46
42
47
def call_once_v ( command , &block )
43
48
command = @command_builder . generate ( command )
44
- @ router. send_command ( :call_once_v , command , &block )
49
+ router . send_command ( :call_once_v , command , &block )
45
50
end
46
51
47
52
def blocking_call ( timeout , *args , **kwargs , &block )
48
53
command = @command_builder . generate ( args , kwargs )
49
- @ router. send_command ( :blocking_call_v , command , timeout , &block )
54
+ router . send_command ( :blocking_call_v , command , timeout , &block )
50
55
end
51
56
52
57
def blocking_call_v ( timeout , command , &block )
53
58
command = @command_builder . generate ( command )
54
- @ router. send_command ( :blocking_call_v , command , timeout , &block )
59
+ router . send_command ( :blocking_call_v , command , timeout , &block )
55
60
end
56
61
57
62
def scan ( *args , **kwargs , &block )
@@ -60,31 +65,31 @@ def scan(*args, **kwargs, &block)
60
65
seed = Random . new_seed
61
66
cursor = ZERO_CURSOR_FOR_SCAN
62
67
loop do
63
- cursor , keys = @ router. scan ( 'SCAN' , cursor , *args , seed : seed , **kwargs )
68
+ cursor , keys = router . scan ( 'SCAN' , cursor , *args , seed : seed , **kwargs )
64
69
keys . each ( &block )
65
70
break if cursor == ZERO_CURSOR_FOR_SCAN
66
71
end
67
72
end
68
73
69
74
def sscan ( key , *args , **kwargs , &block )
70
- node = @ router. assign_node ( [ 'SSCAN' , key ] )
71
- @ router. try_delegate ( node , :sscan , key , *args , **kwargs , &block )
75
+ node = router . assign_node ( [ 'SSCAN' , key ] )
76
+ router . try_delegate ( node , :sscan , key , *args , **kwargs , &block )
72
77
end
73
78
74
79
def hscan ( key , *args , **kwargs , &block )
75
- node = @ router. assign_node ( [ 'HSCAN' , key ] )
76
- @ router. try_delegate ( node , :hscan , key , *args , **kwargs , &block )
80
+ node = router . assign_node ( [ 'HSCAN' , key ] )
81
+ router . try_delegate ( node , :hscan , key , *args , **kwargs , &block )
77
82
end
78
83
79
84
def zscan ( key , *args , **kwargs , &block )
80
- node = @ router. assign_node ( [ 'ZSCAN' , key ] )
81
- @ router. try_delegate ( node , :zscan , key , *args , **kwargs , &block )
85
+ node = router . assign_node ( [ 'ZSCAN' , key ] )
86
+ router . try_delegate ( node , :zscan , key , *args , **kwargs , &block )
82
87
end
83
88
84
89
def pipelined ( exception : true )
85
90
seed = @config . use_replica? && @config . replica_affinity == :random ? nil : Random . new_seed
86
91
pipeline = ::RedisClient ::Cluster ::Pipeline . new (
87
- @ router,
92
+ router ,
88
93
@command_builder ,
89
94
@concurrent_worker ,
90
95
exception : exception ,
@@ -99,48 +104,56 @@ def pipelined(exception: true)
99
104
100
105
def multi ( watch : nil )
101
106
if watch . nil? || watch . empty?
102
- transaction = ::RedisClient ::Cluster ::Transaction . new ( @ router, @command_builder )
107
+ transaction = ::RedisClient ::Cluster ::Transaction . new ( router , @command_builder )
103
108
yield transaction
104
109
return transaction . execute
105
110
end
106
111
107
- ::RedisClient ::Cluster ::OptimisticLocking . new ( @ router) . watch ( watch ) do |c , slot , asking |
112
+ ::RedisClient ::Cluster ::OptimisticLocking . new ( router ) . watch ( watch ) do |c , slot , asking |
108
113
transaction = ::RedisClient ::Cluster ::Transaction . new (
109
- @ router, @command_builder , node : c , slot : slot , asking : asking
114
+ router , @command_builder , node : c , slot : slot , asking : asking
110
115
)
111
116
yield transaction
112
117
transaction . execute
113
118
end
114
119
end
115
120
116
121
def pubsub
117
- ::RedisClient ::Cluster ::PubSub . new ( @ router, @command_builder )
122
+ ::RedisClient ::Cluster ::PubSub . new ( router , @command_builder )
118
123
end
119
124
120
125
def with ( ...)
121
126
raise NotImplementedError , 'No way to use'
122
127
end
123
128
124
129
def close
130
+ @router &.close
125
131
@concurrent_worker . close
126
- @router . close
127
132
nil
128
133
end
129
134
130
135
private
131
136
137
+ def router
138
+ return @router unless @router . nil?
139
+
140
+ @mutex . synchronize do
141
+ @router ||= ::RedisClient ::Cluster ::Router . new ( @config , @concurrent_worker , pool : @pool , **@kwargs )
142
+ end
143
+ end
144
+
132
145
def method_missing ( name , *args , **kwargs , &block )
133
- if @ router. command_exists? ( name )
146
+ if router . command_exists? ( name )
134
147
args . unshift ( name )
135
148
command = @command_builder . generate ( args , kwargs )
136
- return @ router. send_command ( :call_v , command , &block )
149
+ return router . send_command ( :call_v , command , &block )
137
150
end
138
151
139
152
super
140
153
end
141
154
142
155
def respond_to_missing? ( name , include_private = false )
143
- return true if @ router. command_exists? ( name )
156
+ return true if router . command_exists? ( name )
144
157
145
158
super
146
159
end
0 commit comments