Skip to content

Commit 22468da

Browse files
authored
MoP accept ws proxy connections (#1586)
1 parent d4f96c3 commit 22468da

File tree

9 files changed

+176
-40
lines changed

9 files changed

+176
-40
lines changed

docs/using-mqtt-over-websocket.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,31 @@ mqttTlsKeyFilePath=/xxx/server.key
5454
>
5555
> Secure WebSocket Port: `8084`
5656
57+
### How to use Proxy
58+
59+
```
60+
mqttProxyEnabled=true
61+
mqttProxyTlsEnabled=true
62+
mqttProxyTlsPskEnabled=true
63+
mqttProxyWsEnabled=true
64+
mqttProxyWssEnabled=true
65+
```
66+
67+
> #### Note
68+
> MQTT Proxy Info:
69+
>
70+
> Server: `broker.steamnative.io`
71+
>
72+
> Proxy TCP Port: `5682``
73+
>
74+
> Proxy TLS Port: `5683`
75+
>
76+
> Proxy TLS PSK Port: `5684`
77+
>
78+
> Proxy WebSocket Port: `5083
79+
>
80+
> Proxy Secure WebSocket Port: `5084`
81+
5782
## Get Started with MQTT over WebSocket
5883

5984
### Install MQTT WebSocket Client

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,17 @@
1515

1616
import static org.apache.pulsar.client.impl.PulsarChannelInitializer.TLS_HANDLER;
1717
import io.netty.channel.ChannelInitializer;
18-
import io.netty.channel.ChannelPipeline;
1918
import io.netty.channel.socket.SocketChannel;
20-
import io.netty.handler.codec.http.HttpContentCompressor;
21-
import io.netty.handler.codec.http.HttpObjectAggregator;
22-
import io.netty.handler.codec.http.HttpServerCodec;
23-
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
2419
import io.netty.handler.codec.mqtt.MqttDecoder;
2520
import io.netty.handler.ssl.SslHandler;
2621
import io.netty.handler.timeout.IdleStateHandler;
2722
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
2823
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
29-
import io.streamnative.pulsar.handlers.mqtt.broker.codec.MqttWebSocketCodec;
30-
import io.streamnative.pulsar.handlers.mqtt.common.Constants;
3124
import io.streamnative.pulsar.handlers.mqtt.common.adapter.CombineAdapterHandler;
3225
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder;
3326
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder;
3427
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils;
28+
import io.streamnative.pulsar.handlers.mqtt.common.utils.WebSocketUtils;
3529
import java.util.concurrent.ScheduledExecutorService;
3630
import java.util.concurrent.TimeUnit;
3731
import lombok.extern.slf4j.Slf4j;
@@ -89,7 +83,7 @@ public void initChannel(SocketChannel ch) throws Exception {
8983
new SslHandler(PSKUtils.createServerEngine(ch, mqttService.getPskConfiguration())));
9084
}
9185
if (this.enableWs) {
92-
addWsHandler(ch.pipeline());
86+
WebSocketUtils.addWsHandler(ch.pipeline(), mqttConfig);
9387
}
9488
// Decoder
9589
ch.pipeline().addLast(MqttAdapterDecoder.NAME, new MqttAdapterDecoder());
@@ -101,27 +95,6 @@ public void initChannel(SocketChannel ch) throws Exception {
10195
ch.pipeline().addLast(MQTTBrokerInboundHandler.NAME, new MQTTBrokerInboundHandler(mqttService));
10296
}
10397

104-
/**
105-
* Add websocket handler.
106-
* @param pipeline
107-
*/
108-
private void addWsHandler(ChannelPipeline pipeline) {
109-
// Encode or decode request and reply messages into HTTP messages
110-
pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec());
111-
112-
// Combine the parts of an HTTP message into a complete HTTP message
113-
pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR,
114-
new HttpObjectAggregator(mqttConfig.getHttpMaxContentLength()));
115-
116-
// Compress and encode HTTP messages
117-
pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor());
118-
119-
pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL,
120-
new WebSocketServerProtocolHandler(mqttConfig.getWebSocketPath(), Constants.MQTT_SUB_PROTOCOL_CSV_LIST,
121-
true, mqttConfig.getWebSocketMaxFrameSize()));
122-
pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec());
123-
}
124-
12598
protected PulsarSslConfiguration buildSslConfiguration(MQTTServerConfiguration config) {
12699
return PulsarSslConfiguration.builder()
127100
.tlsProvider(config.getMqttTlsProvider())

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,20 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
124124
)
125125
private int mqttProxyTlsPskPort = 5684;
126126

