Skip to content

Commit d4f96c3

Browse files
authored
Add mop proxy admin part (#1575)
1 parent 6407272 commit d4f96c3

File tree

13 files changed

+672
-14
lines changed

13 files changed

+672
-14
lines changed

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.collect.Sets;
1818
import java.util.List;
1919
import java.util.Objects;
20+
import java.util.Optional;
2021
import java.util.Set;
2122
import lombok.Getter;
2223
import lombok.Setter;
@@ -376,6 +377,12 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
376377
)
377378
private boolean systemEventEnabled = true;
378379

380+
@FieldContext(
381+
category = CATEGORY_MQTT_PROXY,
382+
doc = "Enable web http service."
383+
)
384+
private Optional<Integer> mopWebServicePort = Optional.of(5680);
385+
379386
public long getMqttTlsCertRefreshCheckDurationSec() {
380387
if (mqttTlsCertRefreshCheckDurationSec != 300) {
381388
return mqttTlsCertRefreshCheckDurationSec;

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414
package io.streamnative.pulsar.handlers.mqtt.common;
1515

1616
import static io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventType.CONNECT;
17+
import static io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventType.DISCONNECT;
1718
import io.netty.util.HashedWheelTimer;
1819
import io.netty.util.Timeout;
1920
import io.netty.util.concurrent.DefaultThreadFactory;
2021
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.ConnectEvent;
2122
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventListener;
2223
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.MqttEvent;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
2326
import java.util.concurrent.ConcurrentHashMap;
2427
import java.util.concurrent.ConcurrentMap;
2528
import java.util.concurrent.TimeUnit;
@@ -33,26 +36,33 @@
3336
@Slf4j
3437
public class MQTTConnectionManager {
3538

36-
private final ConcurrentMap<String, Connection> connections;
39+
private final ConcurrentMap<String, Connection> localConnections;
40+
41+
private final ConcurrentMap<String, Connection> eventConnections;
3742

3843
@Getter
3944
private static final HashedWheelTimer sessionExpireInterval =
4045
new HashedWheelTimer(
4146
new DefaultThreadFactory("session-expire-interval"), 1, TimeUnit.SECONDS);
4247

4348
@Getter
44-
private final EventListener eventListener;
49+
private final EventListener connectListener;
50+
51+
@Getter
52+
private final EventListener disconnectListener;
4553

4654
private final String advertisedAddress;
4755

4856
public MQTTConnectionManager(String advertisedAddress) {
4957
this.advertisedAddress = advertisedAddress;
50-
this.connections = new ConcurrentHashMap<>(2048);
51-
this.eventListener = new ConnectEventListener();
58+
this.localConnections = new ConcurrentHashMap<>(2048);
59+
this.eventConnections = new ConcurrentHashMap<>(2048);
60+
this.connectListener = new ConnectEventListener();
61+
this.disconnectListener = new DisconnectEventListener();
5262
}
5363

5464
public void addConnection(Connection connection) {
55-
Connection existing = connections.put(connection.getClientId(), connection);
65+
Connection existing = localConnections.put(connection.getClientId(), connection);
5666
if (existing != null) {
5767
log.warn("The clientId is existed. Close existing connection. CId={}", existing.getClientId());
5868
existing.disconnect();
@@ -68,7 +78,7 @@ public void addConnection(Connection connection) {
6878
*/
6979
public void newSessionExpireInterval(Consumer<Timeout> task, String clientId, int interval) {
7080
sessionExpireInterval.newTimeout(timeout -> {
71-
Connection connection = connections.get(clientId);
81+
Connection connection = localConnections.get(clientId);
7282
if (connection != null
7383
&& connection.getState() != Connection.ConnectionState.DISCONNECTED) {
7484
return;
@@ -80,16 +90,28 @@ public void newSessionExpireInterval(Consumer<Timeout> task, String clientId, in
8090
// Must use connections.remove(key, value).
8191
public void removeConnection(Connection connection) {
8292
if (connection != null) {
83-
connections.remove(connection.getClientId(), connection);
93+
localConnections.remove(connection.getClientId(), connection);
8494
}
8595
}
8696

8797
public Connection getConnection(String clientId) {
88-
return connections.get(clientId);
98+
return localConnections.get(clientId);
99+
}
100+
101+
public Collection<Connection> getLocalConnections() {
102+
return this.localConnections.values();
103+
}
104+
105+
public Collection<Connection> getAllConnections() {
106+
Collection<Connection> connections = new ArrayList<>(this.localConnections.values().size()
107+
+ this.eventConnections.values().size());
108+
connections.addAll(this.localConnections.values());
109+
connections.addAll(eventConnections.values());
110+
return connections;
89111
}
90112

91113
public void close() {
92-
connections.values().forEach(connection -> connection.getChannel().close());
114+
localConnections.values().forEach(connection -> connection.getChannel().close());
93115
}
94116

95117
class ConnectEventListener implements EventListener {
@@ -103,9 +125,25 @@ public void onChange(MqttEvent event) {
103125
if (connection != null) {
104126
log.warn("[ConnectEvent] close existing connection : {}", connection);
105127
connection.disconnect();
128+
} else {
129+
eventConnections.put(connectEvent.getClientId(), connection);
106130
}
107131
}
108132
}
109133
}
110134
}
135+
136+
//TODO
137+
class DisconnectEventListener implements EventListener {
138+
139+
@Override
140+
public void onChange(MqttEvent event) {
141+
if (event.getEventType() == DISCONNECT) {
142+
ConnectEvent connectEvent = (ConnectEvent) event.getSourceEvent();
143+
if (!connectEvent.getAddress().equals(advertisedAddress)) {
144+
eventConnections.remove(connectEvent.getClientId());
145+
}
146+
}
147+
}
148+
}
111149
}

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/EventType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
public enum EventType {
1717
CONNECT,
18+
DISCONNECT,
1819
LAST_WILL_MESSAGE,
1920
RETAINED_MESSAGE,
2021
ADD_PSK_IDENTITY;

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.streamnative.pulsar.handlers.mqtt.proxy.handler.LookupHandler;
3636
import io.streamnative.pulsar.handlers.mqtt.proxy.handler.PulsarServiceLookupHandler;
3737
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException;
38+
import io.streamnative.pulsar.handlers.mqtt.proxy.web.WebService;
3839
import java.io.Closeable;
3940
import java.util.concurrent.Executors;
4041
import java.util.concurrent.ScheduledExecutorService;
@@ -74,6 +75,7 @@ public class MQTTProxyService implements Closeable {
7475
private Channel listenChannelTlsPsk;
7576
private final EventLoopGroup acceptorGroup;
7677
private final EventLoopGroup workerGroup;
78+
private final WebService webService;
7779

7880
private DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("mqtt-redirect-acceptor");
7981
private DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("mqtt-redirect-io");
@@ -101,7 +103,8 @@ public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration prox
101103
this.eventService = proxyConfig.isSystemEventEnabled()
102104
? new SystemTopicBasedSystemEventService(pulsarService)
103105
: new DisabledSystemEventService();
104-
this.eventService.addListener(connectionManager.getEventListener());
106+
this.eventService.addListener(connectionManager.getConnectListener());
107+
this.eventService.addListener(connectionManager.getDisconnectListener());
105108
this.eventService.addListener(new RetainedMessageHandler(eventService).getEventListener());
106109
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getMqttProxyNumAcceptorThreads(),
107110
false, acceptorThreadFactory);
@@ -112,6 +115,7 @@ public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration prox
112115
this.proxyAdapter = new MQTTProxyAdapter(this);
113116
this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor(
114117
new DefaultThreadFactory("mop-proxy-ssl-context-refresher"));
118+
this.webService = new WebService(this);
115119
}
116120

117121
private void configValid(MQTTProxyConfiguration proxyConfig) {
@@ -168,6 +172,7 @@ public void start() throws MQTTProxyException {
168172
}
169173
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
170174
this.eventService.start();
175+
this.webService.start();
171176
}
172177

173178
public void start0() throws MQTTProxyException {

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,11 @@ public void processDisconnect(final MqttAdapterMessage msg) {
280280
@Override
281281
public void processConnectionLost() {
282282
if (log.isDebugEnabled()) {
283-
log.debug("[Proxy Connection Lost] [{}] ", connection.getClientId());
283+
String clientId = "unknown";
284+
if (connection != null) {
285+
clientId = connection.getClientId();
286+
}
287+
log.debug("[Proxy Connection Lost] [{}] ", clientId);
284288
}
285289
autoSubscribeHandler.close();
286290
if (connection != null) {

0 commit comments

Comments
 (0)