21
21
22
22
import reactor .core .publisher .Mono ;
23
23
import reactor .netty .http .server .HttpServerResponse ;
24
+ import reactor .netty .http .server .WebsocketServerSpec ;
24
25
25
26
import org .springframework .core .io .buffer .NettyDataBufferFactory ;
26
27
import org .springframework .http .server .reactive .AbstractServerHttpResponse ;
27
28
import org .springframework .http .server .reactive .ServerHttpResponse ;
28
29
import org .springframework .http .server .reactive .ServerHttpResponseDecorator ;
29
30
import org .springframework .lang .Nullable ;
31
+ import org .springframework .util .Assert ;
30
32
import org .springframework .web .reactive .socket .HandshakeInfo ;
31
33
import org .springframework .web .reactive .socket .WebSocketHandler ;
32
- import org .springframework .web .reactive .socket .adapter .NettyWebSocketSessionSupport ;
33
34
import org .springframework .web .reactive .socket .adapter .ReactorNettyWebSocketSession ;
34
35
import org .springframework .web .reactive .socket .server .RequestUpgradeStrategy ;
35
36
import org .springframework .web .server .ServerWebExchange ;
42
43
*/
43
44
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
44
45
45
- private int maxFramePayloadLength = NettyWebSocketSessionSupport . DEFAULT_FRAME_MAX_SIZE ;
46
+ private final Supplier < WebsocketServerSpec . Builder > specBuilderSupplier ;
46
47
47
- private boolean handlePing = false ;
48
+ @ Nullable
49
+ private Integer maxFramePayloadLength ;
48
50
51
+ @ Nullable
52
+ private Boolean handlePing ;
53
+
54
+
55
+ /**
56
+ * Create an instances with a default {@link WebsocketServerSpec.Builder}.
57
+ * @since 5.2.6
58
+ */
59
+ public ReactorNettyRequestUpgradeStrategy () {
60
+ this (WebsocketServerSpec .builder ());
61
+ }
62
+
63
+
64
+ /**
65
+ * Create an instance with a pre-configured {@link WebsocketServerSpec.Builder}
66
+ * to use for WebSocket upgrades.
67
+ * @since 5.2.6
68
+ */
69
+ public ReactorNettyRequestUpgradeStrategy (Supplier <WebsocketServerSpec .Builder > builderSupplier ) {
70
+ Assert .notNull (builderSupplier , "WebsocketServerSpec.Builder is required" );
71
+ this .specBuilderSupplier = builderSupplier ;
72
+ }
73
+
74
+
75
+ /**
76
+ * Build an instance of {@code WebsocketServerSpec} that reflects the current
77
+ * configuration. This can be used to check the configured parameters except
78
+ * for sub-protocols which depend on the {@link WebSocketHandler} that is used
79
+ * for a given upgrade.
80
+ * @since 5.2.6
81
+ */
82
+ public WebsocketServerSpec getWebsocketServerSpec () {
83
+ return buildSpec (null );
84
+ }
85
+
86
+ private WebsocketServerSpec buildSpec (@ Nullable String subProtocol ) {
87
+ WebsocketServerSpec .Builder builder = this .specBuilderSupplier .get ();
88
+ if (subProtocol != null ) {
89
+ builder .protocols (subProtocol );
90
+ }
91
+ if (this .maxFramePayloadLength != null ) {
92
+ builder .maxFramePayloadLength (this .maxFramePayloadLength );
93
+ }
94
+ if (this .handlePing != null ) {
95
+ builder .handlePing (this .handlePing );
96
+ }
97
+ return builder .build ();
98
+ }
49
99
50
100
/**
51
101
* Configure the maximum allowable frame payload length. Setting this value
@@ -57,17 +107,22 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
57
107
* <p>By default set to 65536 (64K).
58
108
* @param maxFramePayloadLength the max length for frames.
59
109
* @since 5.1
110
+ * @deprecated as of 5.2.6 in favor of providing a supplier of
111
+ * {@link WebsocketServerSpec.Builder} wiht a constructor argument.
60
112
*/
113
+ @ Deprecated
61
114
public void setMaxFramePayloadLength (Integer maxFramePayloadLength ) {
62
115
this .maxFramePayloadLength = maxFramePayloadLength ;
63
116
}
64
117
65
118
/**
66
119
* Return the configured max length for frames.
67
120
* @since 5.1
121
+ * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
68
122
*/
123
+ @ Deprecated
69
124
public int getMaxFramePayloadLength () {
70
- return this .maxFramePayloadLength ;
125
+ return getWebsocketServerSpec () .maxFramePayloadLength () ;
71
126
}
72
127
73
128
/**
@@ -80,43 +135,46 @@ public int getMaxFramePayloadLength() {
80
135
* frames will be passed through to the {@link WebSocketHandler}.
81
136
* @param handlePing whether to let Ping frames through for handling
82
137
* @since 5.2.4
138
+ * @deprecated as of 5.2.6 in favor of providing a supplier of
139
+ * {@link WebsocketServerSpec.Builder} wiht a constructor argument.
83
140
*/
141
+ @ Deprecated
84
142
public void setHandlePing (boolean handlePing ) {
85
143
this .handlePing = handlePing ;
86
144
}
87
145
88
146
/**
89
147
* Return the configured {@link #setHandlePing(boolean)}.
90
148
* @since 5.2.4
149
+ * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
91
150
*/
151
+ @ Deprecated
92
152
public boolean getHandlePing () {
93
- return this .handlePing ;
153
+ return getWebsocketServerSpec () .handlePing () ;
94
154
}
95
155
96
156
97
157
@ Override
98
- @ SuppressWarnings ("deprecation" )
99
158
public Mono <Void > upgrade (ServerWebExchange exchange , WebSocketHandler handler ,
100
159
@ Nullable String subProtocol , Supplier <HandshakeInfo > handshakeInfoFactory ) {
101
160
102
161
ServerHttpResponse response = exchange .getResponse ();
103
162
HttpServerResponse reactorResponse = getNativeResponse (response );
104
163
HandshakeInfo handshakeInfo = handshakeInfoFactory .get ();
105
164
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory ) response .bufferFactory ();
165
+ URI uri = exchange .getRequest ().getURI ();
106
166
107
167
// Trigger WebFlux preCommit actions and upgrade
108
168
return response .setComplete ()
109
- .then (Mono .defer (() -> reactorResponse .sendWebsocket (
110
- subProtocol ,
111
- this .maxFramePayloadLength ,
112
- this .handlePing ,
113
- (in , out ) -> {
114
- ReactorNettyWebSocketSession session =
115
- new ReactorNettyWebSocketSession (
116
- in , out , handshakeInfo , bufferFactory , this .maxFramePayloadLength );
117
- URI uri = exchange .getRequest ().getURI ();
118
- return handler .handle (session ).checkpoint (uri + " [ReactorNettyRequestUpgradeStrategy]" );
119
- })));
169
+ .then (Mono .defer (() -> {
170
+ WebsocketServerSpec spec = buildSpec (subProtocol );
171
+ return reactorResponse .sendWebsocket ((in , out ) -> {
172
+ ReactorNettyWebSocketSession session =
173
+ new ReactorNettyWebSocketSession (
174
+ in , out , handshakeInfo , bufferFactory , spec .maxFramePayloadLength ());
175
+ return handler .handle (session ).checkpoint (uri + " [ReactorNettyRequestUpgradeStrategy]" );
176
+ }, spec );
177
+ }));
120
178
}
121
179
122
180
private static HttpServerResponse getNativeResponse (ServerHttpResponse response ) {
0 commit comments