127+
@FieldContext(
128+
category = CATEGORY_MQTT_PROXY,
129+
required = false,
130+
doc = "The mqtt proxy ws port"
131+
)
132+
private int mqttProxyWsPort = 5083;
133+
134+
@FieldContext(
135+
category = CATEGORY_MQTT_PROXY,
136+
required = false,
137+
doc = "The mqtt proxy wss port"
138+
)
139+
private int mqttProxyWssPort = 5084;
140+
127141
@FieldContext(
128142
category = CATEGORY_MQTT_PROXY,
129143
required = false,
@@ -160,6 +174,20 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
160174
)
161175
private boolean mqttProxyTlsPskEnabled = false;
162176

177+
@FieldContext(
178+
category = CATEGORY_MQTT_PROXY,
179+
required = false,
180+
doc = "Whether start mqtt protocol handler with proxy ws"
181+
)
182+
private boolean mqttProxyWsEnabled = false;
183+
184+
@FieldContext(
185+
category = CATEGORY_MQTT_PROXY,
186+
required = false,
187+
doc = "Whether start mqtt protocol handler with proxy wss"
188+
)
189+
private boolean mqttProxyWssEnabled = false;
190+
163191
@FieldContext(
164192
category = CATEGORY_MQTT_PROXY,
165193
required = false,

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/MqttWebSocketCodec.java renamed to mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/MqttWebSocketCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.broker.codec;
14+
package io.streamnative.pulsar.handlers.mqtt.common.codec;
1515

1616
import io.netty.buffer.ByteBuf;
1717
import io.netty.channel.ChannelHandlerContext;

mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/package-info.java renamed to mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.broker.codec;
14+
package io.streamnative.pulsar.handlers.mqtt.common.codec;

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/ConfigurationUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public final class ConfigurationUtils {
5555
"^((mqtt)(\\+ssl)?(\\+psk)?|(ws)(\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";
5656

5757
public static final String PROXY_LISTENER_PATTERN =
58-
"^(mqtt-proxy)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";
58+
"^((mqtt-proxy)(\\\\+ssl)?(\\\\+psk)?|(ws-proxy)(\\\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";
5959

6060
/**
6161
* Creates PulsarConfiguration and loads it with populated attribute values loaded from provided property file.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.mqtt.common.utils;
15+
16+
import io.netty.channel.ChannelPipeline;
17+
import io.netty.handler.codec.http.HttpContentCompressor;
18+
import io.netty.handler.codec.http.HttpObjectAggregator;
19+
import io.netty.handler.codec.http.HttpServerCodec;
20+
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
21+
import io.streamnative.pulsar.handlers.mqtt.common.Constants;
22+
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
23+
import io.streamnative.pulsar.handlers.mqtt.common.codec.MqttWebSocketCodec;
24+
25+
public class WebSocketUtils {
26+
27+
/**
28+
* Add websocket handler.
29+
*
30+
* @param pipeline
31+
*/
32+
public static void addWsHandler(ChannelPipeline pipeline, MQTTCommonConfiguration configuration) {
33+
// Encode or decode request and reply messages into HTTP messages
34+
pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec());
35+
36+
// Combine the parts of an HTTP message into a complete HTTP message
37+
pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR,
38+
new HttpObjectAggregator(configuration.getHttpMaxContentLength()));
39+
40+
// Compress and encode HTTP messages
41+
pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor());
42+
43+
pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL,
44+
new WebSocketServerProtocolHandler(configuration.getWebSocketPath(),
45+
Constants.MQTT_SUB_PROTOCOL_CSV_LIST, true, configuration.getWebSocketMaxFrameSize()));
46+
pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec());
47+
}
48+
}

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public class MQTTProxyService implements Closeable {
7373
private Channel listenChannel;
7474
private Channel listenChannelTls;
7575
private Channel listenChannelTlsPsk;
76+
private Channel listenChannelWs;
77+
private Channel listenChannelWss;
7678
private final EventLoopGroup acceptorGroup;
7779
private final EventLoopGroup workerGroup;
7880
private final WebService webService;
@@ -130,7 +132,7 @@ public void start() throws MQTTProxyException {
130132
serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
131133
EventLoopUtil.enableTriggeredMode(serverBootstrap);
132134
serverBootstrap.childHandler(new MQTTProxyChannelInitializer(
133-
this, proxyConfig, false, sslContextRefresher));
135+
this, proxyConfig, false, false, sslContextRefresher));
134136

