@@ -5,9 +5,9 @@ use super::{create_connection_request, ClusterMode, TestConfiguration};
55use futures:: future:: { join_all, BoxFuture } ;
66use futures:: FutureExt ;
77use glide_core:: client:: Client ;
8- use glide_core:: connection_request:: NodeAddress ;
98use once_cell:: sync:: Lazy ;
109use redis:: { ConnectionAddr , RedisConnectionInfo } ;
10+ use serde:: Deserialize ;
1111use std:: process:: Command ;
1212use std:: sync:: Mutex ;
1313use std:: time:: Duration ;
@@ -23,6 +23,14 @@ enum ClusterType {
2323 TcpTls ,
2424}
2525
26+ #[ derive( Deserialize , Clone , Debug ) ]
27+ struct ValkeyServerInfo {
28+ host : String ,
29+ port : u32 ,
30+ pid : u32 ,
31+ is_primary : bool ,
32+ }
33+
2634impl ClusterType {
2735 fn build_addr ( use_tls : bool , host : & str , port : u16 ) -> redis:: ConnectionAddr {
2836 if use_tls {
@@ -40,15 +48,27 @@ impl ClusterType {
4048
4149pub struct RedisCluster {
4250 cluster_folder : String ,
43- addresses : Vec < NodeAddress > ,
4451 use_tls : bool ,
4552 password : Option < String > ,
53+ servers : Vec < ValkeyServerInfo > ,
4654}
4755
4856impl Drop for RedisCluster {
4957 fn drop ( & mut self ) {
58+ let pids: Vec < String > = self
59+ . servers
60+ . iter ( )
61+ . map ( |server| format ! ( "{}" , server. pid) )
62+ . collect ( ) ;
63+ let pids = pids. join ( "," ) ;
5064 Self :: execute_cluster_script (
51- vec ! [ "stop" , "--cluster-folder" , & self . cluster_folder] ,
65+ vec ! [
66+ "stop" ,
67+ "--cluster-folder" ,
68+ & self . cluster_folder,
69+ "--pids" ,
70+ & pids,
71+ ] ,
5272 self . use_tls ,
5373 self . password . clone ( ) ,
5474 ) ;
@@ -119,44 +139,46 @@ impl RedisCluster {
119139 script_args. push ( & replicas_num) ;
120140 }
121141 let ( stdout, stderr) = Self :: execute_cluster_script ( script_args, use_tls, None ) ;
122- let ( cluster_folder, addresses ) = Self :: parse_start_script_output ( & stdout, & stderr) ;
142+ let ( cluster_folder, servers ) = Self :: parse_start_script_output ( & stdout, & stderr) ;
123143 let mut password: Option < String > = None ;
124144 if let Some ( info) = conn_info {
125145 password. clone_from ( & info. password ) ;
126146 } ;
127147 RedisCluster {
128148 cluster_folder,
129- addresses,
130149 use_tls,
131150 password,
151+ servers,
132152 }
133153 }
134154
135- fn parse_start_script_output ( output : & str , errors : & str ) -> ( String , Vec < NodeAddress > ) {
136- let cluster_folder = output. split ( "CLUSTER_FOLDER=" ) . collect :: < Vec < & str > > ( ) ;
137- assert ! (
138- !cluster_folder. is_empty( ) && cluster_folder. len( ) >= 2 ,
139- "Received output: {output}, stderr: {errors}"
140- ) ;
141- let cluster_folder = cluster_folder. get ( 1 ) . unwrap ( ) . lines ( ) ;
142- let cluster_folder = cluster_folder. collect :: < Vec < & str > > ( ) ;
143- let cluster_folder = cluster_folder. first ( ) . unwrap ( ) . to_string ( ) ;
144-
145- let output_parts = output. split ( "CLUSTER_NODES=" ) . collect :: < Vec < & str > > ( ) ;
146- assert ! (
147- !output_parts. is_empty( ) && output_parts. len( ) >= 2 ,
148- "Received output: {output}, stderr: {errors}"
149- ) ;
150- let nodes = output_parts. get ( 1 ) . unwrap ( ) . split ( ',' ) ;
151- let mut address_vec: Vec < NodeAddress > = Vec :: new ( ) ;
152- for node in nodes {
153- let node_parts = node. split ( ':' ) . collect :: < Vec < & str > > ( ) ;
154- let mut address_info = NodeAddress :: new ( ) ;
155- address_info. host = node_parts. first ( ) . unwrap ( ) . to_string ( ) . into ( ) ;
156- address_info. port = node_parts. get ( 1 ) . unwrap ( ) . parse :: < u32 > ( ) . unwrap ( ) ;
157- address_vec. push ( address_info) ;
155+ fn value_after_prefix ( prefix : & str , line : & str ) -> Option < String > {
156+ if !line. starts_with ( prefix) {
157+ return None ;
158158 }
159- ( cluster_folder, address_vec)
159+ Some ( line[ prefix. len ( ) ..] . to_string ( ) )
160+ }
161+
162+ fn parse_start_script_output ( output : & str , _errors : & str ) -> ( String , Vec < ValkeyServerInfo > ) {
163+ let prefixes = vec ! [ "CLUSTER_FOLDER" , "SERVERS_JSON" ] ;
164+ let mut values = std:: collections:: HashMap :: < String , String > :: new ( ) ;
165+ let lines: Vec < & str > = output. split ( '\n' ) . map ( |line| line. trim ( ) ) . collect ( ) ;
166+ for line in lines {
167+ for prefix in & prefixes {
168+ let prefix_with_shave = format ! ( "{prefix}=" ) ;
169+ if line. starts_with ( & prefix_with_shave) {
170+ values. insert (
171+ prefix. to_string ( ) ,
172+ Self :: value_after_prefix ( & prefix_with_shave, line) . unwrap_or_default ( ) ,
173+ ) ;
174+ }
175+ }
176+ }
177+
178+ let cluster_folder = values. get ( "CLUSTER_FOLDER" ) . unwrap ( ) ;
179+ let cluster_nodes_json = values. get ( "SERVERS_JSON" ) . unwrap ( ) ;
180+ let servers: Vec < ValkeyServerInfo > = serde_json:: from_str ( cluster_nodes_json) . unwrap ( ) ;
181+ ( cluster_folder. clone ( ) , servers)
160182 }
161183
162184 fn execute_cluster_script (
@@ -180,6 +202,7 @@ impl RedisCluster {
180202 } ,
181203 args. join( " " )
182204 ) ;
205+
183206 let output = if cfg ! ( target_os = "windows" ) {
184207 Command :: new ( "cmd" )
185208 . args ( [ "/C" , & cmd] )
@@ -204,11 +227,9 @@ impl RedisCluster {
204227 }
205228
206229 pub fn get_server_addresses ( & self ) -> Vec < ConnectionAddr > {
207- self . addresses
230+ self . servers
208231 . iter ( )
209- . map ( |address| {
210- ClusterType :: build_addr ( self . use_tls , & address. host , address. port as u16 )
211- } )
232+ . map ( |server| ClusterType :: build_addr ( self . use_tls , & server. host , server. port as u16 ) )
212233 . collect ( )
213234 }
214235}
@@ -284,3 +305,34 @@ pub async fn setup_test_basics(use_tls: bool) -> ClusterTestBasics {
284305 } )
285306 . await
286307}
308+
309+ #[ cfg( test) ]
310+ mod tests {
311+ use super :: * ;
312+
313+ #[ test]
314+ fn test_parse_start_script_output ( ) {
315+ let script_output = r#"
316+ INFO:root:## Executing cluster_manager.py with the following args:
317+ Namespace(host='127.0.0.1', tls=False, auth=None, log='info', logfile=None, action='start', cluster_mode=True, folder_path='/Users/user/glide-for-redis/utils/clusters', ports=None, shard_count=3, replica_count=2, prefix='redis-cluster', load_module=None)
318+ INFO:root:2024-11-05 16:05:44.024796+00:00 Starting script for cluster /Users/user/glide-for-redis/utils/clusters/redis-cluster-2024-11-05T16-05-44Z-2bz4YS
319+ LOG_FILE=/Users/user/glide-for-redis/utils/clusters/redis-cluster-2024-11-05T16-05-44Z-2bz4YS/cluster_manager.log
320+ SERVERS_JSON=[{"host": "127.0.0.1", "port": 39163, "pid": 59428, "is_primary": true}, {"host": "127.0.0.1", "port": 23178, "pid": 59436, "is_primary": true}, {"host": "127.0.0.1", "port": 25186, "pid": 59453, "is_primary": true}, {"host": "127.0.0.1", "port": 52500, "pid": 59432, "is_primary": false}, {"host": "127.0.0.1", "port": 48252, "pid": 59461, "is_primary": false}, {"host": "127.0.0.1", "port": 19544, "pid": 59444, "is_primary": false}, {"host": "127.0.0.1", "port": 37455, "pid": 59440, "is_primary": false}, {"host": "127.0.0.1", "port": 9282, "pid": 59449, "is_primary": false}, {"host": "127.0.0.1", "port": 19843, "pid": 59457, "is_primary": false}]
321+ INFO:root:Created Cluster Redis in 24.8926 seconds
322+ CLUSTER_FOLDER=/Users/user/glide-for-redis/utils/clusters/redis-cluster-2024-11-05T16-05-44Z-2bz4YS
323+ CLUSTER_NODES=127.0.0.1:39163,127.0.0.1:23178,127.0.0.1:25186,127.0.0.1:52500,127.0.0.1:48252,127.0.0.1:19544,127.0.0.1:37455,127.0.0.1:9282,127.0.0.1:19843
324+ "# ;
325+ let ( folder, servers) = RedisCluster :: parse_start_script_output ( script_output, "" ) ;
326+ assert_eq ! ( servers. len( ) , 9 ) ;
327+ assert_eq ! (
328+ folder,
329+ "/Users/user/glide-for-redis/utils/clusters/redis-cluster-2024-11-05T16-05-44Z-2bz4YS"
330+ ) ;
331+
332+ let server_0 = servers. first ( ) . unwrap ( ) ;
333+ assert_eq ! ( server_0. pid, 59428 ) ;
334+ assert_eq ! ( server_0. port, 39163 ) ;
335+ assert_eq ! ( server_0. host, "127.0.0.1" ) ;
336+ assert ! ( server_0. is_primary) ;
337+ }
338+ }
0 commit comments