diff --git a/build.gradle b/build.gradle index d3ea08183..10d319a11 100644 --- a/build.gradle +++ b/build.gradle @@ -29,10 +29,10 @@ subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' - ext['reactor-bom.version'] = 'Dysprosium-M3' + ext['reactor-bom.version'] = 'Dysprosium-RC1' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' - ext['netty.version'] = '4.1.37.Final' + ext['netty.version'] = '4.1.39.Final' ext['netty-boringssl.version'] = '2.0.25.Final' ext['hdrhistogram.version'] = '2.1.10' ext['mockito.version'] = '2.25.1' diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java similarity index 61% rename from rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java rename to rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java index d3865c01b..d31223a7b 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java @@ -16,71 +16,61 @@ package io.rsocket.examples.transport.ws; -import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.AbstractRSocket; import io.rsocket.ConnectionSetupPayload; -import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.ServerTransport; -import io.rsocket.transport.netty.WebsocketDuplexConnection; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.WebsocketRouteTransport; import io.rsocket.util.ByteBufPayload; import java.time.Duration; import java.util.HashMap; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; -import reactor.netty.Connection; -import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; -public class WebSocketHeadersSample { +public class WebSocketRouteTransportHeadersSample { static final Payload payload1 = ByteBufPayload.create("Hello "); public static void main(String[] args) { - ServerTransport.ConnectionAcceptor acceptor = + CloseableChannel disposableServer = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) .acceptor(new SocketAcceptorImpl()) - .toConnectionAcceptor(); - - DisposableServer disposableServer = - HttpServer.create() - .host("localhost") - .port(0) - .route( - routes -> - routes.ws( - "/", - (in, out) -> { - if (in.headers().containsValue("Authorization", "test", true)) { - DuplexConnection connection = - new WebsocketDuplexConnection((Connection) in); - return acceptor.apply(connection).then(out.neverComplete()); - } - - return out.sendClose( - HttpResponseStatus.UNAUTHORIZED.code(), - HttpResponseStatus.UNAUTHORIZED.reasonPhrase()); - })) - .bindNow(); + .transport( + // Same could be done with routing transport + WebsocketRouteTransport.builder() + .filteringInbound( + headers -> headers.containsValue("Authorization", "test", true)) + .closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized")) + .observingOn("/") + .build(HttpServer.create().host("localhost").port(8080))) + .start() + .block(); WebsocketClientTransport clientTransport = - WebsocketClientTransport.create(disposableServer.host(), disposableServer.port()); + WebsocketClientTransport.create(disposableServer.address()); + MonoProcessor statusMonoProcessor = MonoProcessor.create(); clientTransport.setTransportHeaders( () -> { HashMap map = new HashMap<>(); - map.put("Authorization", "test"); + map.put("Authorization", "1"); return map; }); + clientTransport.setCloseStatusConsumer( + webSocketCloseStatusMono -> webSocketCloseStatusMono.log().subscribe(statusMonoProcessor)); + RSocket socket = RSocketFactory.connect() .keepAliveAckTimeout(Duration.ofMinutes(10)) @@ -89,17 +79,24 @@ public static void main(String[] args) { .start() .block(); - Flux.range(0, 100) - .concatMap(i -> socket.fireAndForget(payload1.retain())) - // .doOnNext(p -> { - //// System.out.println(p.getDataUtf8()); - // p.release(); - // }) - .blockLast(); + try { + Flux.range(0, 100).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast(); + + } catch (Exception e) { + System.out.println("Observed WebSocket Close Status " + statusMonoProcessor.peek()); + } + socket.dispose(); WebsocketClientTransport clientTransport2 = - WebsocketClientTransport.create(disposableServer.host(), disposableServer.port()); + WebsocketClientTransport.create(disposableServer.address()); + + clientTransport2.setTransportHeaders( + () -> { + HashMap map = new HashMap<>(); + map.put("Authorization", "test"); + return map; + }); RSocket rSocket = RSocketFactory.connect() @@ -109,7 +106,7 @@ public static void main(String[] args) { .start() .block(); - // expect error here because of closed channel + // expect normal execution here rSocket.requestResponse(payload1).block(); } @@ -121,7 +118,6 @@ public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiv @Override public Mono fireAndForget(Payload payload) { - // System.out.println(payload.getDataUtf8()); payload.release(); return Mono.empty(); } @@ -133,7 +129,7 @@ public Mono requestResponse(Payload payload) { @Override public Flux requestChannel(Publisher payloads) { - return Flux.from(payloads).subscribeOn(Schedulers.single()); + return Flux.from(payloads); } }); } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java new file mode 100644 index 000000000..9bac962ad --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java @@ -0,0 +1,135 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.examples.transport.ws; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.rsocket.AbstractRSocket; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.WebsocketServerTransport; +import io.rsocket.util.ByteBufPayload; +import java.time.Duration; +import java.util.HashMap; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.server.HttpServer; + +public class WebSocketServerTransportHeadersSample { + static final Payload payload1 = ByteBufPayload.create("Hello "); + + public static void main(String[] args) { + + CloseableChannel disposableServer = + RSocketFactory.receive() + .frameDecoder(PayloadDecoder.ZERO_COPY) + .acceptor(new SocketAcceptorImpl()) + .transport( + WebsocketServerTransport.builder() + .filteringInbound( + headers -> headers.containsValue("Authorization", "test", true)) + .closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized")) + .build(HttpServer.create().host("localhost").port(8080))) + .start() + .block(); + + WebsocketClientTransport clientTransport = + WebsocketClientTransport.create(disposableServer.address()); + + MonoProcessor statusMonoProcessor = MonoProcessor.create(); + clientTransport.setTransportHeaders( + () -> { + HashMap map = new HashMap<>(); + map.put("Authorization", "1"); + return map; + }); + + clientTransport.setCloseStatusConsumer( + webSocketCloseStatusMono -> webSocketCloseStatusMono.log().subscribe(statusMonoProcessor)); + + RSocket socket = + RSocketFactory.connect() + .keepAliveAckTimeout(Duration.ofMinutes(10)) + .frameDecoder(PayloadDecoder.ZERO_COPY) + .transport(clientTransport) + .start() + .block(); + + try { + Flux.range(0, 100).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast(); + + } catch (Exception e) { + System.out.println("Observed WebSocket Close Status " + statusMonoProcessor.peek()); + } + + socket.dispose(); + + WebsocketClientTransport clientTransport2 = + WebsocketClientTransport.create(disposableServer.address()); + + clientTransport2.setTransportHeaders( + () -> { + HashMap map = new HashMap<>(); + map.put("Authorization", "test"); + return map; + }); + + RSocket rSocket = + RSocketFactory.connect() + .keepAliveAckTimeout(Duration.ofMinutes(10)) + .frameDecoder(PayloadDecoder.ZERO_COPY) + .transport(clientTransport2) + .start() + .block(); + + // expect normal execution here + rSocket.requestResponse(payload1).block(); + } + + private static class SocketAcceptorImpl implements SocketAcceptor { + @Override + public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) { + return Mono.just( + new AbstractRSocket() { + + @Override + public Mono fireAndForget(Payload payload) { + payload.release(); + return Mono.empty(); + } + + @Override + public Mono requestResponse(Payload payload) { + return Mono.just(payload); + } + + @Override + public Flux requestChannel(Publisher payloads) { + return Flux.from(payloads); + } + }); + } + } +} diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java index 5049119a5..65c2efc44 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java @@ -21,6 +21,7 @@ import static io.rsocket.transport.netty.UriUtils.isSecure; import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -32,9 +33,11 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Supplier; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.tcp.TcpClient; /** @@ -48,7 +51,9 @@ public final class WebsocketClientTransport implements ClientTransport, Transpor private final HttpClient client; - private String path; + private final String path; + + private Consumer> closeStatusConsumer; private Supplier> transportHeaders = Collections::emptyMap; @@ -161,6 +166,11 @@ public Mono connect(int mtu) { .connect() .map( c -> { + Consumer> closeStatusConsumer = + this.closeStatusConsumer; + if (closeStatusConsumer != null) { + closeStatusConsumer.accept(((WebsocketInbound) c).receiveCloseStatus()); + } DuplexConnection connection = new WebsocketDuplexConnection(c); if (mtu > 0) { connection = @@ -176,4 +186,8 @@ public void setTransportHeaders(Supplier> transportHeaders) this.transportHeaders = Objects.requireNonNull(transportHeaders, "transportHeaders must not be null"); } + + public void setCloseStatusConsumer(Consumer> closeStatusConsumer) { + this.closeStatusConsumer = closeStatusConsumer; + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 9b78ece60..e2304028a 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -16,11 +16,13 @@ package io.rsocket.transport.netty.server; +import static io.netty.handler.codec.http.websocketx.WebSocketCloseStatus.NORMAL_CLOSURE; import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK; import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; -import io.rsocket.Closeable; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ServerTransport; @@ -32,8 +34,11 @@ import java.util.Objects; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nullable; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.Connection; @@ -46,7 +51,12 @@ * An implementation of {@link ServerTransport} that connects via Websocket and listens on specified * routes. */ -public final class WebsocketRouteTransport implements ServerTransport { +public final class WebsocketRouteTransport implements ServerTransport { + + private static final Function DEFAULT_STATUS_SUPPLIER = + __ -> NORMAL_CLOSURE; + + private static final Consumer NO_OPS_ROUTES_BUILDER = (r) -> {}; private final UriPathTemplate template; @@ -54,6 +64,10 @@ public final class WebsocketRouteTransport implements ServerTransport private final HttpServer server; + private final Function webSocketCloseStatusSupplier; + + private final @Nullable Predicate headersPredicate; + /** * Creates a new instance * @@ -64,15 +78,43 @@ public final class WebsocketRouteTransport implements ServerTransport public WebsocketRouteTransport( HttpServer server, Consumer routesBuilder, String path) { + this(server, routesBuilder, path, null, DEFAULT_STATUS_SUPPLIER); + } + + public WebsocketRouteTransport( + HttpServer server, + Consumer routesBuilder, + String path, + @Nullable Predicate headersPredicate, + Function webSocketCloseStatusSupplier) { + this.server = Objects.requireNonNull(server, "server must not be null"); this.routesBuilder = Objects.requireNonNull(routesBuilder, "routesBuilder must not be null"); this.template = new UriPathTemplate(Objects.requireNonNull(path, "path must not be null")); + this.headersPredicate = headersPredicate; + this.webSocketCloseStatusSupplier = + Objects.requireNonNull(webSocketCloseStatusSupplier, "status supplier must not be null"); } @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); + if (headersPredicate != null) { + return server + .route( + routes -> { + routesBuilder.accept(routes); + routes.ws( + hsr -> hsr.method().equals(HttpMethod.GET) && template.matches(hsr.uri()), + newHandler(acceptor, mtu, headersPredicate, webSocketCloseStatusSupplier), + null, + FRAME_LENGTH_MASK); + }) + .bind() + .map(CloseableChannel::new); + } + return server .route( routes -> { @@ -120,6 +162,36 @@ public static BiFunction> n }; } + /** + * Creates a new Websocket handler + * + * @param acceptor the {@link ConnectionAcceptor} to use with the handler + * @param mtu the fragment size + * @return a new Websocket handler + * @throws NullPointerException if {@code acceptor} is {@code null} + */ + public static BiFunction> newHandler( + ConnectionAcceptor acceptor, + int mtu, + Predicate headersPredicate, + Function webSocketCloseStatusSupplier) { + return (in, out) -> { + HttpHeaders headers = in.headers(); + if (!headersPredicate.test(headers)) { + final WebSocketCloseStatus status = webSocketCloseStatusSupplier.apply(headers); + return out.sendClose(status.code(), status.reasonText()); + } + + DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); + if (mtu > 0) { + connection = + new FragmentationDuplexConnection( + connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + } + return acceptor.apply(connection).then(out.neverComplete()); + }; + } + static final class UriPathTemplate { private static final Pattern FULL_SPLAT_PATTERN = Pattern.compile("[\\*][\\*]"); @@ -236,4 +308,47 @@ private Matcher matcher(String uri) { return m; } } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private Predicate headersPredicate; + private Function webSocketCloseStatusSupplier = + DEFAULT_STATUS_SUPPLIER; + private String path = "/"; + private Consumer routesBuilder = NO_OPS_ROUTES_BUILDER; + + public Builder filteringInbound(Predicate headersPredicate) { + Objects.requireNonNull(headersPredicate, "Header predicate must not be null"); + this.headersPredicate = headersPredicate; + return this; + } + + public Builder closingWithStatus( + Function webSocketCloseStatusSupplier) { + this.webSocketCloseStatusSupplier = + Objects.requireNonNull( + webSocketCloseStatusSupplier, "webSocketCloseStatusSupplier must not be null"); + return this; + } + + public Builder observingOn(String path) { + Objects.requireNonNull(path, "path must not be null"); + this.path = path; + return this; + } + + public Builder routingWith(Consumer routesBuilder) { + this.routesBuilder = Objects.requireNonNull(routesBuilder, "routesBuilder must not be null"); + return this; + } + + public WebsocketRouteTransport build(HttpServer server) { + return new WebsocketRouteTransport( + server, routesBuilder, path, headersPredicate, webSocketCloseStatusSupplier); + } + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java index 205f419a2..ea697975b 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java @@ -16,15 +16,18 @@ package io.rsocket.transport.netty.server; +import static io.netty.handler.codec.http.websocketx.WebSocketCloseStatus.NORMAL_CLOSURE; import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -35,7 +38,10 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -50,10 +56,18 @@ public final class WebsocketServerTransport implements ServerTransport, TransportHeaderAware { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerTransport.class); + private static final Function DEFAULT_STATUS_SUPPLIER = + __ -> NORMAL_CLOSURE; + private final HttpServer server; private Supplier> transportHeaders = Collections::emptyMap; + private Function webSocketCloseStatusSupplier = + DEFAULT_STATUS_SUPPLIER; + + private @Nullable Predicate headersPredicate; + private WebsocketServerTransport(HttpServer server) { this.server = server; } @@ -147,27 +161,92 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : server - .handle( - (request, response) -> { - transportHeaders.get().forEach(response::addHeader); - return response.sendWebsocket( - null, - FRAME_LENGTH_MASK, - (in, out) -> { - DuplexConnection connection = - new WebsocketDuplexConnection((Connection) in); - if (mtu > 0) { - connection = - new FragmentationDuplexConnection( - connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); - } - return acceptor.apply(connection).then(out.neverComplete()); - }); - }) - .bind() - .map(CloseableChannel::new); + + if (isError != null) { + return isError; + } + + if (headersPredicate != null) { + return server + .handle( + (request, response) -> { + transportHeaders.get().forEach(response::addHeader); + return response.sendWebsocket( + null, + FRAME_LENGTH_MASK, + (in, out) -> { + HttpHeaders headers = in.headers(); + if (!headersPredicate.test(headers)) { + final WebSocketCloseStatus status = + webSocketCloseStatusSupplier.apply(headers); + return out.sendClose(status.code(), status.reasonText()); + } + + DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); + if (mtu > 0) { + connection = + new FragmentationDuplexConnection( + connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + } + return acceptor.apply(connection).then(out.neverComplete()); + }); + }) + .bind() + .map(CloseableChannel::new); + } + + return server + .handle( + (request, response) -> { + transportHeaders.get().forEach(response::addHeader); + return response.sendWebsocket( + null, + FRAME_LENGTH_MASK, + (in, out) -> { + DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); + if (mtu > 0) { + connection = + new FragmentationDuplexConnection( + connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + } + return acceptor.apply(connection).then(out.neverComplete()); + }); + }) + .bind() + .map(CloseableChannel::new); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private Predicate headersPredicate; + private Function webSocketCloseStatusSupplier = + DEFAULT_STATUS_SUPPLIER; + + public Builder filteringInbound(Predicate headersPredicate) { + this.headersPredicate = + Objects.requireNonNull(headersPredicate, "Header predicate must not be null"); + return this; + } + + public Builder closingWithStatus( + Function webSocketCloseStatusSupplier) { + this.webSocketCloseStatusSupplier = + Objects.requireNonNull( + webSocketCloseStatusSupplier, "WebSocketCloseStatusSupplier must not be null"); + return this; + } + + public WebsocketServerTransport build(HttpServer server) { + WebsocketServerTransport websocketServerTransport = WebsocketServerTransport.create(server); + + websocketServerTransport.headersPredicate = headersPredicate; + websocketServerTransport.webSocketCloseStatusSupplier = webSocketCloseStatusSupplier; + + return websocketServerTransport; + } } }