Skip to content

Commit 8121554

Browse files
authored
Refactor MoP to prepare for split Proxy to seperate module (#1509)
1 parent 07d7526 commit 8121554

File tree

202 files changed

+809
-621
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

202 files changed

+809
-621
lines changed

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTProtocolHandler.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,26 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt;
14+
package io.streamnative.pulsar.handlers.mqtt.broker;
1515

1616
import static com.google.common.base.Preconditions.checkArgument;
17-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.LISTENER_DEL;
18-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.PLAINTEXT_PREFIX;
19-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.PROTOCOL_NAME;
20-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.SSL_PREFIX;
21-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.SSL_PSK_PREFIX;
22-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.WS_PLAINTEXT_PREFIX;
23-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.WS_SSL_PREFIX;
24-
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.getListenerPort;
17+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.LISTENER_DEL;
18+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PLAINTEXT_PREFIX;
19+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROTOCOL_NAME;
20+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.SSL_PREFIX;
21+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.SSL_PSK_PREFIX;
22+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.WS_PLAINTEXT_PREFIX;
23+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.WS_SSL_PREFIX;
24+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.getListenerPort;
2525
import com.google.common.collect.ImmutableMap;
2626
import io.netty.channel.ChannelInitializer;
2727
import io.netty.channel.socket.SocketChannel;
2828
import io.netty.util.concurrent.DefaultThreadFactory;
29+
import io.streamnative.pulsar.handlers.mqtt.MopVersion;
30+
import io.streamnative.pulsar.handlers.mqtt.broker.channel.MQTTChannelInitializer;
31+
import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils;
2932
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration;
3033
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
31-
import io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils;
3234
import java.net.InetSocketAddress;
3335
import java.util.Map;
3436
import java.util.concurrent.Executors;
@@ -97,7 +99,7 @@ public void start(BrokerService brokerService) {
9799
try {
98100
MQTTProxyConfiguration proxyConfig =
99101
ConfigurationUtils.create(mqttConfig.getProperties(), MQTTProxyConfiguration.class);
100-
proxyService = new MQTTProxyService(mqttService, proxyConfig);
102+
proxyService = new MQTTProxyService(brokerService, proxyConfig);
101103
proxyService.start();
102104
log.info("Start MQTT proxy service at port: {}", proxyConfig.getMqttProxyPort());
103105
} catch (Exception ex) {

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTServerConfiguration.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTServerConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt;
14+
package io.streamnative.pulsar.handlers.mqtt.broker;
1515

16+
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
1617
import lombok.Getter;
1718
import lombok.Setter;
1819
import org.apache.pulsar.common.configuration.FieldContext;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/MQTTService.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,25 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt;
15-
16-
import io.streamnative.pulsar.handlers.mqtt.support.MQTTMetricsCollector;
17-
import io.streamnative.pulsar.handlers.mqtt.support.MQTTMetricsProvider;
18-
import io.streamnative.pulsar.handlers.mqtt.support.QosPublishHandlersImpl;
19-
import io.streamnative.pulsar.handlers.mqtt.support.RetainedMessageHandler;
20-
import io.streamnative.pulsar.handlers.mqtt.support.WillMessageHandler;
21-
import io.streamnative.pulsar.handlers.mqtt.support.event.DisableEventCenter;
22-
import io.streamnative.pulsar.handlers.mqtt.support.event.PulsarEventCenter;
23-
import io.streamnative.pulsar.handlers.mqtt.support.event.PulsarEventCenterImpl;
24-
import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKConfiguration;
25-
import io.streamnative.pulsar.handlers.mqtt.support.systemtopic.SystemEventService;
14+
package io.streamnative.pulsar.handlers.mqtt.broker;
15+
16+
import io.streamnative.pulsar.handlers.mqtt.broker.impl.MQTTNamespaceBundleOwnershipListener;
17+
import io.streamnative.pulsar.handlers.mqtt.broker.impl.MQTTSubscriptionManager;
18+
import io.streamnative.pulsar.handlers.mqtt.broker.metric.MQTTMetricsCollector;
19+
import io.streamnative.pulsar.handlers.mqtt.broker.metric.MQTTMetricsProvider;
20+
import io.streamnative.pulsar.handlers.mqtt.broker.mqtt5.WillMessageHandler;
21+
import io.streamnative.pulsar.handlers.mqtt.broker.qos.QosPublishHandlers;
22+
import io.streamnative.pulsar.handlers.mqtt.broker.qos.QosPublishHandlersImpl;
23+
import io.streamnative.pulsar.handlers.mqtt.common.MQTTConnectionManager;
24+
import io.streamnative.pulsar.handlers.mqtt.common.authentication.MQTTAuthenticationService;
25+
import io.streamnative.pulsar.handlers.mqtt.common.event.DisableEventCenter;
26+
import io.streamnative.pulsar.handlers.mqtt.common.event.PulsarEventCenter;
27+
import io.streamnative.pulsar.handlers.mqtt.common.event.PulsarEventCenterImpl;
28+
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.RetainedMessageHandler;
29+
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKConfiguration;
30+
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.DisabledSystemEventService;
31+
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.SystemEventService;
2632
import lombok.Getter;
27-
import lombok.Setter;
2833
import lombok.extern.slf4j.Slf4j;
2934
import org.apache.pulsar.broker.PulsarService;
3035
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -82,8 +87,7 @@ public class MQTTService {
8287
private final QosPublishHandlers qosPublishHandlers;
8388

8489
@Getter
85-
@Setter
86-
private SystemEventService eventService;
90+
private final SystemEventService eventService;
8791

8892
public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverConfiguration) {
8993
this.brokerService = brokerService;
@@ -107,21 +111,15 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo
107111
this.eventCenter = new PulsarEventCenterImpl(brokerService,
108112
serverConfiguration.getEventCenterCallbackPoolThreadNum());
109113
}
110-
this.retainedMessageHandler = new RetainedMessageHandler(this);
114+
this.eventService = new DisabledSystemEventService();
115+
this.retainedMessageHandler = new RetainedMessageHandler(eventService);
111116
this.qosPublishHandlers = new QosPublishHandlersImpl(this);
112117
this.willMessageHandler = new WillMessageHandler(this);
113118
}
114119

115-
public boolean isSystemTopicEnabled() {
116-
return eventService != null;
117-
}
118-
119120
public void close() {
120121
this.connectionManager.close();
121122
this.eventCenter.shutdown();
122-
if (eventService != null) {
123-
eventService.close();
124-
}
125123
this.willMessageHandler.close();
126124
}
127125
}

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt;
14+
package io.streamnative.pulsar.handlers.mqtt.broker.channel;
1515

1616
import static org.apache.pulsar.client.impl.PulsarChannelInitializer.TLS_HANDLER;
1717
import io.netty.channel.ChannelInitializer;
@@ -24,11 +24,14 @@
2424
import io.netty.handler.codec.mqtt.MqttDecoder;
2525
import io.netty.handler.ssl.SslHandler;
2626
import io.netty.handler.timeout.IdleStateHandler;
27-
import io.streamnative.pulsar.handlers.mqtt.adapter.CombineAdapterHandler;
28-
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterDecoder;
29-
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterEncoder;
30-
import io.streamnative.pulsar.handlers.mqtt.codec.MqttWebSocketCodec;
31-
import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKUtils;
27+
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
28+
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
29+
import io.streamnative.pulsar.handlers.mqtt.broker.codec.MqttWebSocketCodec;
30+
import io.streamnative.pulsar.handlers.mqtt.common.Constants;
31+
import io.streamnative.pulsar.handlers.mqtt.common.adapter.CombineAdapterHandler;
32+
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder;
33+
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder;
34+
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils;
3235
import java.util.concurrent.ScheduledExecutorService;
3336
import java.util.concurrent.TimeUnit;
3437
import lombok.extern.slf4j.Slf4j;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTInboundHandler.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTInboundHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt;
14+
package io.streamnative.pulsar.handlers.mqtt.broker.channel;
1515

1616
import io.netty.channel.ChannelHandler.Sharable;
1717
import io.netty.channel.ChannelHandlerContext;
18+
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService;
19+
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonInboundHandler;
1820
import lombok.extern.slf4j.Slf4j;
1921
/**
2022
* MQTT in bound handler.

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.support;
14+
package io.streamnative.pulsar.handlers.mqtt.broker.channel;
1515

1616
import io.netty.channel.ChannelHandlerContext;
17+
import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.MQTTConsumer;
1718
import java.util.Optional;
1819
import java.util.concurrent.CompletableFuture;
1920
import lombok.extern.slf4j.Slf4j;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
/**
16+
* Package info.
17+
*/
18+
package io.streamnative.pulsar.handlers.mqtt.broker.channel;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/codec/MqttWebSocketCodec.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/MqttWebSocketCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.codec;
14+
package io.streamnative.pulsar.handlers.mqtt.broker.codec;
1515

1616
import io.netty.buffer.ByteBuf;
1717
import io.netty.channel.ChannelHandlerContext;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/package-info.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
/**
1616
* Package info.
1717
*/
18-
package io.streamnative.pulsar.handlers.mqtt.adapter;
18+
package io.streamnative.pulsar.handlers.mqtt.broker.codec;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTNamespaceBundleOwnershipListener.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/MQTTNamespaceBundleOwnershipListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt;
14+
package io.streamnative.pulsar.handlers.mqtt.broker.impl;
1515

1616
import java.util.ArrayList;
1717
import java.util.List;

0 commit comments

Comments
 (0)