Skip to content

Commit 417bb30

Browse files
committed
ReactorNettyWebSocketSession implements close properly
Issue: SPR-16774
1 parent b0aa08a commit 417bb30

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,10 +15,12 @@
1515
*/
1616
package org.springframework.web.reactive.socket.adapter;
1717

18+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
1819
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1920
import org.reactivestreams.Publisher;
2021
import reactor.core.publisher.Flux;
2122
import reactor.core.publisher.Mono;
23+
import reactor.core.publisher.MonoProcessor;
2224
import reactor.ipc.netty.NettyInbound;
2325
import reactor.ipc.netty.NettyOutbound;
2426
import reactor.ipc.netty.NettyPipeline;
@@ -42,6 +44,8 @@
4244
public class ReactorNettyWebSocketSession
4345
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
4446

47+
private final MonoProcessor<WebSocketFrame> closeMono = MonoProcessor.create();
48+
4549

4650
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
4751
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
@@ -69,11 +73,8 @@ public Mono<Void> send(Publisher<WebSocketMessage> messages) {
6973

7074
@Override
7175
public Mono<Void> close(CloseStatus status) {
72-
return Mono.error(new UnsupportedOperationException(
73-
"Reactor Netty does not support closing the session from anywhere. " +
74-
"You will need to work with the Flux returned from receive() method, " +
75-
"either subscribing to it and using the returned Disposable, " +
76-
"or using an operator that cancels (e.g. take)."));
76+
WebSocketFrame closeFrame = new CloseWebSocketFrame(status.getCode(), status.getReason());
77+
return getDelegate().getOutbound().sendObject(closeFrame).then();
7778
}
7879

7980

spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -127,6 +127,21 @@ public void customHeader() throws Exception {
127127
assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
128128
}
129129

130+
@Test
131+
public void sessionClosing() throws Exception {
132+
this.client.execute(getUrl("/close"),
133+
session -> {
134+
logger.debug("Starting..");
135+
return session.receive()
136+
.doOnNext(s -> logger.debug("inbound " + s))
137+
.then()
138+
.doFinally(signalType -> {
139+
logger.debug("Completed with: " + signalType);
140+
});
141+
})
142+
.block(Duration.ofMillis(5000));
143+
}
144+
130145

131146
@Configuration
132147
static class WebConfig {
@@ -137,6 +152,7 @@ public HandlerMapping handlerMapping() {
137152
map.put("/echo", new EchoWebSocketHandler());
138153
map.put("/sub-protocol", new SubProtocolWebSocketHandler());
139154
map.put("/custom-header", new CustomHeaderHandler());
155+
map.put("/close", new SessionClosingHandler());
140156

141157
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
142158
mapping.setUrlMap(map);
@@ -183,4 +199,12 @@ public Mono<Void> handle(WebSocketSession session) {
183199
}
184200
}
185201

202+
private static class SessionClosingHandler implements WebSocketHandler {
203+
204+
@Override
205+
public Mono<Void> handle(WebSocketSession session) {
206+
return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then();
207+
}
208+
}
209+
186210
}

0 commit comments

Comments
 (0)