Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/using-mqtt-over-websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,31 @@ mqttTlsKeyFilePath=/xxx/server.key
>
> Secure WebSocket Port: `8084`

### How to use Proxy

```
mqttProxyEnabled=true
mqttProxyTlsEnabled=true
mqttProxyTlsPskEnabled=true
mqttProxyWsEnabled=true
mqttProxyWssEnabled=true
```

> #### Note
> MQTT Proxy Info:
>
> Server: `broker.steamnative.io`
>
> Proxy TCP Port: `5682``
>
> Proxy TLS Port: `5683`
>
> Proxy TLS PSK Port: `5684`
>
> Proxy WebSocket Port: `5083
>
> Proxy Secure WebSocket Port: `5084`

## Get Started with MQTT over WebSocket

### Install MQTT WebSocket Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,17 @@

import static org.apache.pulsar.client.impl.PulsarChannelInitializer.TLS_HANDLER;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
import io.streamnative.pulsar.handlers.mqtt.broker.codec.MqttWebSocketCodec;
import io.streamnative.pulsar.handlers.mqtt.common.Constants;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.CombineAdapterHandler;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder;
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils;
import io.streamnative.pulsar.handlers.mqtt.common.utils.WebSocketUtils;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -89,7 +83,7 @@ public void initChannel(SocketChannel ch) throws Exception {
new SslHandler(PSKUtils.createServerEngine(ch, mqttService.getPskConfiguration())));
}
if (this.enableWs) {
addWsHandler(ch.pipeline());
WebSocketUtils.addWsHandler(ch.pipeline(), mqttConfig);
}
// Decoder
ch.pipeline().addLast(MqttAdapterDecoder.NAME, new MqttAdapterDecoder());
Expand All @@ -101,27 +95,6 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MQTTBrokerInboundHandler.NAME, new MQTTBrokerInboundHandler(mqttService));
}

/**
* Add websocket handler.
* @param pipeline
*/
private void addWsHandler(ChannelPipeline pipeline) {
// Encode or decode request and reply messages into HTTP messages
pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec());

// Combine the parts of an HTTP message into a complete HTTP message
pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR,
new HttpObjectAggregator(mqttConfig.getHttpMaxContentLength()));

// Compress and encode HTTP messages
pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor());

pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL,
new WebSocketServerProtocolHandler(mqttConfig.getWebSocketPath(), Constants.MQTT_SUB_PROTOCOL_CSV_LIST,
true, mqttConfig.getWebSocketMaxFrameSize()));
pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec());
}

protected PulsarSslConfiguration buildSslConfiguration(MQTTServerConfiguration config) {
return PulsarSslConfiguration.builder()
.tlsProvider(config.getMqttTlsProvider())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,20 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
)
private int mqttProxyTlsPskPort = 5684;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "The mqtt proxy ws port"
)
private int mqttProxyWsPort = 5083;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "The mqtt proxy wss port"
)
private int mqttProxyWssPort = 5084;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
Expand Down Expand Up @@ -160,6 +174,20 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
)
private boolean mqttProxyTlsPskEnabled = false;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "Whether start mqtt protocol handler with proxy ws"
)
private boolean mqttProxyWsEnabled = false;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "Whether start mqtt protocol handler with proxy wss"
)
private boolean mqttProxyWssEnabled = false;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.codec;
package io.streamnative.pulsar.handlers.mqtt.common.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.codec;
package io.streamnative.pulsar.handlers.mqtt.common.codec;
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class ConfigurationUtils {
"^((mqtt)(\\+ssl)?(\\+psk)?|(ws)(\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

public static final String PROXY_LISTENER_PATTERN =
"^(mqtt-proxy)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";
"^((mqtt-proxy)(\\\\+ssl)?(\\\\+psk)?|(ws-proxy)(\\\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

/**
* Creates PulsarConfiguration and loads it with populated attribute values loaded from provided property file.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.common.utils;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.streamnative.pulsar.handlers.mqtt.common.Constants;
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.common.codec.MqttWebSocketCodec;

public class WebSocketUtils {

/**
* Add websocket handler.
*
* @param pipeline
*/
public static void addWsHandler(ChannelPipeline pipeline, MQTTCommonConfiguration configuration) {
// Encode or decode request and reply messages into HTTP messages
pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec());

// Combine the parts of an HTTP message into a complete HTTP message
pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR,
new HttpObjectAggregator(configuration.getHttpMaxContentLength()));

// Compress and encode HTTP messages
pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor());

pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL,
new WebSocketServerProtocolHandler(configuration.getWebSocketPath(),
Constants.MQTT_SUB_PROTOCOL_CSV_LIST, true, configuration.getWebSocketMaxFrameSize()));
pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class MQTTProxyService implements Closeable {
private Channel listenChannel;
private Channel listenChannelTls;
private Channel listenChannelTlsPsk;
private Channel listenChannelWs;
private Channel listenChannelWss;
private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
private final WebService webService;
Expand Down Expand Up @@ -130,7 +132,7 @@ public void start() throws MQTTProxyException {
serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
EventLoopUtil.enableTriggeredMode(serverBootstrap);
serverBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, sslContextRefresher));
this, proxyConfig, false, false, sslContextRefresher));

