Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ subprojects {
apply plugin: 'io.spring.dependency-management'
apply plugin: 'com.github.sherter.google-java-format'

ext['reactor-bom.version'] = 'Dysprosium-M3'
ext['reactor-bom.version'] = 'Dysprosium-RC1'
ext['logback.version'] = '1.2.3'
ext['findbugs.version'] = '3.0.2'
ext['netty.version'] = '4.1.37.Final'
ext['netty.version'] = '4.1.39.Final'
ext['netty-boringssl.version'] = '2.0.25.Final'
ext['hdrhistogram.version'] = '2.1.10'
ext['mockito.version'] = '2.25.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,61 @@

package io.rsocket.examples.transport.ws;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.WebsocketDuplexConnection;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.WebsocketRouteTransport;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.HashMap;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class WebSocketHeadersSample {
public class WebSocketRouteTransportHeadersSample {
static final Payload payload1 = ByteBufPayload.create("Hello ");

public static void main(String[] args) {

ServerTransport.ConnectionAcceptor acceptor =
CloseableChannel disposableServer =
RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(new SocketAcceptorImpl())
.toConnectionAcceptor();

DisposableServer disposableServer =
HttpServer.create()
.host("localhost")
.port(0)
.route(
routes ->
routes.ws(
"/",
(in, out) -> {
if (in.headers().containsValue("Authorization", "test", true)) {
DuplexConnection connection =
new WebsocketDuplexConnection((Connection) in);
return acceptor.apply(connection).then(out.neverComplete());
}

return out.sendClose(
HttpResponseStatus.UNAUTHORIZED.code(),
HttpResponseStatus.UNAUTHORIZED.reasonPhrase());
}))
.bindNow();
.transport(
// Same could be done with routing transport
WebsocketRouteTransport.builder()
.filteringInbound(
headers -> headers.containsValue("Authorization", "test", true))
.closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized"))
.observingOn("/")
.build(HttpServer.create().host("localhost").port(8080)))
.start()
.block();

WebsocketClientTransport clientTransport =
WebsocketClientTransport.create(disposableServer.host(), disposableServer.port());
WebsocketClientTransport.create(disposableServer.address());

MonoProcessor<WebSocketCloseStatus> statusMonoProcessor = MonoProcessor.create();
clientTransport.setTransportHeaders(
() -> {
HashMap<String, String> map = new HashMap<>();
map.put("Authorization", "test");
map.put("Authorization", "1");
return map;
});

clientTransport.setCloseStatusConsumer(
webSocketCloseStatusMono -> webSocketCloseStatusMono.log().subscribe(statusMonoProcessor));

RSocket socket =
RSocketFactory.connect()
.keepAliveAckTimeout(Duration.ofMinutes(10))
Expand All @@ -89,17 +79,24 @@ public static void main(String[] args) {
.start()
.block();

Flux.range(0, 100)
.concatMap(i -> socket.fireAndForget(payload1.retain()))
// .doOnNext(p -> {
//// System.out.println(p.getDataUtf8());
// p.release();
// })
.blockLast();
try {
Flux.range(0, 100).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast();

} catch (Exception e) {
System.out.println("Observed WebSocket Close Status " + statusMonoProcessor.peek());
}

socket.dispose();

WebsocketClientTransport clientTransport2 =
WebsocketClientTransport.create(disposableServer.host(), disposableServer.port());
WebsocketClientTransport.create(disposableServer.address());

clientTransport2.setTransportHeaders(
() -> {
HashMap<String, String> map = new HashMap<>();
map.put("Authorization", "test");
return map;
});

RSocket rSocket =
RSocketFactory.connect()
Expand All @@ -109,7 +106,7 @@ public static void main(String[] args) {
.start()
.block();

// expect error here because of closed channel
// expect normal execution here
rSocket.requestResponse(payload1).block();
}

Expand All @@ -121,7 +118,6 @@ public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiv

@Override
public Mono<Void> fireAndForget(Payload payload) {
// System.out.println(payload.getDataUtf8());
payload.release();
return Mono.empty();
}
Expand All @@ -133,7 +129,7 @@ public Mono<Payload> requestResponse(Payload payload) {

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).subscribeOn(Schedulers.single());
return Flux.from(payloads);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.examples.transport.ws;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.HashMap;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.server.HttpServer;

public class WebSocketServerTransportHeadersSample {
static final Payload payload1 = ByteBufPayload.create("Hello ");

public static void main(String[] args) {

CloseableChannel disposableServer =
RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(new SocketAcceptorImpl())
.transport(
WebsocketServerTransport.builder()
.filteringInbound(
headers -> headers.containsValue("Authorization", "test", true))
.closingWithStatus(headers -> new WebSocketCloseStatus(4404, "Unauthorized"))
.build(HttpServer.create().host("localhost").port(8080)))
.start()
.block();

WebsocketClientTransport clientTransport =
WebsocketClientTransport.create(disposableServer.address());

MonoProcessor<WebSocketCloseStatus> statusMonoProcessor = MonoProcessor.create();
clientTransport.setTransportHeaders(
() -> {
HashMap<String, String> map = new HashMap<>();
map.put("Authorization", "1");
return map;
});

clientTransport.setCloseStatusConsumer(
webSocketCloseStatusMono -> webSocketCloseStatusMono.log().subscribe(statusMonoProcessor));

RSocket socket =
RSocketFactory.connect()
.keepAliveAckTimeout(Duration.ofMinutes(10))
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(clientTransport)
.start()
.block();

try {
Flux.range(0, 100).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast();

} catch (Exception e) {
System.out.println("Observed WebSocket Close Status " + statusMonoProcessor.peek());
}

socket.dispose();

WebsocketClientTransport clientTransport2 =
WebsocketClientTransport.create(disposableServer.address());

clientTransport2.setTransportHeaders(
() -> {
HashMap<String, String> map = new HashMap<>();
map.put("Authorization", "test");
return map;
});

RSocket rSocket =
RSocketFactory.connect()
.keepAliveAckTimeout(Duration.ofMinutes(10))
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(clientTransport2)
.start()
.block();

// expect normal execution here
rSocket.requestResponse(payload1).block();
}

private static class SocketAcceptorImpl implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
return Mono.just(
new AbstractRSocket() {

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.empty();
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just(payload);
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.rsocket.transport.netty.UriUtils.isSecure;

import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.rsocket.DuplexConnection;
import io.rsocket.fragmentation.FragmentationDuplexConnection;
import io.rsocket.transport.ClientTransport;
Expand All @@ -32,9 +33,11 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.tcp.TcpClient;

/**
Expand All @@ -48,7 +51,9 @@ public final class WebsocketClientTransport implements ClientTransport, Transpor

private final HttpClient client;

private String path;
private final String path;

private Consumer<Mono<WebSocketCloseStatus>> closeStatusConsumer;

private Supplier<Map<String, String>> transportHeaders = Collections::emptyMap;

Expand Down Expand Up @@ -161,6 +166,11 @@ public Mono<DuplexConnection> connect(int mtu) {
.connect()
.map(
c -> {
Consumer<Mono<WebSocketCloseStatus>> closeStatusConsumer =
this.closeStatusConsumer;
if (closeStatusConsumer != null) {
closeStatusConsumer.accept(((WebsocketInbound) c).receiveCloseStatus());
}
DuplexConnection connection = new WebsocketDuplexConnection(c);
if (mtu > 0) {
connection =
Expand All @@ -176,4 +186,8 @@ public void setTransportHeaders(Supplier<Map<String, String>> transportHeaders)
this.transportHeaders =
Objects.requireNonNull(transportHeaders, "transportHeaders must not be null");
}

public void setCloseStatusConsumer(Consumer<Mono<WebSocketCloseStatus>> closeStatusConsumer) {
this.closeStatusConsumer = closeStatusConsumer;
}
}
Loading