Skip to content

Commit c2431a6

Browse files
authored
Seperate proxy and broker a single module (#1510)
1 parent 8121554 commit c2431a6

File tree

207 files changed

+1683
-176
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

207 files changed

+1683
-176
lines changed

.ci/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ ARG MOP_VERSION
66
USER root
77
RUN rm -rf /pulsar/protocols/pulsar-protocol-handler-mqtt-*.nar
88

9-
COPY ../mqtt-impl/target/pulsar-protocol-handler-mqtt-${MOP_VERSION}.nar /pulsar/protocols
9+
COPY ../mqtt-broker/target/pulsar-protocol-handler-mqtt-${MOP_VERSION}.nar /pulsar/protocols

.github/workflows/pr_test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ jobs:
6161
id: list-test
6262
run: |
6363
TESTS=`find tests/src/test/java/io/streamnative/pulsar/handlers/mqtt \
64-
mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt -name "*Test.java" | \
64+
mqtt-broker/src/test/java/io/streamnative/pulsar/handlers/mqtt -name "*Test.java" | \
6565
awk -F "/" '{ print $NF }' | \
6666
awk -F "." '{ print $1 }' | \
6767
jq -R -s -c 'split("\n") | map(select(. != ""))'`
@@ -146,7 +146,7 @@ jobs:
146146
- name: Download jacoco artifact
147147
uses: actions/download-artifact@v3
148148
with:
149-
path: mqtt-impl/target
149+
path: mqtt-broker/target
150150

151151
- name: Merge jacoco report
152152
run: mvn jacoco:merge

README.md

Lines changed: 1 addition & 1 deletion
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@
2727
<name>StreamNative :: Pulsar Protocol Handler :: MQTT</name>
2828
<description>MQTT on Pulsar implemented using Pulsar Protocol Handler</description>
2929

30+
<dependencies>
31+
<dependency>
32+
<groupId>io.streamnative.pulsar.handlers</groupId>
33+
<artifactId>pulsar-protocol-handler-mqtt-proxy</artifactId>
34+
<version>${project.version}</version>
35+
</dependency>
36+
</dependencies>
3037
<build>
3138
<plugins>
3239
<plugin>

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java renamed to mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java

File renamed without changes.

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTServerConfiguration.java renamed to mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTServerConfiguration.java

File renamed without changes.

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java renamed to mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,17 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo
9393
this.brokerService = brokerService;
9494
this.pulsarService = brokerService.pulsar();
9595
this.serverConfiguration = serverConfiguration;
96-
this.pskConfiguration = new PSKConfiguration(serverConfiguration);
96+
this.pskConfiguration = new PSKConfiguration(serverConfiguration.getMqttTlsPskIdentityHint(),
97+
serverConfiguration.getMqttTlsPskIdentity(), serverConfiguration.getMqttTlsPskIdentityFile(),
98+
serverConfiguration.getMqttTlsProtocols(), serverConfiguration.getMqttTlsCiphers());
9799
this.authorizationService = brokerService.getAuthorizationService();
98100
this.bundleOwnershipListener = new MQTTNamespaceBundleOwnershipListener(pulsarService.getNamespaceService());
99101
this.metricsCollector = new MQTTMetricsCollector(serverConfiguration);
100102
this.metricsProvider = new MQTTMetricsProvider(metricsCollector);
101103
this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider);
102104
this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled()
103105
? new MQTTAuthenticationService(brokerService,
104-
serverConfiguration.getMqttAuthenticationMethods(),
105-
serverConfiguration.isMqttProxyMTlsAuthenticationEnabled()) : null;
106+
serverConfiguration.getMqttAuthenticationMethods()) : null;
106107
this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress());
107108
this.subscriptionManager = new MQTTSubscriptionManager();
108109
if (getServerConfiguration().isMqttProxyEnabled()) {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.mqtt.broker.channel;
15+
16+
import static com.google.common.base.Preconditions.checkArgument;
17+
import io.netty.channel.ChannelHandler.Sharable;
18+
import io.netty.channel.ChannelHandlerContext;
19+
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
20+
import io.streamnative.pulsar.handlers.mqtt.broker.processor.MQTTBrokerProtocolMethodProcessor;
21+
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonInboundHandler;
22+
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
23+
import java.util.concurrent.CompletableFuture;
24+
import lombok.extern.slf4j.Slf4j;
25+
26+
/**
27+
* MQTT in bound handler.
28+
*/
29+
@Sharable
30+
@Slf4j
31+
public class MQTTBrokerInboundHandler extends MQTTCommonInboundHandler {
32+
33+
public static final String NAME = "handler";
34+
35+
private final MQTTService mqttService;
36+
37+
public MQTTBrokerInboundHandler(MQTTService mqttService) {
38+
this.mqttService = mqttService;
39+
}
40+
41+
@Override
42+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
43+
super.channelActive(ctx);
44+
}
45+
46+
@Override
47+
public void channelRead(ChannelHandlerContext ctx, Object message) {
48+
checkArgument(message instanceof MqttAdapterMessage);
49+
MqttAdapterMessage adapterMsg = (MqttAdapterMessage) message;
50+
processors.computeIfAbsent(adapterMsg.getClientId(), key -> {
51+
MQTTBrokerProtocolMethodProcessor p = new MQTTBrokerProtocolMethodProcessor(mqttService, ctx);
52+
CompletableFuture<Void> inactiveFuture = p.getInactiveFuture();
53+
inactiveFuture.whenComplete((id, ex) -> {
54+
processors.remove(adapterMsg.getClientId());
55+
});
56+
return p;
57+
});
58+
super.channelRead(ctx, message);
59+
}
60+
}

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java renamed to mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void initChannel(SocketChannel ch) throws Exception {
9898
ch.pipeline().addLast(MqttAdapterEncoder.NAME, MqttAdapterEncoder.INSTANCE);
9999
// Handler
100100
ch.pipeline().addLast(CombineAdapterHandler.NAME, new CombineAdapterHandler());
101-
ch.pipeline().addLast(MQTTInboundHandler.NAME, new MQTTInboundHandler(mqttService));
101+
ch.pipeline().addLast(MQTTBrokerInboundHandler.NAME, new MQTTBrokerInboundHandler(mqttService));
102102
}
103103

104104
/**

0 commit comments

Comments
 (0)