@@ -71,7 +71,37 @@ Then map it to a URL and add a `WebSocketHandlerAdapter`:
71
71
[[webflux-websockethandler]]
72
72
=== WebSocketHandler
73
73
74
- The most basic implementation of a handler is one that handles inbound messages:
74
+ The `handle` method of `WebSocketHandler` takes `WebSocketSession` and returns `Mono<Void>`
75
+ to indicate when application handling of the session is complete. The session is handled
76
+ through two streams, one for inbound and one for outbound messages:
77
+
78
+ [options="header"]
79
+ |===
80
+ | WebSocketSession method | Description
81
+
82
+ | `Flux<WebSocketMessage> receive()`
83
+ | Provides access to the inbound message stream, and completes when the connection is closed.
84
+
85
+ | `Mono<Void> send(Publisher<WebSocketMessage>)`
86
+ | Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
87
+ completes when the source completes and writing is done.
88
+
89
+ |===
90
+
91
+ A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow, and
92
+ return a `Mono<Void>` that reflects the completion of that flow. Depending on application
93
+ requirements, the unified flow completes when:
94
+
95
+ * Either inbound or outbound message streams complete.
96
+ * Inbound stream completes (i.e. connection closed), while outbound is infinite.
97
+ * At a chosen point through the `close` method of `WebSocketSession`.
98
+
99
+ When inbound and outbound message streams are composed together, there is no need to
100
+ check if the connection is open, since Reactive Streams signals will terminate activity.
101
+ The inbound stream receives a completion/error signal, and the outbound stream receives
102
+ receives a cancellation signal.
103
+
104
+ The most basic implementation of a handler is one that handles the inbound stream:
75
105
76
106
[source,java,indent=0]
77
107
[subs="verbatim,quotes"]
@@ -94,17 +124,17 @@ class ExampleHandler implements WebSocketHandler {
94
124
<1> Access stream of inbound messages.
95
125
<2> Do something with each message.
96
126
<3> Perform nested async operation using message content.
97
- <4> Return `Mono<Void>` that doesn't complete while we continue to receive .
127
+ <4> Return `Mono<Void>` that completes when receiving completes .
98
128
99
- [NOTE ]
129
+ [TIP ]
100
130
====
101
- If performing a nested, asynchronous operation , you'll need to call
102
- `message.retain()` if the underlying server uses pooled data buffers (e.g. Netty), or
103
- otherwise the data buffer may be released before you've had a chance to read the data.
104
- For more on this see <<core.adoc#databuffers,Data Buffers and Codecs>>.
131
+ For nested, asynchronous operations , you may need to call `message.retain()` on underlying
132
+ servers that use pooled data buffers (e.g. Netty), or otherwise the data buffer may be
133
+ released before you've had a chance to read the data. For more background see
134
+ <<core.adoc#databuffers,Data Buffers and Codecs>>.
105
135
====
106
136
107
- A handler can work with inbound and outbound messages as independent streams:
137
+ The below implementation combines the inbound with the outbound streams:
108
138
109
139
[source,java,indent=0]
110
140
[subs="verbatim,quotes"]
@@ -114,28 +144,25 @@ class ExampleHandler implements WebSocketHandler {
114
144
@Override
115
145
public Mono<Void> handle(WebSocketSession session) {
116
146
117
- Mono<Void> input = session.receive() <1>
147
+ Flux<WebSocketMessage> output = session.receive() <1>
118
148
.doOnNext(message -> {
119
149
// ...
120
150
})
121
151
.concatMap(message -> {
122
152
// ...
123
153
})
124
- .then();
125
-
126
- Flux<String> source = ... ;
127
- Mono<Void> output = session.send(source.map(session::textMessage)); <2>
154
+ .map(value -> session.textMessage("Echo " + value)); <2>
128
155
129
- return Mono.zip(input, output).then(); <3>
156
+ return session.send( output); <3>
130
157
}
131
158
}
132
159
----
133
160
<1> Handle inbound message stream.
134
- <2> Send outgoing messages.
135
- <3> Join the streams and return `Mono<Void>` that completes when _either_ stream ends.
161
+ <2> Create outbound message, producing a combined flow.
162
+ <3> Return `Mono<Void>` that doesn't complete while we continue to receive.
163
+
164
+ Inbound and outbound streams can be independent, and joined only for completion:
136
165
137
- A handler can compose a connected flow of inbound and outbound messages:
138
- 4
139
166
[source,java,indent=0]
140
167
[subs="verbatim,quotes"]
141
168
----
@@ -144,22 +171,25 @@ class ExampleHandler implements WebSocketHandler {
144
171
@Override
145
172
public Mono<Void> handle(WebSocketSession session) {
146
173
147
- Flux<WebSocketMessage> output = session.receive() <1>
174
+ Mono<Void> input = session.receive() <1>
148
175
.doOnNext(message -> {
149
176
// ...
150
177
})
151
178
.concatMap(message -> {
152
179
// ...
153
180
})
154
- .map(value -> session.textMessage("Echo " + value)); <2>
181
+ .then();
155
182
156
- return session.send(output); <3>
183
+ Flux<String> source = ... ;
184
+ Mono<Void> output = session.send(source.map(session::textMessage)); <2>
185
+
186
+ return Mono.zip(input, output).then(); <3>
157
187
}
158
188
}
159
189
----
160
190
<1> Handle inbound message stream.
161
- <2> Create outbound message, producing a combined flow .
162
- <3> Return `Mono<Void>` that doesn't complete while we continue to receive .
191
+ <2> Send outgoing messages .
192
+ <3> Join the streams and return `Mono<Void>` that completes when _either_ stream ends .
163
193
164
194
165
195
@@ -172,6 +202,8 @@ of `HandshakeWebSocketService`, which performs basic checks on the WebSocket req
172
202
then uses `RequestUpgradeStrategy` for the server in use. Currently there is built-in
173
203
support for Reactor Netty, Tomcat, Jetty, and Undertow.
174
204
205
+ The above are just 3 examples to serve as a starting point.
206
+
175
207
176
208
177
209
[[webflux-websocket-server-config]]
0 commit comments