@@ -171,18 +171,8 @@ public String getPassword() {
171171 @ Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts () throws IOException , InterruptedException {
172172 final List <String > events = new CopyOnWriteArrayList <String >();
173173 final CountDownLatch latch = new CountDownLatch (3 ); // one when started, another when complete
174- connection .addShutdownListener (new ShutdownListener () {
175- @ Override
176- public void shutdownCompleted (ShutdownSignalException cause ) {
177- events .add ("shutdown hook 1" );
178- }
179- });
180- connection .addShutdownListener (new ShutdownListener () {
181- @ Override
182- public void shutdownCompleted (ShutdownSignalException cause ) {
183- events .add ("shutdown hook 2" );
184- }
185- });
174+ connection .addShutdownListener (cause -> events .add ("shutdown hook 1" ));
175+ connection .addShutdownListener (cause -> events .add ("shutdown hook 2" ));
186176 // note: we do not want to expose RecoveryCanBeginListener so this
187177 // test does not use it
188178 final CountDownLatch recoveryCanBeginLatch = new CountDownLatch (1 );
@@ -220,12 +210,7 @@ public void handleTopologyRecoveryStarted(Recoverable recoverable) {
220210
221211 @ Test public void shutdownHooksRecoveryOnConnection () throws IOException , InterruptedException {
222212 final CountDownLatch latch = new CountDownLatch (2 );
223- connection .addShutdownListener (new ShutdownListener () {
224- @ Override
225- public void shutdownCompleted (ShutdownSignalException cause ) {
226- latch .countDown ();
227- }
228- });
213+ connection .addShutdownListener (cause -> latch .countDown ());
229214 assertThat (connection .isOpen ()).isTrue ();
230215 closeAndWaitForRecovery ();
231216 assertThat (connection .isOpen ()).isTrue ();
@@ -235,12 +220,7 @@ public void shutdownCompleted(ShutdownSignalException cause) {
235220
236221 @ Test public void shutdownHooksRecoveryOnChannel () throws IOException , InterruptedException {
237222 final CountDownLatch latch = new CountDownLatch (3 );
238- channel .addShutdownListener (new ShutdownListener () {
239- @ Override
240- public void shutdownCompleted (ShutdownSignalException cause ) {
241- latch .countDown ();
242- }
243- });
223+ channel .addShutdownListener (cause -> latch .countDown ());
244224 assertThat (connection .isOpen ()).isTrue ();
245225 closeAndWaitForRecovery ();
246226 assertThat (connection .isOpen ()).isTrue ();
@@ -254,12 +234,12 @@ public void shutdownCompleted(ShutdownSignalException cause) {
254234 final CountDownLatch latch = new CountDownLatch (2 );
255235 connection .addBlockedListener (new BlockedListener () {
256236 @ Override
257- public void handleBlocked (String reason ) throws IOException {
237+ public void handleBlocked (String reason ) {
258238 latch .countDown ();
259239 }
260240
261241 @ Override
262- public void handleUnblocked () throws IOException {
242+ public void handleUnblocked () {
263243 latch .countDown ();
264244 }
265245 });
@@ -299,14 +279,8 @@ public void handleUnblocked() throws IOException {
299279
300280 @ Test public void returnListenerRecovery () throws IOException , InterruptedException {
301281 final CountDownLatch latch = new CountDownLatch (1 );
302- channel .addReturnListener (new ReturnListener () {
303- @ Override
304- public void handleReturn (int replyCode , String replyText , String exchange ,
305- String routingKey , AMQP .BasicProperties properties ,
306- byte [] body ) throws IOException {
307- latch .countDown ();
308- }
309- });
282+ channel .addReturnListener (
283+ (replyCode , replyText , exchange , routingKey , properties , body ) -> latch .countDown ());
310284 closeAndWaitForRecovery ();
311285 expectChannelRecovery (channel );
312286 channel .basicPublish ("" , "unknown" , true , false , null , "mandatory1" .getBytes ());
@@ -317,12 +291,12 @@ public void handleReturn(int replyCode, String replyText, String exchange,
317291 final CountDownLatch latch = new CountDownLatch (1 );
318292 channel .addConfirmListener (new ConfirmListener () {
319293 @ Override
320- public void handleAck (long deliveryTag , boolean multiple ) throws IOException {
294+ public void handleAck (long deliveryTag , boolean multiple ) {
321295 latch .countDown ();
322296 }
323297
324298 @ Override
325- public void handleNack (long deliveryTag , boolean multiple ) throws IOException {
299+ public void handleNack (long deliveryTag , boolean multiple ) {
326300 latch .countDown ();
327301 }
328302 });
@@ -425,13 +399,10 @@ private void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws I
425399 final AtomicReference <String > nameBefore = new AtomicReference <String >(q );
426400 final AtomicReference <String > nameAfter = new AtomicReference <String >();
427401 final CountDownLatch listenerLatch = new CountDownLatch (1 );
428- ((AutorecoveringConnection )connection ).addQueueRecoveryListener (new QueueRecoveryListener () {
429- @ Override
430- public void queueRecovered (String oldName , String newName ) {
431- nameBefore .set (oldName );
432- nameAfter .set (newName );
433- listenerLatch .countDown ();
434- }
402+ ((AutorecoveringConnection )connection ).addQueueRecoveryListener ((oldName , newName ) -> {
403+ nameBefore .set (oldName );
404+ nameAfter .set (newName );
405+ listenerLatch .countDown ();
435406 });
436407 ch .queueBind (nameBefore .get (), x , "" );
437408 restartPrimaryAndWaitForRecovery ();
@@ -673,14 +644,12 @@ public void queueRecovered(String oldName, String newName) {
673644 final AtomicReference <String > tagA = new AtomicReference <String >();
674645 final AtomicReference <String > tagB = new AtomicReference <String >();
675646 final CountDownLatch listenerLatch = new CountDownLatch (n );
676- ((AutorecoveringConnection )connection ).addConsumerRecoveryListener (new ConsumerRecoveryListener () {
677- @ Override
678- public void consumerRecovered (String oldConsumerTag , String newConsumerTag ) {
647+ ((AutorecoveringConnection )connection ).addConsumerRecoveryListener (
648+ (oldConsumerTag , newConsumerTag ) -> {
679649 tagA .set (oldConsumerTag );
680650 tagB .set (newConsumerTag );
681651 listenerLatch .countDown ();
682- }
683- });
652+ });
684653
685654 assertConsumerCount (n , q );
686655 closeAndWaitForRecovery ();
@@ -830,7 +799,8 @@ public void handleDelivery(String consumerTag,
830799
831800 @ Test public void recoveryWithMultipleThreads () throws Exception {
832801 // test with 8 recovery threads
833- final ThreadPoolExecutor executor = new ThreadPoolExecutor (8 , 8 , 30 , TimeUnit .SECONDS , new LinkedBlockingQueue <Runnable >());
802+ final ThreadPoolExecutor executor = new ThreadPoolExecutor (8 , 8 , 30 , TimeUnit .SECONDS ,
803+ new LinkedBlockingQueue <>());
834804 executor .allowCoreThreadTimeOut (true );
835805 ConnectionFactory connectionFactory = buildConnectionFactoryWithRecoveryEnabled (false );
836806 assertThat (connectionFactory .getTopologyRecoveryExecutor ()).isNull ();
@@ -956,12 +926,7 @@ private static void expectExchangeRecovery(Channel ch, String x) throws IOExcept
956926
957927 private static CountDownLatch prepareForShutdown (Connection conn ) {
958928 final CountDownLatch latch = new CountDownLatch (1 );
959- conn .addShutdownListener (new ShutdownListener () {
960- @ Override
961- public void shutdownCompleted (ShutdownSignalException cause ) {
962- latch .countDown ();
963- }
964- });
929+ conn .addShutdownListener (cause -> latch .countDown ());
965930 return latch ;
966931 }
967932
0 commit comments