Skip to content

Commit dc51a8c

Browse files
garyrussellartembilan
authored andcommitted
GH-3526: Fix Infinite Loop in FailoverCConnFactory
Resolves #3526 `FailoverClientConnectionFactory` The logic to detect we had iterated over all factories and including the one from which the previous connection was established was incorrect, causing an infite loop until one of the factory connections was successful. Change the logic to detect we have reset the iterator and the current failure is from the same factory as the one from which the previous connection was established. **cherry-pick to 5.4.x, 5.3.x** * Add diagnostics. * Fix race in test. * More race fixes and diagnostics. * Remove diagnostics.
1 parent a7843af commit dc51a8c

File tree

3 files changed

+66
-17
lines changed

3 files changed

+66
-17
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -264,7 +264,7 @@ private synchronized void findAConnection() throws InterruptedException {
264264
if (!this.factoryIterator.hasNext()) {
265265
this.factoryIterator = this.connectionFactories.iterator();
266266
}
267-
boolean retried = false;
267+
boolean restartedList = false;
268268
while (!success) {
269269
try {
270270
nextFactory = this.factoryIterator.next();
@@ -282,17 +282,18 @@ private synchronized void findAConnection() throws InterruptedException {
282282
+ e.toString()
283283
+ ", trying another");
284284
}
285+
if (restartedList && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) {
286+
logger.debug("Failover failed to find a connection");
287+
/*
288+
* We've tried every factory including the
289+
* one the current connection was on.
290+
*/
291+
this.open = false;
292+
throw e;
293+
}
285294
if (!this.factoryIterator.hasNext()) {
286-
if (retried && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) {
287-
/*
288-
* We've tried every factory including the
289-
* one the current connection was on.
290-
*/
291-
this.open = false;
292-
throw e;
293-
}
294295
this.factoryIterator = this.connectionFactories.iterator();
295-
retried = true;
296+
restartedList = true;
296297
}
297298
}
298299
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,14 @@ public TcpListener getListener() {
316316
return this.testListener;
317317
}
318318
if (this.manualListenerRegistration && !this.testFailed) {
319-
if (this.logger.isDebugEnabled()) {
319+
boolean debugEnabled = this.logger.isDebugEnabled();
320+
if (debugEnabled) {
320321
this.logger.debug(getConnectionId() + " Waiting for listener registration");
321322
}
322323
waitForListenerRegistration();
324+
if (debugEnabled) {
325+
this.logger.debug(getConnectionId() + " Listener registered");
326+
}
323327
}
324328
return this.listener;
325329
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
import static org.mockito.Mockito.times;
2929
import static org.mockito.Mockito.when;
3030

31+
import java.io.BufferedReader;
3132
import java.io.IOException;
33+
import java.io.InputStreamReader;
3234
import java.io.UncheckedIOException;
35+
import java.net.ServerSocket;
3336
import java.net.Socket;
3437
import java.nio.channels.SocketChannel;
3538
import java.util.ArrayList;
@@ -41,7 +44,9 @@
4144
import java.util.concurrent.atomic.AtomicInteger;
4245
import java.util.concurrent.atomic.AtomicReference;
4346

44-
import org.junit.Test;
47+
import javax.net.ServerSocketFactory;
48+
49+
import org.junit.jupiter.api.Test;
4550
import org.mockito.InOrder;
4651
import org.mockito.Mockito;
4752

@@ -62,6 +67,7 @@
6267
import org.springframework.messaging.MessageChannel;
6368
import org.springframework.messaging.SubscribableChannel;
6469
import org.springframework.messaging.support.GenericMessage;
70+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
6571

6672
/**
6773
* @author Gary Russell
@@ -173,7 +179,7 @@ private void testRefreshShared(boolean closeOnRefresh, long interval) throws Exc
173179
inOrder.verifyNoMoreInteractions();
174180
}
175181

176-
@Test(expected = UncheckedIOException.class)
182+
@Test
177183
public void testFailoverAllDead() throws Exception {
178184
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
179185
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
@@ -193,10 +199,47 @@ public void testFailoverAllDead() throws Exception {
193199
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
194200
failoverFactory.start();
195201
GenericMessage<String> message = new GenericMessage<String>("foo");
196-
failoverFactory.getConnection().send(message);
202+
assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() ->
203+
failoverFactory.getConnection().send(message));
197204
Mockito.verify(conn2).send(message);
198205
}
199206

207+
@Test
208+
void failoverAllDeadAfterSuccess() throws Exception {
209+
ServerSocket ss1 = ServerSocketFactory.getDefault().createServerSocket(0);
210+
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
211+
exec.initialize();
212+
exec.submit(() -> {
213+
Socket accepted = ss1.accept();
214+
BufferedReader br = new BufferedReader(new InputStreamReader(accepted.getInputStream()));
215+
br.readLine();
216+
accepted.getOutputStream().write("ok\r\n".getBytes());
217+
accepted.close();
218+
ss1.close();
219+
return true;
220+
});
221+
TcpNetClientConnectionFactory cf1 = new TcpNetClientConnectionFactory("localhost", ss1.getLocalPort());
222+
AbstractClientConnectionFactory cf2 = mock(AbstractClientConnectionFactory.class);
223+
doThrow(new UncheckedIOException(new IOException("fail"))).when(cf2).getConnection();
224+
CountDownLatch latch = new CountDownLatch(2);
225+
cf1.setApplicationEventPublisher(event -> {
226+
if (event instanceof TcpConnectionCloseEvent) {
227+
latch.countDown();
228+
}
229+
});
230+
cf2.setApplicationEventPublisher(event -> { });
231+
FailoverClientConnectionFactory fccf = new FailoverClientConnectionFactory(List.of(cf1, cf2));
232+
fccf.registerListener(msf -> {
233+
latch.countDown();
234+
return false;
235+
});
236+
fccf.start();
237+
fccf.getConnection().send(new GenericMessage<>("test"));
238+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
239+
assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() ->
240+
fccf.getConnection().send(new GenericMessage<>("test")));
241+
}
242+
200243
@Test
201244
public void testFailoverAllDeadButOriginalOkAgain() throws Exception {
202245
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
@@ -228,7 +271,7 @@ public void testFailoverAllDeadButOriginalOkAgain() throws Exception {
228271
Mockito.verify(conn1, times(2)).send(message);
229272
}
230273

231-
@Test(expected = UncheckedIOException.class)
274+
@Test
232275
public void testFailoverConnectNone() throws Exception {
233276
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
234277
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
@@ -242,7 +285,8 @@ public void testFailoverConnectNone() throws Exception {
242285
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
243286
failoverFactory.start();
244287
GenericMessage<String> message = new GenericMessage<String>("foo");
245-
failoverFactory.getConnection().send(message);
288+
assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() ->
289+
failoverFactory.getConnection().send(message));
246290
}
247291

248292
@Test

0 commit comments

Comments
 (0)