135137
try {
136138
listenChannel = serverBootstrap.bind(proxyConfig.getMqttProxyPort()).sync().channel();
@@ -142,7 +144,7 @@ public void start() throws MQTTProxyException {
142144
if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
143145
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
144146
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
145-
this, proxyConfig, true, sslContextRefresher));
147+
this, proxyConfig, true, false, sslContextRefresher));
146148
try {
147149
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
148150
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
@@ -162,14 +164,38 @@ public void start() throws MQTTProxyException {
162164
// Add channel initializer
163165
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
164166
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
165-
this, proxyConfig, false, true, sslContextRefresher));
167+
this, proxyConfig, false, true, false, sslContextRefresher));
166168
try {
167169
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
168170
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
169171
} catch (InterruptedException e) {
170172
throw new MQTTProxyException(e);
171173
}
172174
}
175+
176+
if (proxyConfig.isMqttProxyWsEnabled()) {
177+
ServerBootstrap wsBootstrap = serverBootstrap.clone();
178+
wsBootstrap.childHandler(new MQTTProxyChannelInitializer(
179+
this, proxyConfig, false, true, sslContextRefresher));
180+
try {
181+
listenChannelWs = wsBootstrap.bind(proxyConfig.getMqttProxyWsPort()).sync().channel();
182+
log.info("Started MQTT Proxy with WS on {}", listenChannelWs.localAddress());
183+
} catch (InterruptedException e) {
184+
throw new MQTTProxyException(e);
185+
}
186+
}
187+
188+
if (proxyConfig.isMqttProxyWssEnabled()) {
189+
ServerBootstrap wssBootstrap = serverBootstrap.clone();
190+
wssBootstrap.childHandler(new MQTTProxyChannelInitializer(
191+
this, proxyConfig, true, true, sslContextRefresher));
192+
try {
193+
listenChannelWss = wssBootstrap.bind(proxyConfig.getMqttProxyWssPort()).sync().channel();
194+
log.info("Started MQTT Proxy with WSS on {}", listenChannelWss.localAddress());
195+
} catch (InterruptedException e) {
196+
throw new MQTTProxyException(e);
197+
}
198+
}
173199
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
174200
this.eventService.start();
175201
this.webService.start();
@@ -184,7 +210,7 @@ public void start0() throws MQTTProxyException {
184210
if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
185211
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
186212
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
187-
this, proxyConfig, true, sslContextRefresher));
213+
this, proxyConfig, true, false, sslContextRefresher));
188214
try {
189215
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
190216
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
@@ -204,14 +230,38 @@ public void start0() throws MQTTProxyException {
204230
// Add channel initializer
205231
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
206232
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
207-
this, proxyConfig, false, true, sslContextRefresher));
233+
this, proxyConfig, false, true, false, sslContextRefresher));
208234
try {
209235
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
210236
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
211237
} catch (InterruptedException e) {
212238
throw new MQTTProxyException(e);
213239
}
214240
}
241+
242+
if (proxyConfig.isMqttProxyWsEnabled()) {
243+
ServerBootstrap wsBootstrap = serverBootstrap.clone();
244+
wsBootstrap.childHandler(new MQTTProxyChannelInitializer(
245+
this, proxyConfig, false, true, sslContextRefresher));
246+
try {
247+
listenChannelWs = wsBootstrap.bind(proxyConfig.getMqttProxyWsPort()).sync().channel();
248+
log.info("Started MQTT Proxy with WS on {}", listenChannelWs.localAddress());
249+
} catch (InterruptedException e) {
250+
throw new MQTTProxyException(e);
251+
}
252+
}
253+
254+
if (proxyConfig.isMqttProxyWssEnabled()) {
255+
ServerBootstrap wssBootstrap = serverBootstrap.clone();
256+
wssBootstrap.childHandler(new MQTTProxyChannelInitializer(
257+
this, proxyConfig, true, true, sslContextRefresher));
258+
try {
259+
listenChannelWss = wssBootstrap.bind(proxyConfig.getMqttProxyWssPort()).sync().channel();
260+
log.info("Started MQTT Proxy with WSS on {}", listenChannelWss.localAddress());
261+
} catch (InterruptedException e) {
262+
throw new MQTTProxyException(e);
263+
}
264+
}
215265
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
216266
this.eventService.start();
217267
}
@@ -227,6 +277,12 @@ public void close() {
227277
if (listenChannelTlsPsk != null) {
228278
listenChannelTlsPsk.close();
229279
}
280+
if (listenChannelWs != null) {
281+
listenChannelWs.close();
282+
}
283+
if (listenChannelWss != null) {
284+
listenChannelWss.close();
285+
}
230286
this.acceptorGroup.shutdownGracefully();
231287
this.workerGroup.shutdownGracefully();
232288
this.eventService.close();

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyChannelInitializer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder;
2424
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder;
2525
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils;
26+
import io.streamnative.pulsar.handlers.mqtt.common.utils.WebSocketUtils;
2627
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration;
2728
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
2829
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException;
@@ -45,22 +46,24 @@ public class MQTTProxyChannelInitializer extends ChannelInitializer<SocketChanne
4546

4647
private final boolean enableTls;
4748
private final boolean enableTlsPsk;
49+
private final boolean enableWs;
4850
private PulsarSslFactory sslFactory;
4951

5052
public MQTTProxyChannelInitializer(MQTTProxyService proxyService, MQTTProxyConfiguration proxyConfig,
51-
boolean enableTls,
53+
boolean enableTls, boolean enableWs,
5254
ScheduledExecutorService sslContextRefresher) throws MQTTProxyException {
53-
this(proxyService, proxyConfig, enableTls, false, sslContextRefresher);
55+
this(proxyService, proxyConfig, enableTls, false, enableWs, sslContextRefresher);
5456
}
5557

5658
public MQTTProxyChannelInitializer(MQTTProxyService proxyService, MQTTProxyConfiguration proxyConfig,
57-
boolean enableTls, boolean enableTlsPsk,
59+
boolean enableTls, boolean enableTlsPsk, boolean enableWs,
5860
ScheduledExecutorService sslContextRefresher) throws MQTTProxyException {
5961
try {
6062
this.proxyService = proxyService;
6163
this.proxyConfig = proxyConfig;
6264
this.enableTls = enableTls;
6365
this.enableTlsPsk = enableTlsPsk;
66+
this.enableWs = enableWs;
6467
if (this.enableTls) {
6568
PulsarSslConfiguration sslConfiguration = buildSslConfiguration(proxyConfig);
6669
this.sslFactory = (PulsarSslFactory) Class.forName(proxyConfig.getSslFactoryPlugin())
@@ -88,6 +91,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
8891
ch.pipeline().addLast(TLS_HANDLER,
8992
new SslHandler(PSKUtils.createServerEngine(ch, proxyService.getPskConfiguration())));
9093
}
94+
if (this.enableWs) {
95+
WebSocketUtils.addWsHandler(ch.pipeline(), proxyConfig);
96+
}
9197
ch.pipeline().addLast(MqttAdapterDecoder.NAME, new MqttAdapterDecoder());
9298
ch.pipeline().addLast("mqtt-decoder", new MqttDecoder(proxyConfig.getMqttMessageMaxLength()));
9399
// Encoder

0 commit comments

Comments
 (0)