44import io .rsocket .RSocket ;
55import io .rsocket .util .ByteBufPayload ;
66import io .scalecube .services .api .ServiceMessage ;
7- import io .scalecube .services .exceptions .ConnectionClosedException ;
87import io .scalecube .services .transport .api .ClientChannel ;
98import io .scalecube .services .transport .api .ServiceMessageCodec ;
109import java .lang .reflect .Type ;
11- import java .nio .channels .ClosedChannelException ;
1210import org .reactivestreams .Publisher ;
1311import reactor .core .publisher .Flux ;
1412import reactor .core .publisher .Mono ;
1513
1614public class RSocketClientChannel implements ClientChannel {
1715
18- private Mono <RSocket > rsocket ;
19- private ServiceMessageCodec messageCodec ;
16+ private final Mono <RSocket > rsocket ;
17+ private final ServiceMessageCodec messageCodec ;
2018
2119 public RSocketClientChannel (Mono <RSocket > rsocket , ServiceMessageCodec codec ) {
2220 this .rsocket = rsocket ;
@@ -26,27 +24,15 @@ public RSocketClientChannel(Mono<RSocket> rsocket, ServiceMessageCodec codec) {
2624 @ Override
2725 public Mono <ServiceMessage > requestResponse (ServiceMessage message , Type responseType ) {
2826 return rsocket
29- .flatMap (
30- rsocket ->
31- rsocket
32- .requestResponse (toPayload (message ))
33- .onErrorMap (
34- ClosedChannelException .class ,
35- e -> new ConnectionClosedException ("Connection closed" )))
27+ .flatMap (rsocket -> rsocket .requestResponse (toPayload (message )))
3628 .map (this ::toMessage )
3729 .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ));
3830 }
3931
4032 @ Override
4133 public Flux <ServiceMessage > requestStream (ServiceMessage message , Type responseType ) {
4234 return rsocket
43- .flatMapMany (
44- rsocket ->
45- rsocket
46- .requestStream (toPayload (message ))
47- .onErrorMap (
48- ClosedChannelException .class ,
49- e -> new ConnectionClosedException ("Connection closed" )))
35+ .flatMapMany (rsocket -> rsocket .requestStream (toPayload (message )))
5036 .map (this ::toMessage )
5137 .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ));
5238 }
@@ -55,13 +41,7 @@ public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseT
5541 public Flux <ServiceMessage > requestChannel (
5642 Publisher <ServiceMessage > publisher , Type responseType ) {
5743 return rsocket
58- .flatMapMany (
59- rsocket ->
60- rsocket
61- .requestChannel (Flux .from (publisher ).map (this ::toPayload ))
62- .onErrorMap (
63- ClosedChannelException .class ,
64- e -> new ConnectionClosedException ("Connection closed" )))
44+ .flatMapMany (rsocket -> rsocket .requestChannel (Flux .from (publisher ).map (this ::toPayload )))
6545 .map (this ::toMessage )
6646 .map (msg -> ServiceMessageCodec .decodeData (msg , responseType ));
6747 }
0 commit comments