88import io .scalecube .services .transport .api .ServiceMessageCodec ;
99import java .lang .reflect .Type ;
1010import org .reactivestreams .Publisher ;
11+ import org .slf4j .Logger ;
12+ import org .slf4j .LoggerFactory ;
1113import reactor .core .publisher .Flux ;
1214import reactor .core .publisher .Mono ;
15+ import reactor .netty .channel .AbortedException ;
1316
1417public class RSocketClientChannel implements ClientChannel {
1518
19+ private static final Logger LOGGER = LoggerFactory .getLogger (RSocketClientChannel .class );
20+
1621 private final Mono <RSocket > rsocket ;
1722 private final ServiceMessageCodec messageCodec ;
1823
@@ -26,15 +31,17 @@ public Mono<ServiceMessage> requestResponse(ServiceMessage message, Type respons
2631 return rsocket
2732 .flatMap (rsocket -> rsocket .requestResponse (toPayload (message )))
2833 .map (this ::toMessage )
29- .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ));
34+ .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ))
35+ .doOnError (RSocketClientChannel ::handleConnectionReset );
3036 }
3137
3238 @ Override
3339 public Flux <ServiceMessage > requestStream (ServiceMessage message , Type responseType ) {
3440 return rsocket
3541 .flatMapMany (rsocket -> rsocket .requestStream (toPayload (message )))
3642 .map (this ::toMessage )
37- .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ));
43+ .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ))
44+ .doOnError (RSocketClientChannel ::handleConnectionReset );
3845 }
3946
4047 @ Override
@@ -43,7 +50,8 @@ public Flux<ServiceMessage> requestChannel(
4350 return rsocket
4451 .flatMapMany (rsocket -> rsocket .requestChannel (Flux .from (publisher ).map (this ::toPayload )))
4552 .map (this ::toMessage )
46- .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ));
53+ .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ))
54+ .doOnError (RSocketClientChannel ::handleConnectionReset );
4755 }
4856
4957 private Payload toPayload (ServiceMessage request ) {
@@ -57,4 +65,12 @@ private ServiceMessage toMessage(Payload payload) {
5765 payload .release ();
5866 }
5967 }
68+
69+ private static void handleConnectionReset (Throwable throwable ) {
70+ if (AbortedException .isConnectionReset (throwable )) {
71+ if (LOGGER .isDebugEnabled ()) {
72+ LOGGER .debug ("[requestResponse] Connection has been reset" );
73+ }
74+ }
75+ }
6076}
0 commit comments