Skip to content

Commit 9b4c20d

Browse files
committed
Suggest load balancer setting in case of connection timeout
It may not always be an UnknownHostException, some environments just time out if the host is not reachable.
1 parent bb99a3a commit 9b4c20d

File tree

4 files changed

+31
-7
lines changed

4 files changed

+31
-7
lines changed

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.rabbitmq.stream.*;
1919
import com.rabbitmq.stream.impl.Client.ClientParameters;
20+
import io.netty.channel.ConnectTimeoutException;
2021
import java.net.UnknownHostException;
2122
import java.security.cert.X509Certificate;
2223
import java.time.Duration;
@@ -157,7 +158,9 @@ static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
157158
context.key(), context1 -> new Client(context1.parameters()))
158159
.client(Utils.ClientFactoryContext.fromParameters(parametersCopy).key(context.key()));
159160
} catch (StreamException e) {
160-
if (e.getCause() != null && e.getCause() instanceof UnknownHostException) {
161+
if (e.getCause() != null
162+
&& (e.getCause() instanceof UnknownHostException
163+
|| e.getCause() instanceof ConnectTimeoutException)) {
161164
String message =
162165
e.getMessage()
163166
+ ". "

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import io.netty.buffer.ByteBuf;
4444
import io.netty.buffer.ByteBufAllocator;
4545
import io.netty.buffer.UnpooledByteBufAllocator;
46+
import io.netty.channel.ChannelOption;
47+
import io.netty.channel.ConnectTimeoutException;
4648
import java.io.ByteArrayOutputStream;
4749
import java.io.DataOutputStream;
4850
import java.net.UnknownHostException;
@@ -88,9 +90,16 @@ static boolean await(CountDownLatch latch, Duration timeout) {
8890

8991
@Test
9092
void connectionErrorShouldReturnStreamExceptionForStackTrace() {
91-
assertThatThrownBy(() -> cf.get((new ClientParameters().host(UUID.randomUUID().toString()))))
93+
assertThatThrownBy(
94+
() ->
95+
cf.get(
96+
new ClientParameters()
97+
.host(UUID.randomUUID().toString())
98+
.bootstrapCustomizer(
99+
b -> b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000))))
92100
.isInstanceOf(StreamException.class)
93-
.hasCauseInstanceOf(UnknownHostException.class);
101+
.cause()
102+
.isInstanceOfAny(ConnectTimeoutException.class, UnknownHostException.class);
94103
}
95104

96105
@Test

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
2828
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
2929
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
30+
import io.netty.channel.ChannelOption;
31+
import io.netty.channel.ConnectTimeoutException;
3032
import io.netty.channel.EventLoopGroup;
3133
import java.net.UnknownHostException;
3234
import java.time.Duration;
@@ -971,6 +973,9 @@ void creationShouldFailWithDetailsWhenUnknownHost() {
971973
environmentBuilder()
972974
.host(localhost.host())
973975
.port(localhost.port())
976+
.netty()
977+
.bootstrapCustomizer(b -> b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000))
978+
.environmentBuilder()
974979
.addressResolver(
975980
n ->
976981
connectionCount.getAndIncrement() == 0
@@ -982,11 +987,12 @@ void creationShouldFailWithDetailsWhenUnknownHost() {
982987
env.consumerBuilder().stream(stream)
983988
.messageHandler((context, message) -> {})
984989
.build())
985-
.hasCauseInstanceOf(UnknownHostException.class)
986990
.hasMessageContaining(
987991
"https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic")
988992
.hasMessageContaining(
989-
"https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer");
993+
"https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer")
994+
.cause()
995+
.isInstanceOfAny(ConnectTimeoutException.class, UnknownHostException.class);
990996
}
991997
}
992998
}

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.rabbitmq.stream.compression.Compression;
2727
import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo;
2828
import com.rabbitmq.stream.impl.StreamProducer.Status;
29+
import io.netty.channel.ChannelOption;
30+
import io.netty.channel.ConnectTimeoutException;
2931
import io.netty.channel.EventLoopGroup;
3032
import java.net.UnknownHostException;
3133
import java.nio.charset.StandardCharsets;
@@ -649,18 +651,22 @@ void creationShouldFailWithDetailsWhenUnknownHost() {
649651
environmentBuilder()
650652
.host(localhost.host())
651653
.port(localhost.port())
654+
.netty()
655+
.bootstrapCustomizer(b -> b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000))
656+
.environmentBuilder()
652657
.addressResolver(
653658
n ->
654659
connectionCount.getAndIncrement() == 0
655660
? n
656661
: new Address(UUID.randomUUID().toString(), Client.DEFAULT_PORT));
657662
try (Environment env = builder.build()) {
658663
assertThatThrownBy(() -> env.producerBuilder().stream(stream).build())
659-
.hasCauseInstanceOf(UnknownHostException.class)
660664
.hasMessageContaining(
661665
"https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic")
662666
.hasMessageContaining(
663-
"https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer");
667+
"https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer")
668+
.cause()
669+
.isInstanceOfAny(ConnectTimeoutException.class, UnknownHostException.class);
664670
}
665671
}
666672
}

0 commit comments

Comments
 (0)