try {
listenChannel = serverBootstrap.bind(proxyConfig.getMqttProxyPort()).sync().channel();
Expand All @@ -142,7 +144,7 @@ public void start() throws MQTTProxyException {
if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, sslContextRefresher));
this, proxyConfig, true, false, sslContextRefresher));
try {
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
Expand All @@ -162,14 +164,38 @@ public void start() throws MQTTProxyException {
// Add channel initializer
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
this, proxyConfig, false, true, false, sslContextRefresher));
try {
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}

if (proxyConfig.isMqttProxyWsEnabled()) {
ServerBootstrap wsBootstrap = serverBootstrap.clone();
wsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
try {
listenChannelWs = wsBootstrap.bind(proxyConfig.getMqttProxyWsPort()).sync().channel();
log.info("Started MQTT Proxy with WS on {}", listenChannelWs.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}

if (proxyConfig.isMqttProxyWssEnabled()) {
ServerBootstrap wssBootstrap = serverBootstrap.clone();
wssBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, true, sslContextRefresher));
try {
listenChannelWss = wssBootstrap.bind(proxyConfig.getMqttProxyWssPort()).sync().channel();
log.info("Started MQTT Proxy with WSS on {}", listenChannelWss.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
this.eventService.start();
this.webService.start();
Expand All @@ -184,7 +210,7 @@ public void start0() throws MQTTProxyException {
if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, sslContextRefresher));
this, proxyConfig, true, false, sslContextRefresher));
try {
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
Expand All @@ -204,14 +230,38 @@ public void start0() throws MQTTProxyException {
// Add channel initializer
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
this, proxyConfig, false, true, false, sslContextRefresher));
try {
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}

if (proxyConfig.isMqttProxyWsEnabled()) {
ServerBootstrap wsBootstrap = serverBootstrap.clone();
wsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
try {
listenChannelWs = wsBootstrap.bind(proxyConfig.getMqttProxyWsPort()).sync().channel();
log.info("Started MQTT Proxy with WS on {}", listenChannelWs.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}

if (proxyConfig.isMqttProxyWssEnabled()) {
ServerBootstrap wssBootstrap = serverBootstrap.clone();
wssBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, true, sslContextRefresher));
try {
listenChannelWss = wssBootstrap.bind(proxyConfig.getMqttProxyWssPort()).sync().channel();
log.info("Started MQTT Proxy with WSS on {}", listenChannelWss.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
this.eventService.start();
}
Expand All @@ -227,6 +277,12 @@ public void close() {
if (listenChannelTlsPsk != null) {
listenChannelTlsPsk.close();
}
if (listenChannelWs != null) {
listenChannelWs.close();
}
if (listenChannelWss != null) {
listenChannelWss.close();
}
this.acceptorGroup.shutdownGracefully();
this.workerGroup.shutdownGracefully();
this.eventService.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder;
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils;
import io.streamnative.pulsar.handlers.mqtt.common.utils.WebSocketUtils;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException;
Expand All @@ -45,22 +46,24 @@ public class MQTTProxyChannelInitializer extends ChannelInitializer<SocketChanne

private final boolean enableTls;
private final boolean enableTlsPsk;
private final boolean enableWs;
private PulsarSslFactory sslFactory;

public MQTTProxyChannelInitializer(MQTTProxyService proxyService, MQTTProxyConfiguration proxyConfig,
boolean enableTls,
boolean enableTls, boolean enableWs,
ScheduledExecutorService sslContextRefresher) throws MQTTProxyException {
this(proxyService, proxyConfig, enableTls, false, sslContextRefresher);
this(proxyService, proxyConfig, enableTls, false, enableWs, sslContextRefresher);
}

public MQTTProxyChannelInitializer(MQTTProxyService proxyService, MQTTProxyConfiguration proxyConfig,
boolean enableTls, boolean enableTlsPsk,
boolean enableTls, boolean enableTlsPsk, boolean enableWs,
ScheduledExecutorService sslContextRefresher) throws MQTTProxyException {
try {
this.proxyService = proxyService;
this.proxyConfig = proxyConfig;
this.enableTls = enableTls;
this.enableTlsPsk = enableTlsPsk;
this.enableWs = enableWs;
if (this.enableTls) {
PulsarSslConfiguration sslConfiguration = buildSslConfiguration(proxyConfig);
this.sslFactory = (PulsarSslFactory) Class.forName(proxyConfig.getSslFactoryPlugin())
Expand Down Expand Up @@ -88,6 +91,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(PSKUtils.createServerEngine(ch, proxyService.getPskConfiguration())));
}
if (this.enableWs) {
WebSocketUtils.addWsHandler(ch.pipeline(), proxyConfig);
}
ch.pipeline().addLast(MqttAdapterDecoder.NAME, new MqttAdapterDecoder());
ch.pipeline().addLast("mqtt-decoder", new MqttDecoder(proxyConfig.getMqttMessageMaxLength()));
// Encoder
Expand Down
Loading