From 41777f1ee5604bacf13ac723619287a8f461ff8f Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 2 Sep 2019 19:50:06 +0300 Subject: [PATCH 1/5] provides draft of the enhanced websocket handling Signed-off-by: Oleh Dokuka --- build.gradle | 6 +- .../transport/ws/WebSocketHeadersSample.java | 85 ++++++------ .../client/WebsocketClientTransport.java | 16 ++- .../netty/server/WebsocketRouteTransport.java | 121 ++++++++++++++++- .../server/WebsocketServerTransport.java | 123 ++++++++++++++---- 5 files changed, 282 insertions(+), 69 deletions(-) diff --git a/build.gradle b/build.gradle index d3ea08183..855046ad4 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' - ext['reactor-bom.version'] = 'Dysprosium-M3' + ext['reactor-bom.version'] = 'Dysprosium-BUILD-SNAPSHOT' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' ext['netty.version'] = '4.1.37.Final' @@ -97,9 +97,9 @@ subprojects { mavenCentral() maven { url 'http://repo.spring.io/milestone' } // temporary for Reactor Dysprosium - if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) { +// if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) { maven { url 'http://repo.spring.io/libs-snapshot' } - } +// } } if (project.name != 'rsocket-bom') { diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java index d3865c01b..9fcf90c58 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java @@ -16,27 +16,25 @@ package io.rsocket.examples.transport.ws; -import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.AbstractRSocket; import io.rsocket.ConnectionSetupPayload; -import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.ServerTransport; -import io.rsocket.transport.netty.WebsocketDuplexConnection; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.ByteBufPayload; import java.time.Duration; import java.util.HashMap; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; -import reactor.netty.Connection; -import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; public class WebSocketHeadersSample { @@ -44,43 +42,43 @@ public class WebSocketHeadersSample { public static void main(String[] args) { - ServerTransport.ConnectionAcceptor acceptor = + CloseableChannel disposableServer = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) .acceptor(new SocketAcceptorImpl()) - .toConnectionAcceptor(); - - DisposableServer disposableServer = - HttpServer.create() - .host("localhost") - .port(0) - .route( - routes -> - routes.ws( - "/", - (in, out) -> { - if (in.headers().containsValue("Authorization", "test", true)) { - DuplexConnection connection = - new WebsocketDuplexConnection((Connection) in); - return acceptor.apply(connection).then(out.neverComplete()); - } - - return out.sendClose( - HttpResponseStatus.UNAUTHORIZED.code(), - HttpResponseStatus.UNAUTHORIZED.reasonPhrase()); - })) - .bindNow(); + .transport( + WebsocketServerTransport.builder() + .filteringInbound( + headers -> headers.containsValue("Authorization", "test", true)) + .closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized")) + .build(HttpServer.create().host("localhost").port(8080)) + // Same could be done with routing transport + // WebsocketRouteTransport + // .builder() + // .filteringInbound(headers -> + // headers.containsValue("Authorization", "test", true)) + // .closingWithStatus(headers -> new WebSocketCloseStatus(4404, + // "Unauthorized")) + // .observingOn("/") + // .build(HttpServer.create().host("localhost").port(8080)) + ) + .start() + .block(); WebsocketClientTransport clientTransport = - WebsocketClientTransport.create(disposableServer.host(), disposableServer.port()); + WebsocketClientTransport.create(disposableServer.address()); + MonoProcessor statusMonoProcessor = MonoProcessor.create(); clientTransport.setTransportHeaders( () -> { HashMap map = new HashMap<>(); - map.put("Authorization", "test"); + map.put("Authorization", "1"); return map; }); + clientTransport.setCloseStatusConsumer( + webSocketCloseStatusMono -> webSocketCloseStatusMono.log().subscribe(statusMonoProcessor)); + RSocket socket = RSocketFactory.connect() .keepAliveAckTimeout(Duration.ofMinutes(10)) @@ -89,17 +87,24 @@ public static void main(String[] args) { .start() .block(); - Flux.range(0, 100) - .concatMap(i -> socket.fireAndForget(payload1.retain())) - // .doOnNext(p -> { - //// System.out.println(p.getDataUtf8()); - // p.release(); - // }) - .blockLast(); + try { + Flux.range(0, 100).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast(); + + } catch (Exception e) { + System.out.println("Observed WebSocket Close Status " + statusMonoProcessor.peek()); + } + socket.dispose(); WebsocketClientTransport clientTransport2 = - WebsocketClientTransport.create(disposableServer.host(), disposableServer.port()); + WebsocketClientTransport.create(disposableServer.address()); + + clientTransport2.setTransportHeaders( + () -> { + HashMap map = new HashMap<>(); + map.put("Authorization", "test"); + return map; + }); RSocket rSocket = RSocketFactory.connect() @@ -109,7 +114,7 @@ public static void main(String[] args) { .start() .block(); - // expect error here because of closed channel + // expect normal execution here rSocket.requestResponse(payload1).block(); } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java index 5049119a5..65c2efc44 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java @@ -21,6 +21,7 @@ import static io.rsocket.transport.netty.UriUtils.isSecure; import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -32,9 +33,11 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Supplier; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.tcp.TcpClient; /** @@ -48,7 +51,9 @@ public final class WebsocketClientTransport implements ClientTransport, Transpor private final HttpClient client; - private String path; + private final String path; + + private Consumer> closeStatusConsumer; private Supplier> transportHeaders = Collections::emptyMap; @@ -161,6 +166,11 @@ public Mono connect(int mtu) { .connect() .map( c -> { + Consumer> closeStatusConsumer = + this.closeStatusConsumer; + if (closeStatusConsumer != null) { + closeStatusConsumer.accept(((WebsocketInbound) c).receiveCloseStatus()); + } DuplexConnection connection = new WebsocketDuplexConnection(c); if (mtu > 0) { connection = @@ -176,4 +186,8 @@ public void setTransportHeaders(Supplier> transportHeaders) this.transportHeaders = Objects.requireNonNull(transportHeaders, "transportHeaders must not be null"); } + + public void setCloseStatusConsumer(Consumer> closeStatusConsumer) { + this.closeStatusConsumer = closeStatusConsumer; + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 9b78ece60..4c6818595 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -16,11 +16,13 @@ package io.rsocket.transport.netty.server; +import static io.netty.handler.codec.http.websocketx.WebSocketCloseStatus.NORMAL_CLOSURE; import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK; import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; -import io.rsocket.Closeable; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ServerTransport; @@ -32,8 +34,11 @@ import java.util.Objects; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nullable; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.Connection; @@ -46,7 +51,12 @@ * An implementation of {@link ServerTransport} that connects via Websocket and listens on specified * routes. */ -public final class WebsocketRouteTransport implements ServerTransport { +public final class WebsocketRouteTransport implements ServerTransport { + + private static final Function DEFAULT_STATUS_SUPPLIER = + __ -> NORMAL_CLOSURE; + + private static final Consumer NO_OPS_ROUTES_BUILDER = (r) -> {}; private final UriPathTemplate template; @@ -54,6 +64,10 @@ public final class WebsocketRouteTransport implements ServerTransport private final HttpServer server; + private final Function webSocketCloseStatusSupplier; + + private final @Nullable Predicate headersPredicate; + /** * Creates a new instance * @@ -64,15 +78,43 @@ public final class WebsocketRouteTransport implements ServerTransport public WebsocketRouteTransport( HttpServer server, Consumer routesBuilder, String path) { + this(server, routesBuilder, path, null, DEFAULT_STATUS_SUPPLIER); + } + + public WebsocketRouteTransport( + HttpServer server, + Consumer routesBuilder, + String path, + @Nullable Predicate headersPredicate, + Function webSocketCloseStatusSupplier) { + this.server = Objects.requireNonNull(server, "server must not be null"); this.routesBuilder = Objects.requireNonNull(routesBuilder, "routesBuilder must not be null"); this.template = new UriPathTemplate(Objects.requireNonNull(path, "path must not be null")); + this.headersPredicate = headersPredicate; + this.webSocketCloseStatusSupplier = + Objects.requireNonNull(webSocketCloseStatusSupplier, "status supplier must not be null"); } @Override - public Mono start(ConnectionAcceptor acceptor, int mtu) { + public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); + if (headersPredicate != null) { + return server + .route( + routes -> { + routesBuilder.accept(routes); + routes.ws( + hsr -> hsr.method().equals(HttpMethod.GET) && template.matches(hsr.uri()), + newHandler(acceptor, mtu, headersPredicate, webSocketCloseStatusSupplier), + null, + FRAME_LENGTH_MASK); + }) + .bind() + .map(CloseableChannel::new); + } + return server .route( routes -> { @@ -120,6 +162,36 @@ public static BiFunction> n }; } + /** + * Creates a new Websocket handler + * + * @param acceptor the {@link ConnectionAcceptor} to use with the handler + * @param mtu the fragment size + * @return a new Websocket handler + * @throws NullPointerException if {@code acceptor} is {@code null} + */ + public static BiFunction> newHandler( + ConnectionAcceptor acceptor, + int mtu, + Predicate headersPredicate, + Function webSocketCloseStatusSupplier) { + return (in, out) -> { + HttpHeaders headers = in.headers(); + if (!headersPredicate.test(headers)) { + final WebSocketCloseStatus status = webSocketCloseStatusSupplier.apply(headers); + return out.sendClose(status.code(), status.reasonText()); + } + + DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); + if (mtu > 0) { + connection = + new FragmentationDuplexConnection( + connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + } + return acceptor.apply(connection).then(out.neverComplete()); + }; + } + static final class UriPathTemplate { private static final Pattern FULL_SPLAT_PATTERN = Pattern.compile("[\\*][\\*]"); @@ -236,4 +308,47 @@ private Matcher matcher(String uri) { return m; } } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private Predicate headersPredicate; + private Function webSocketCloseStatusSupplier = + DEFAULT_STATUS_SUPPLIER; + private String path = "/"; + private Consumer routesBuilder = NO_OPS_ROUTES_BUILDER; + + public Builder filteringInbound(Predicate headersPredicate) { + Objects.requireNonNull(headersPredicate, "Header predicate must not be null"); + this.headersPredicate = headersPredicate; + return this; + } + + public Builder closingWithStatus( + Function webSocketCloseStatusSupplier) { + this.webSocketCloseStatusSupplier = + Objects.requireNonNull( + webSocketCloseStatusSupplier, "WebSocketCloseStatusSupplier must not be null"); + return this; + } + + public Builder observingOn(String path) { + Objects.requireNonNull(path, "path must not be null"); + this.path = path; + return this; + } + + public Builder routingWith(Consumer routesBuilder) { + this.routesBuilder = Objects.requireNonNull(routesBuilder, "routesBuilder must not be null"); + return this; + } + + public WebsocketRouteTransport build(HttpServer server) { + return new WebsocketRouteTransport( + server, routesBuilder, path, headersPredicate, webSocketCloseStatusSupplier); + } + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java index 205f419a2..ea697975b 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java @@ -16,15 +16,18 @@ package io.rsocket.transport.netty.server; +import static io.netty.handler.codec.http.websocketx.WebSocketCloseStatus.NORMAL_CLOSURE; import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -35,7 +38,10 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -50,10 +56,18 @@ public final class WebsocketServerTransport implements ServerTransport, TransportHeaderAware { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerTransport.class); + private static final Function DEFAULT_STATUS_SUPPLIER = + __ -> NORMAL_CLOSURE; + private final HttpServer server; private Supplier> transportHeaders = Collections::emptyMap; + private Function webSocketCloseStatusSupplier = + DEFAULT_STATUS_SUPPLIER; + + private @Nullable Predicate headersPredicate; + private WebsocketServerTransport(HttpServer server) { this.server = server; } @@ -147,27 +161,92 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); Mono isError = FragmentationDuplexConnection.checkMtu(mtu); - return isError != null - ? isError - : server - .handle( - (request, response) -> { - transportHeaders.get().forEach(response::addHeader); - return response.sendWebsocket( - null, - FRAME_LENGTH_MASK, - (in, out) -> { - DuplexConnection connection = - new WebsocketDuplexConnection((Connection) in); - if (mtu > 0) { - connection = - new FragmentationDuplexConnection( - connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); - } - return acceptor.apply(connection).then(out.neverComplete()); - }); - }) - .bind() - .map(CloseableChannel::new); + + if (isError != null) { + return isError; + } + + if (headersPredicate != null) { + return server + .handle( + (request, response) -> { + transportHeaders.get().forEach(response::addHeader); + return response.sendWebsocket( + null, + FRAME_LENGTH_MASK, + (in, out) -> { + HttpHeaders headers = in.headers(); + if (!headersPredicate.test(headers)) { + final WebSocketCloseStatus status = + webSocketCloseStatusSupplier.apply(headers); + return out.sendClose(status.code(), status.reasonText()); + } + + DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); + if (mtu > 0) { + connection = + new FragmentationDuplexConnection( + connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + } + return acceptor.apply(connection).then(out.neverComplete()); + }); + }) + .bind() + .map(CloseableChannel::new); + } + + return server + .handle( + (request, response) -> { + transportHeaders.get().forEach(response::addHeader); + return response.sendWebsocket( + null, + FRAME_LENGTH_MASK, + (in, out) -> { + DuplexConnection connection = new WebsocketDuplexConnection((Connection) in); + if (mtu > 0) { + connection = + new FragmentationDuplexConnection( + connection, ByteBufAllocator.DEFAULT, mtu, false, "server"); + } + return acceptor.apply(connection).then(out.neverComplete()); + }); + }) + .bind() + .map(CloseableChannel::new); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private Predicate headersPredicate; + private Function webSocketCloseStatusSupplier = + DEFAULT_STATUS_SUPPLIER; + + public Builder filteringInbound(Predicate headersPredicate) { + this.headersPredicate = + Objects.requireNonNull(headersPredicate, "Header predicate must not be null"); + return this; + } + + public Builder closingWithStatus( + Function webSocketCloseStatusSupplier) { + this.webSocketCloseStatusSupplier = + Objects.requireNonNull( + webSocketCloseStatusSupplier, "WebSocketCloseStatusSupplier must not be null"); + return this; + } + + public WebsocketServerTransport build(HttpServer server) { + WebsocketServerTransport websocketServerTransport = WebsocketServerTransport.create(server); + + websocketServerTransport.headersPredicate = headersPredicate; + websocketServerTransport.webSocketCloseStatusSupplier = webSocketCloseStatusSupplier; + + return websocketServerTransport; + } } } From c7e01f41f83352cebe707f133bfbbf0024d1b90f Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 6 Sep 2019 10:52:09 +0300 Subject: [PATCH 2/5] fixes samples; bumps reactor to RC1 version; bumps netty to 4.1.39 Signed-off-by: Oleh Dokuka --- build.gradle | 4 +- .../WebSocketRouteTransportHeadersSample.java | 138 ++++++++++++++++++ ...ebSocketServerTransportHeadersSample.java} | 14 +- 3 files changed, 142 insertions(+), 14 deletions(-) create mode 100644 rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java rename rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/{WebSocketHeadersSample.java => WebSocketServerTransportHeadersSample.java} (88%) diff --git a/build.gradle b/build.gradle index 855046ad4..50d04be2a 100644 --- a/build.gradle +++ b/build.gradle @@ -29,10 +29,10 @@ subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' - ext['reactor-bom.version'] = 'Dysprosium-BUILD-SNAPSHOT' + ext['reactor-bom.version'] = 'Dysprosium-RC1' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' - ext['netty.version'] = '4.1.37.Final' + ext['netty.version'] = '4.1.39.Final' ext['netty-boringssl.version'] = '2.0.25.Final' ext['hdrhistogram.version'] = '2.1.10' ext['mockito.version'] = '2.25.1' diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java new file mode 100644 index 000000000..a338f8abe --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.examples.transport.ws; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.rsocket.AbstractRSocket; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.WebsocketRouteTransport; +import io.rsocket.util.ByteBufPayload; +import java.time.Duration; +import java.util.HashMap; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.server.HttpServer; + +public class WebSocketRouteTransportHeadersSample { + static final Payload payload1 = ByteBufPayload.create("Hello "); + + public static void main(String[] args) { + + CloseableChannel disposableServer = + RSocketFactory.receive() + .frameDecoder(PayloadDecoder.ZERO_COPY) + .acceptor(new SocketAcceptorImpl()) + .transport( + // Same could be done with routing transport + WebsocketRouteTransport.builder() + .filteringInbound( + headers -> headers.containsValue("Authorization", "test", true)) + .closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized")) + .observingOn("/") + .build(HttpServer.create().host("localhost").port(8080))) + .start() + .block(); + + WebsocketClientTransport clientTransport = + WebsocketClientTransport.create(disposableServer.address()); + + MonoProcessor statusMonoProcessor = MonoProcessor.create(); + clientTransport.setTransportHeaders( + () -> { + HashMap map = new HashMap<>(); + map.put("Authorization", "1"); + return map; + }); + + clientTransport.setCloseStatusConsumer( + webSocketCloseStatusMono -> webSocketCloseStatusMono.log().subscribe(statusMonoProcessor)); + + RSocket socket = + RSocketFactory.connect() + .keepAliveAckTimeout(Duration.ofMinutes(10)) + .frameDecoder(PayloadDecoder.ZERO_COPY) + .transport(clientTransport) + .start() + .block(); + + try { + Flux.range(0, 100).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast(); + + } catch (Exception e) { + System.out.println("Observed WebSocket Close Status " + statusMonoProcessor.peek()); + } + + socket.dispose(); + + WebsocketClientTransport clientTransport2 = + WebsocketClientTransport.create(disposableServer.address()); + + clientTransport2.setTransportHeaders( + () -> { + HashMap map = new HashMap<>(); + map.put("Authorization", "test"); + return map; + }); + + RSocket rSocket = + RSocketFactory.connect() + .keepAliveAckTimeout(Duration.ofMinutes(10)) + .frameDecoder(PayloadDecoder.ZERO_COPY) + .transport(clientTransport2) + .start() + .block(); + + // expect normal execution here + rSocket.requestResponse(payload1).block(); + } + + private static class SocketAcceptorImpl implements SocketAcceptor { + @Override + public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) { + return Mono.just( + new AbstractRSocket() { + + @Override + public Mono fireAndForget(Payload payload) { + // System.out.println(payload.getDataUtf8()); + payload.release(); + return Mono.empty(); + } + + @Override + public Mono requestResponse(Payload payload) { + return Mono.just(payload); + } + + @Override + public Flux requestChannel(Publisher payloads) { + return Flux.from(payloads).subscribeOn(Schedulers.single()); + } + }); + } + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java similarity index 88% rename from rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java rename to rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java index 9fcf90c58..83f4477e9 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java @@ -37,7 +37,7 @@ import reactor.core.scheduler.Schedulers; import reactor.netty.http.server.HttpServer; -public class WebSocketHeadersSample { +public class WebSocketServerTransportHeadersSample { static final Payload payload1 = ByteBufPayload.create("Hello "); public static void main(String[] args) { @@ -51,17 +51,7 @@ public static void main(String[] args) { .filteringInbound( headers -> headers.containsValue("Authorization", "test", true)) .closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized")) - .build(HttpServer.create().host("localhost").port(8080)) - // Same could be done with routing transport - // WebsocketRouteTransport - // .builder() - // .filteringInbound(headers -> - // headers.containsValue("Authorization", "test", true)) - // .closingWithStatus(headers -> new WebSocketCloseStatus(4404, - // "Unauthorized")) - // .observingOn("/") - // .build(HttpServer.create().host("localhost").port(8080)) - ) + .build(HttpServer.create().host("localhost").port(8080))) .start() .block(); From c51ae4d5ff75dba69cbd717a86ad3adcc22a8106 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 6 Sep 2019 11:22:45 +0300 Subject: [PATCH 3/5] fixes samples Signed-off-by: Oleh Dokuka --- .../transport/ws/WebSocketRouteTransportHeadersSample.java | 3 +-- .../transport/ws/WebSocketServerTransportHeadersSample.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java index a338f8abe..d31223a7b 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketRouteTransportHeadersSample.java @@ -118,7 +118,6 @@ public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiv @Override public Mono fireAndForget(Payload payload) { - // System.out.println(payload.getDataUtf8()); payload.release(); return Mono.empty(); } @@ -130,7 +129,7 @@ public Mono requestResponse(Payload payload) { @Override public Flux requestChannel(Publisher payloads) { - return Flux.from(payloads).subscribeOn(Schedulers.single()); + return Flux.from(payloads); } }); } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java index 83f4477e9..9bac962ad 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/ws/WebSocketServerTransportHeadersSample.java @@ -116,7 +116,6 @@ public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiv @Override public Mono fireAndForget(Payload payload) { - // System.out.println(payload.getDataUtf8()); payload.release(); return Mono.empty(); } @@ -128,7 +127,7 @@ public Mono requestResponse(Payload payload) { @Override public Flux requestChannel(Publisher payloads) { - return Flux.from(payloads).subscribeOn(Schedulers.single()); + return Flux.from(payloads); } }); } From a1f4d8e191ed4141ecb653f1a3c3bfd2668d8015 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 6 Sep 2019 11:24:21 +0300 Subject: [PATCH 4/5] fixes build script Signed-off-by: Oleh Dokuka --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 50d04be2a..10d319a11 100644 --- a/build.gradle +++ b/build.gradle @@ -97,9 +97,9 @@ subprojects { mavenCentral() maven { url 'http://repo.spring.io/milestone' } // temporary for Reactor Dysprosium -// if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) { + if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) { maven { url 'http://repo.spring.io/libs-snapshot' } -// } + } } if (project.name != 'rsocket-bom') { From b42f27b17c732517d8bbf667e4105b9f974f02a9 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 6 Sep 2019 15:14:21 +0300 Subject: [PATCH 5/5] Update rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java Co-Authored-By: Yuri Schimke --- .../rsocket/transport/netty/server/WebsocketRouteTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 4c6818595..e2304028a 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -331,7 +331,7 @@ public Builder closingWithStatus( Function webSocketCloseStatusSupplier) { this.webSocketCloseStatusSupplier = Objects.requireNonNull( - webSocketCloseStatusSupplier, "WebSocketCloseStatusSupplier must not be null"); + webSocketCloseStatusSupplier, "webSocketCloseStatusSupplier must not be null"); return this; }