44import io .netty .channel .ChannelOption ;
55import io .netty .handler .codec .http .HttpHeaderNames ;
66import io .netty .handler .codec .http .websocketx .PingWebSocketFrame ;
7+ import io .netty .resolver .DefaultAddressResolverGroup ;
78import io .scalecube .services .Address ;
89import io .scalecube .services .ServiceReference ;
910import io .scalecube .services .api .ServiceMessage ;
2627import reactor .netty .Connection ;
2728import reactor .netty .http .client .HttpClient ;
2829import reactor .netty .resources .ConnectionProvider ;
29- import reactor .netty .resources .LoopResources ;
3030
3131public final class WebsocketGatewayClientTransport implements ClientChannel , ClientTransport {
3232
@@ -39,10 +39,8 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
3939 private static final int CONNECT_TIMEOUT_MILLIS = (int ) Duration .ofSeconds (5 ).toMillis ();
4040
4141 private final GatewayClientCodec clientCodec ;
42- private final LoopResources loopResources ;
4342 private final Duration keepAliveInterval ;
4443 private final Function <HttpClient , HttpClient > operator ;
45- private final boolean ownsLoopResources ;
4644
4745 private final AtomicLong sidCounter = new AtomicLong ();
4846 private final AtomicReference <WebsocketGatewayClientSession > clientSessionReference =
@@ -52,11 +50,6 @@ private WebsocketGatewayClientTransport(Builder builder) {
5250 this .clientCodec = builder .clientCodec ;
5351 this .keepAliveInterval = builder .keepAliveInterval ;
5452 this .operator = builder .operator ;
55- this .loopResources =
56- builder .loopResources == null
57- ? LoopResources .create ("websocket-gateway-client" , 1 , true )
58- : builder .loopResources ;
59- this .ownsLoopResources = builder .loopResources == null ;
6053 }
6154
6255 @ Override
@@ -70,7 +63,7 @@ public ClientChannel create(ServiceReference serviceReference) {
7063 final HttpClient httpClient =
7164 operator .apply (
7265 HttpClient .create (ConnectionProvider .newConnection ())
73- .runOn ( loopResources )
66+ .resolver ( DefaultAddressResolverGroup . INSTANCE )
7467 .option (ChannelOption .CONNECT_TIMEOUT_MILLIS , CONNECT_TIMEOUT_MILLIS )
7568 .option (ChannelOption .TCP_NODELAY , true )
7669 .headers (headers -> headers .set (HttpHeaderNames .CONTENT_TYPE , CONTENT_TYPE )));
@@ -192,15 +185,15 @@ private static Throwable getRootCause(Throwable throwable) {
192185
193186 @ Override
194187 public void close () {
195- if (ownsLoopResources ) {
196- loopResources .dispose ();
188+ final var session = clientSessionReference .get ();
189+ if (session != null ) {
190+ session .close ().doOnError (ex -> {}).subscribe ();
197191 }
198192 }
199193
200194 public static class Builder {
201195
202196 private GatewayClientCodec clientCodec = CLIENT_CODEC ;
203- private LoopResources loopResources ;
204197 private Duration keepAliveInterval = Duration .ZERO ;
205198 private Function <HttpClient , HttpClient > operator = client -> client ;
206199
@@ -211,11 +204,6 @@ public Builder clientCodec(GatewayClientCodec clientCodec) {
211204 return this ;
212205 }
213206
214- public Builder loopResources (LoopResources loopResources ) {
215- this .loopResources = loopResources ;
216- return this ;
217- }
218-
219207 public Builder httpClient (UnaryOperator <HttpClient > operator ) {
220208 this .operator = this .operator .andThen (operator );
221209 return this ;
0 commit comments