44import io .netty .channel .ChannelOption ;
55import io .netty .handler .codec .http .HttpHeaderNames ;
66import io .netty .handler .codec .http .websocketx .PingWebSocketFrame ;
7+ import io .netty .resolver .DefaultAddressResolverGroup ;
78import io .scalecube .services .Address ;
89import io .scalecube .services .ServiceReference ;
910import io .scalecube .services .api .ServiceMessage ;
2627import reactor .netty .Connection ;
2728import reactor .netty .http .client .HttpClient ;
2829import reactor .netty .resources .ConnectionProvider ;
29- import reactor .netty .resources .LoopResources ;
3030
3131public final class WebsocketGatewayClientTransport implements ClientChannel , ClientTransport {
3232
@@ -39,10 +39,8 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
3939 private static final int CONNECT_TIMEOUT_MILLIS = (int ) Duration .ofSeconds (5 ).toMillis ();
4040
4141 private final GatewayClientCodec clientCodec ;
42- private final LoopResources loopResources ;
4342 private final Duration keepAliveInterval ;
4443 private final Function <HttpClient , HttpClient > operator ;
45- private final boolean ownsLoopResources ;
4644
4745 private final AtomicLong sidCounter = new AtomicLong ();
4846 private final AtomicReference <WebsocketGatewayClientSession > clientSessionReference =
@@ -52,11 +50,6 @@ private WebsocketGatewayClientTransport(Builder builder) {
5250 this .clientCodec = builder .clientCodec ;
5351 this .keepAliveInterval = builder .keepAliveInterval ;
5452 this .operator = builder .operator ;
55- this .loopResources =
56- builder .loopResources == null
57- ? LoopResources .create ("websocket-gateway-client" , 1 , true )
58- : builder .loopResources ;
59- this .ownsLoopResources = builder .loopResources == null ;
6053 }
6154
6255 public static Builder builder () {
@@ -74,7 +67,7 @@ public ClientChannel create(ServiceReference serviceReference) {
7467 final HttpClient httpClient =
7568 operator .apply (
7669 HttpClient .create (ConnectionProvider .newConnection ())
77- .runOn ( loopResources )
70+ .resolver ( DefaultAddressResolverGroup . INSTANCE )
7871 .option (ChannelOption .CONNECT_TIMEOUT_MILLIS , CONNECT_TIMEOUT_MILLIS )
7972 .option (ChannelOption .TCP_NODELAY , true )
8073 .headers (headers -> headers .set (HttpHeaderNames .CONTENT_TYPE , CONTENT_TYPE )));
@@ -196,15 +189,15 @@ private static Throwable getRootCause(Throwable throwable) {
196189
197190 @ Override
198191 public void close () {
199- if (ownsLoopResources ) {
200- loopResources .dispose ();
192+ final var session = clientSessionReference .get ();
193+ if (session != null ) {
194+ session .close ().doOnError (ex -> {}).subscribe ();
201195 }
202196 }
203197
204198 public static class Builder {
205199
206200 private GatewayClientCodec clientCodec = CLIENT_CODEC ;
207- private LoopResources loopResources ;
208201 private Duration keepAliveInterval = Duration .ZERO ;
209202 private Function <HttpClient , HttpClient > operator = client -> client ;
210203
@@ -215,11 +208,6 @@ public Builder clientCodec(GatewayClientCodec clientCodec) {
215208 return this ;
216209 }
217210
218- public Builder loopResources (LoopResources loopResources ) {
219- this .loopResources = loopResources ;
220- return this ;
221- }
222-
223211 public Builder httpClient (UnaryOperator <HttpClient > operator ) {
224212 this .operator = this .operator .andThen (operator );
225213 return this ;
0 commit comments