Skip to content

Commit 73ef688

Browse files
committed
Added back Hooks.onErrorDropped for AbortedException and ConnectionClosedException
1 parent 072fe55 commit 73ef688

File tree

2 files changed

+23
-4
lines changed

2 files changed

+23
-4
lines changed

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.netty.util.concurrent.Future;
99
import io.scalecube.services.auth.Authenticator;
1010
import io.scalecube.services.auth.CredentialsSupplier;
11+
import io.scalecube.services.exceptions.ConnectionClosedException;
1112
import io.scalecube.services.methods.ServiceMethodRegistry;
1213
import io.scalecube.services.transport.api.ClientTransport;
1314
import io.scalecube.services.transport.api.DataCodec;
@@ -18,13 +19,31 @@
1819
import java.util.StringJoiner;
1920
import java.util.concurrent.ThreadFactory;
2021
import java.util.function.Function;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2124
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Hooks;
2226
import reactor.core.publisher.Mono;
2327
import reactor.netty.FutureMono;
28+
import reactor.netty.channel.AbortedException;
2429
import reactor.netty.resources.LoopResources;
2530

2631
public class RSocketServiceTransport implements ServiceTransport {
2732

33+
public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class);
34+
35+
static {
36+
Hooks.onErrorDropped(
37+
t -> {
38+
if (AbortedException.isConnectionReset(t)
39+
|| ConnectionClosedException.isConnectionClosed(t)) {
40+
if (LOGGER.isDebugEnabled()) {
41+
LOGGER.debug("Connection aborted: {}", t.toString());
42+
}
43+
}
44+
});
45+
}
46+
2847
private int numOfWorkers = Runtime.getRuntime().availableProcessors();
2948

3049
private HeadersCodec headersCodec = HeadersCodec.DEFAULT_INSTANCE;

services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
import io.scalecube.services.api.ServiceMessage;
1111
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
1212
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
13+
import io.scalecube.services.exceptions.ConnectionClosedException;
1314
import io.scalecube.services.sut.QuoteService;
1415
import io.scalecube.services.sut.SimpleQuoteService;
15-
import java.nio.channels.ClosedChannelException;
1616
import java.time.Duration;
1717
import java.util.Optional;
1818
import java.util.concurrent.CountDownLatch;
@@ -94,7 +94,7 @@ public void test_remote_node_died_mono_never() throws Exception {
9494
TimeUnit.MILLISECONDS.sleep(100);
9595

9696
assertEquals(0, latch1.getCount());
97-
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
97+
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
9898
assertTrue(sub1.get().isDisposed());
9999
}
100100

@@ -122,7 +122,7 @@ public void test_remote_node_died_many_never() throws Exception {
122122
TimeUnit.MILLISECONDS.sleep(100);
123123

124124
assertEquals(0, latch1.getCount());
125-
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
125+
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
126126
assertTrue(sub1.get().isDisposed());
127127
}
128128

@@ -154,7 +154,7 @@ public void test_remote_node_died_many_then_never() throws Exception {
154154
TimeUnit.MILLISECONDS.sleep(100);
155155

156156
assertEquals(0, latch1.getCount());
157-
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
157+
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
158158
assertTrue(sub1.get().isDisposed());
159159
}
160160
}

0 commit comments

Comments
 (0)