2929import org .slf4j .Logger ;
3030import org .slf4j .LoggerFactory ;
3131
32- public class Host {
32+ public class Cli {
3333
34- private static final Logger LOGGER = LoggerFactory .getLogger (Host .class );
34+ private static final Logger LOGGER = LoggerFactory .getLogger (Cli .class );
3535
3636 private static final String DOCKER_PREFIX = "DOCKER:" ;
3737
3838 private static final Gson GSON = new Gson ();
3939
40- public static String capture (InputStream is ) throws IOException {
41- BufferedReader br = new BufferedReader (new InputStreamReader (is ));
42- String line ;
43- StringBuilder buff = new StringBuilder ();
44- while ((line = br .readLine ()) != null ) {
45- buff .append (line ).append ("\n " );
46- }
47- return buff .toString ();
48- }
40+ private static final Map <String , String > DOCKER_NODES_TO_CONTAINERS =
41+ Map .of (
42+ "rabbit@node0" , "rabbitmq0" ,
43+ "rabbit@node1" , "rabbitmq1" ,
44+ "rabbit@node2" , "rabbitmq2" );
4945
50- private static Process executeCommand (String command ) {
46+ private static ProcessState executeCommand (String command ) {
5147 return executeCommand (command , false );
5248 }
5349
54- private static Process executeCommand (String command , boolean ignoreError ) {
55- try {
56- Process pr = executeCommandProcess (command );
57-
58- int ev = waitForExitValue (pr );
59- if (ev != 0 && !ignoreError ) {
60- String stdout = capture (pr .getInputStream ());
61- String stderr = capture (pr .getErrorStream ());
62- throw new IOException (
63- "unexpected command exit value: "
64- + ev
65- + "\n command: "
66- + command
67- + "\n "
68- + "\n stdout:\n "
69- + stdout
70- + "\n stderr:\n "
71- + stderr
72- + "\n " );
73- }
74- return pr ;
75- } catch (IOException e ) {
76- throw new RuntimeException (e );
50+ private static ProcessState executeCommand (String command , boolean ignoreError ) {
51+ Process pr = executeCommandProcess (command );
52+ InputStreamPumpState inputState = new InputStreamPumpState (pr .getInputStream ());
53+ InputStreamPumpState errorState = new InputStreamPumpState (pr .getErrorStream ());
54+
55+ int ev = waitForExitValue (pr , inputState , errorState );
56+ inputState .pump ();
57+ errorState .pump ();
58+ if (ev != 0 && !ignoreError ) {
59+ throw new RuntimeException (
60+ "unexpected command exit value: "
61+ + ev
62+ + "\n command: "
63+ + command
64+ + "\n "
65+ + "\n stdout:\n "
66+ + inputState .buffer .toString ()
67+ + "\n stderr:\n "
68+ + errorState .buffer .toString ()
69+ + "\n " );
7770 }
71+ return new ProcessState (inputState );
7872 }
7973
80- public static String hostname () throws IOException {
81- Process process = executeCommand ("hostname" );
82- return capture (process .getInputStream ()).trim ();
74+ public static String hostname () {
75+ return executeCommand ("hostname" ).output ();
8376 }
8477
85- private static int waitForExitValue (Process pr ) {
78+ private static int waitForExitValue (
79+ Process pr , InputStreamPumpState inputState , InputStreamPumpState errorState ) {
8680 while (true ) {
8781 try {
82+ inputState .pump ();
83+ errorState .pump ();
8884 pr .waitFor ();
8985 break ;
9086 } catch (InterruptedException ignored ) {
@@ -93,7 +89,7 @@ private static int waitForExitValue(Process pr) {
9389 return pr .exitValue ();
9490 }
9591
96- private static Process executeCommandProcess (String command ) throws IOException {
92+ private static Process executeCommandProcess (String command ) {
9793 String [] finalCommand ;
9894 if (System .getProperty ("os.name" ).toLowerCase ().contains ("windows" )) {
9995 finalCommand = new String [4 ];
@@ -107,70 +103,59 @@ private static Process executeCommandProcess(String command) throws IOException
107103 finalCommand [1 ] = "-c" ;
108104 finalCommand [2 ] = command ;
109105 }
110- return Runtime .getRuntime ().exec (finalCommand );
106+ try {
107+ return Runtime .getRuntime ().exec (finalCommand );
108+ } catch (IOException e ) {
109+ throw new RuntimeException (e );
110+ }
111111 }
112112
113- public static Process rabbitmqctl (String command ) throws IOException {
113+ public static ProcessState rabbitmqctl (String command ) {
114114 return executeCommand (rabbitmqctlCommand () + " " + command );
115115 }
116116
117- static Process rabbitmqStreams (String command ) {
117+ static ProcessState rabbitmqStreams (String command ) {
118118 return executeCommand (rabbitmqStreamsCommand () + " " + command );
119119 }
120120
121- public static Process rabbitmqctlIgnoreError (String command ) {
121+ public static ProcessState rabbitmqctlIgnoreError (String command ) {
122122 return executeCommand (rabbitmqctlCommand () + " " + command , true );
123123 }
124124
125- public static Process killConnection (String connectionName ) {
126- try {
127- List <ConnectionInfo > cs = listConnections ();
128- if (cs .stream ().filter (c -> connectionName .equals (c .clientProvidedName ())).count () != 1 ) {
129- throw new IllegalArgumentException (
130- format (
131- "Could not find 1 connection '%s' in stream connections: %s" ,
132- connectionName ,
133- cs .stream ()
134- .map (ConnectionInfo ::clientProvidedName )
135- .collect (Collectors .joining (", " ))));
136- }
137- return rabbitmqctl ("eval 'rabbit_stream:kill_connection(\" " + connectionName + "\" ).'" );
138- } catch (IOException e ) {
139- throw new RuntimeException (e );
125+ public static ProcessState killConnection (String connectionName ) {
126+ List <ConnectionInfo > cs = listConnections ();
127+ if (cs .stream ().filter (c -> connectionName .equals (c .clientProvidedName ())).count () != 1 ) {
128+ throw new IllegalArgumentException (
129+ format (
130+ "Could not find 1 connection '%s' in stream connections: %s" ,
131+ connectionName ,
132+ cs .stream ()
133+ .map (ConnectionInfo ::clientProvidedName )
134+ .collect (Collectors .joining (", " ))));
140135 }
136+ return rabbitmqctl ("eval 'rabbit_stream:kill_connection(\" " + connectionName + "\" ).'" );
141137 }
142138
143139 public static List <ConnectionInfo > listConnections () {
144- try {
145- Process process =
146- rabbitmqctl ("list_stream_connections -q --formatter table conn_name,client_properties" );
147- List <ConnectionInfo > connectionInfoList = Collections .emptyList ();
148- if (process .exitValue () != 0 ) {
149- LOGGER .warn (
150- "Error while trying to list stream connections. Standard output: {}, error output: {}" ,
151- capture (process .getInputStream ()),
152- capture (process .getErrorStream ()));
153- return connectionInfoList ;
154- }
155- String content = capture (process .getInputStream ());
156- String [] lines = content .split (System .getProperty ("line.separator" ));
157- if (lines .length > 1 ) {
158- connectionInfoList = new ArrayList <>(lines .length - 1 );
159- for (int i = 1 ; i < lines .length ; i ++) {
160- String line = lines [i ];
161- String [] fields = line .split ("\t " );
162- String connectionName = fields [0 ];
163- Map <String , String > clientProperties = Collections .emptyMap ();
164- if (fields .length > 1 && fields [1 ].length () > 1 ) {
165- clientProperties = buildClientProperties (fields );
166- }
167- connectionInfoList .add (new ConnectionInfo (connectionName , clientProperties ));
140+ ProcessState process =
141+ rabbitmqctl ("list_stream_connections -q --formatter table conn_name,client_properties" );
142+ List <ConnectionInfo > connectionInfoList = Collections .emptyList ();
143+ String content = process .output ();
144+ String [] lines = content .split (System .lineSeparator ());
145+ if (lines .length > 1 ) {
146+ connectionInfoList = new ArrayList <>(lines .length - 1 );
147+ for (int i = 1 ; i < lines .length ; i ++) {
148+ String line = lines [i ];
149+ String [] fields = line .split ("\t " );
150+ String connectionName = fields [0 ];
151+ Map <String , String > clientProperties = Collections .emptyMap ();
152+ if (fields .length > 1 && fields [1 ].length () > 1 ) {
153+ clientProperties = buildClientProperties (fields );
168154 }
155+ connectionInfoList .add (new ConnectionInfo (connectionName , clientProperties ));
169156 }
170- return connectionInfoList ;
171- } catch (IOException e ) {
172- throw new RuntimeException (e );
173157 }
158+ return connectionInfoList ;
174159 }
175160
176161 private static Map <String , String > buildClientProperties (String [] fields ) {
@@ -201,32 +186,26 @@ public static void restartStream(String stream) {
201186 rabbitmqStreams (" restart_stream " + stream );
202187 }
203188
204- public static Process killStreamLeaderProcess (String stream ) {
205- try {
206- return rabbitmqctl (
207- "eval 'case rabbit_stream_manager:lookup_leader(<<\" /\" >>, <<\" "
208- + stream
209- + "\" >>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'" );
210- } catch (IOException e ) {
211- throw new RuntimeException (e );
212- }
189+ public static void killStreamLeaderProcess (String stream ) {
190+ rabbitmqctl (
191+ "eval 'case rabbit_stream_manager:lookup_leader(<<\" /\" >>, <<\" "
192+ + stream
193+ + "\" >>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'" );
213194 }
214195
215- public static void addUser (String username , String password ) throws IOException {
196+ public static void addUser (String username , String password ) {
216197 rabbitmqctl (format ("add_user %s %s" , username , password ));
217198 }
218199
219- public static void setPermissions (String username , List <String > permissions ) throws IOException {
200+ public static void setPermissions (String username , List <String > permissions ) {
220201 setPermissions (username , "/" , permissions );
221202 }
222203
223- public static void setPermissions (String username , String vhost , String permission )
224- throws IOException {
204+ public static void setPermissions (String username , String vhost , String permission ) {
225205 setPermissions (username , vhost , asList (permission , permission , permission ));
226206 }
227207
228- public static void setPermissions (String username , String vhost , List <String > permissions )
229- throws IOException {
208+ public static void setPermissions (String username , String vhost , List <String > permissions ) {
230209 if (permissions .size () != 3 ) {
231210 throw new IllegalArgumentException ();
232211 }
@@ -236,23 +215,23 @@ public static void setPermissions(String username, String vhost, List<String> pe
236215 vhost , username , permissions .get (0 ), permissions .get (1 ), permissions .get (2 )));
237216 }
238217
239- public static void changePassword (String username , String newPassword ) throws IOException {
218+ public static void changePassword (String username , String newPassword ) {
240219 rabbitmqctl (format ("change_password %s %s" , username , newPassword ));
241220 }
242221
243- public static void deleteUser (String username ) throws IOException {
222+ public static void deleteUser (String username ) {
244223 rabbitmqctl (format ("delete_user %s" , username ));
245224 }
246225
247- public static void addVhost (String vhost ) throws IOException {
226+ public static void addVhost (String vhost ) {
248227 rabbitmqctl ("add_vhost " + vhost );
249228 }
250229
251- public static void deleteVhost (String vhost ) throws Exception {
230+ public static void deleteVhost (String vhost ) {
252231 rabbitmqctl ("delete_vhost " + vhost );
253232 }
254233
255- public static void setEnv (String parameter , String value ) throws IOException {
234+ public static void setEnv (String parameter , String value ) {
256235 rabbitmqctl (format ("eval 'application:set_env(rabbitmq_stream, %s, %s).'" , parameter , value ));
257236 }
258237
@@ -334,6 +313,68 @@ public static boolean isOnDocker() {
334313 return rabbitmqCtl .startsWith (DOCKER_PREFIX );
335314 }
336315
316+ public static List <String > nodes () {
317+ List <String > clusterNodes = new ArrayList <>();
318+ clusterNodes .add (rabbitmqctl ("eval 'node().'" ).output ().trim ());
319+ List <String > nodes =
320+ Arrays .stream (
321+ rabbitmqctl ("eval 'nodes().'" )
322+ .output ()
323+ .replace ("[" , "" )
324+ .replace ("]" , "" )
325+ .split ("," ))
326+ .map (String ::trim )
327+ .collect (Collectors .toList ());
328+ clusterNodes .addAll (nodes );
329+ return List .copyOf (clusterNodes );
330+ }
331+
332+ public static void restartNode (String node ) {
333+ String container = nodeToDockerContainer (node );
334+ String dockerCommand = "docker exec " + container + " " ;
335+ String rabbitmqUpgradeCommand = dockerCommand + "rabbitmq-upgrade " ;
336+ executeCommand (rabbitmqUpgradeCommand + "await_online_quorum_plus_one -t 300" );
337+ executeCommand (rabbitmqUpgradeCommand + "drain" );
338+ executeCommand ("docker stop " + container );
339+ executeCommand ("docker start " + container );
340+ String otherContainer =
341+ DOCKER_NODES_TO_CONTAINERS .values ().stream ()
342+ .filter (c -> !c .endsWith (container ))
343+ .findAny ()
344+ .get ();
345+ executeCommand (
346+ "docker exec "
347+ + otherContainer
348+ + " rabbitmqctl await_online_nodes "
349+ + DOCKER_NODES_TO_CONTAINERS .size ());
350+ executeCommand (dockerCommand + "rabbitmqctl status" );
351+ }
352+
353+ public static void rebalance () {
354+ rabbitmqQueues ("rebalance all" );
355+ }
356+
357+ static ProcessState rabbitmqQueues (String command ) {
358+ return executeCommand (rabbitmqQueuesCommand () + " " + command );
359+ }
360+
361+ private static String rabbitmqQueuesCommand () {
362+ String rabbitmqctl = rabbitmqctlCommand ();
363+ int lastIndex = rabbitmqctl .lastIndexOf ("rabbitmqctl" );
364+ if (lastIndex == -1 ) {
365+ throw new IllegalArgumentException ("Not a valid rabbitqmctl command: " + rabbitmqctl );
366+ }
367+ return rabbitmqctl .substring (0 , lastIndex ) + "rabbitmq-queues" ;
368+ }
369+
370+ private static String nodeToDockerContainer (String node ) {
371+ String containerId = DOCKER_NODES_TO_CONTAINERS .get (node );
372+ if (containerId == null ) {
373+ throw new IllegalArgumentException ("No container for node " + node );
374+ }
375+ return containerId ;
376+ }
377+
337378 private static final class CallableAutoCloseable implements AutoCloseable {
338379
339380 private final Callable <Void > end ;
@@ -378,4 +419,40 @@ public String toString() {
378419 + '}' ;
379420 }
380421 }
422+
423+ public static class ProcessState {
424+
425+ private final InputStreamPumpState inputState ;
426+
427+ ProcessState (InputStreamPumpState inputState ) {
428+ this .inputState = inputState ;
429+ }
430+
431+ public String output () {
432+ return inputState .buffer .toString ();
433+ }
434+ }
435+
436+ private static class InputStreamPumpState {
437+
438+ private final BufferedReader reader ;
439+ private final StringBuilder buffer ;
440+
441+ private InputStreamPumpState (InputStream in ) {
442+ this .reader = new BufferedReader (new InputStreamReader (in ));
443+ this .buffer = new StringBuilder ();
444+ }
445+
446+ void pump () {
447+ String line ;
448+ while (true ) {
449+ try {
450+ if ((line = reader .readLine ()) == null ) break ;
451+ } catch (IOException e ) {
452+ throw new RuntimeException (e );
453+ }
454+ buffer .append (line ).append ("\n " );
455+ }
456+ }
457+ }
381458}
0 commit comments