@@ -125,21 +125,26 @@ public static ProcessState rabbitmqctlIgnoreError(String command) {
125125
126126 public static ProcessState killConnection (String connectionName ) {
127127 List <ConnectionInfo > cs = listConnections ();
128- if (cs .stream ().filter (c -> connectionName .equals (c .clientProvidedName ())).count () != 1 ) {
129- throw new IllegalArgumentException (
130- format (
131- "Could not find 1 connection '%s' in stream connections: %s" ,
132- connectionName ,
133- cs .stream ()
134- .map (ConnectionInfo ::clientProvidedName )
135- .collect (Collectors .joining (", " ))));
136- }
137- return rabbitmqctl ("eval 'rabbit_stream:kill_connection(\" " + connectionName + "\" ).'" );
128+ String pid =
129+ cs .stream ()
130+ .filter (c -> connectionName .equals (c .clientProvidedName ()))
131+ .findFirst ()
132+ .orElseThrow (
133+ () ->
134+ new IllegalArgumentException (
135+ format (
136+ "Could not find 1 connection '%s' in stream connections: %s" ,
137+ connectionName ,
138+ cs .stream ()
139+ .map (ConnectionInfo ::clientProvidedName )
140+ .collect (Collectors .joining (", " )))))
141+ .pid ;
142+ return rabbitmqctl (String .format ("eval 'exit(rabbit_misc:string_to_pid(\" %s\" ), kill).'" , pid ));
138143 }
139144
140145 public static List <ConnectionInfo > listConnections () {
141146 ProcessState process =
142- rabbitmqctl ("list_stream_connections -q --formatter table conn_name,client_properties" );
147+ rabbitmqctl ("list_stream_connections -q --formatter table conn_name,pid, client_properties" );
143148 List <ConnectionInfo > connectionInfoList = Collections .emptyList ();
144149 String content = process .output ();
145150 String [] lines = content .split (System .lineSeparator ());
@@ -149,11 +154,12 @@ public static List<ConnectionInfo> listConnections() {
149154 String line = lines [i ];
150155 String [] fields = line .split ("\t " );
151156 String connectionName = fields [0 ];
157+ String pid = fields [1 ];
152158 Map <String , String > clientProperties = Collections .emptyMap ();
153- if (fields .length > 1 && fields [1 ].length () > 1 ) {
159+ if (fields .length > 2 && fields [2 ].length () > 1 ) {
154160 clientProperties = buildClientProperties (fields );
155161 }
156- connectionInfoList .add (new ConnectionInfo (connectionName , clientProperties ));
162+ connectionInfoList .add (new ConnectionInfo (connectionName , pid , clientProperties ));
157163 }
158164 }
159165 return connectionInfoList ;
@@ -180,7 +186,7 @@ public static List<ConnectionInfo> listProducerConnections() {
180186 }
181187
182188 private static Map <String , String > buildClientProperties (String [] fields ) {
183- String clientPropertiesString = fields [1 ];
189+ String clientPropertiesString = fields [2 ];
184190 clientPropertiesString =
185191 clientPropertiesString
186192 .replace ("[" , "" )
@@ -446,11 +452,12 @@ public void close() throws Exception {
446452
447453 public static class ConnectionInfo {
448454
449- private final String name ;
455+ private final String name , pid ;
450456 private final Map <String , String > clientProperties ;
451457
452- public ConnectionInfo (String name , Map <String , String > clientProperties ) {
458+ public ConnectionInfo (String name , String pid , Map <String , String > clientProperties ) {
453459 this .name = name ;
460+ this .pid = pid ;
454461 this .clientProperties = clientProperties ;
455462 }
456463
0 commit comments