Skip to content

Commit 4584ecb

Browse files
authored
Fix connection event error (#1730)
1 parent 1daa690 commit 4584ecb

File tree

3 files changed

+21
-16
lines changed

3 files changed

+21
-16
lines changed

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.ConnectEvent;
2222
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventListener;
2323
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.MqttEvent;
24-
import java.util.ArrayList;
2524
import java.util.Collection;
25+
import java.util.LinkedHashSet;
26+
import java.util.Set;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.function.Consumer;
3031
import lombok.Getter;
3132
import lombok.extern.slf4j.Slf4j;
33+
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.ConcurrentHashSet;
3234

3335
/**
3436
* Proxy connection manager.
@@ -38,7 +40,7 @@ public class MQTTConnectionManager {
3840

3941
private final ConcurrentMap<String, Connection> localConnections;
4042

41-
private final ConcurrentMap<String, Connection> eventConnections;
43+
private final ConcurrentHashSet<String> eventClientIds;
4244

4345
@Getter
4446
private static final HashedWheelTimer sessionExpireInterval =
@@ -56,7 +58,7 @@ public class MQTTConnectionManager {
5658
public MQTTConnectionManager(String advertisedAddress) {
5759
this.advertisedAddress = advertisedAddress;
5860
this.localConnections = new ConcurrentHashMap<>(2048);
59-
this.eventConnections = new ConcurrentHashMap<>(2048);
61+
this.eventClientIds = new ConcurrentHashSet<>(2048);
6062
this.connectListener = new ConnectEventListener();
6163
this.disconnectListener = new DisconnectEventListener();
6264
}
@@ -102,11 +104,11 @@ public Collection<Connection> getLocalConnections() {
102104
return this.localConnections.values();
103105
}
104106

105-
public Collection<Connection> getAllConnections() {
106-
Collection<Connection> connections = new ArrayList<>(this.localConnections.values().size()
107-
+ this.eventConnections.values().size());
108-
connections.addAll(this.localConnections.values());
109-
connections.addAll(eventConnections.values());
107+
public Collection<String> getAllConnectionsId() {
108+
Set<String> connections = new LinkedHashSet<>(this.localConnections.keySet().size()
109+
+ this.eventClientIds.size());
110+
connections.addAll(this.localConnections.keySet());
111+
connections.addAll(eventClientIds);
110112
return connections;
111113
}
112114

@@ -126,7 +128,7 @@ public void onChange(MqttEvent event) {
126128
log.warn("[ConnectEvent] close existing connection : {}", connection);
127129
connection.disconnect();
128130
} else {
129-
eventConnections.put(connectEvent.getClientId(), connection);
131+
eventClientIds.add(connectEvent.getClientId());
130132
}
131133
}
132134
}
@@ -141,7 +143,7 @@ public void onChange(MqttEvent event) {
141143
if (event.getEventType() == DISCONNECT) {
142144
ConnectEvent connectEvent = (ConnectEvent) event.getSourceEvent();
143145
if (!connectEvent.getAddress().equals(advertisedAddress)) {
144-
eventConnections.remove(connectEvent.getClientId());
146+
eventClientIds.remove(connectEvent.getClientId());
145147
}
146148
}
147149
}

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,13 @@ private void refreshCache(Message<MqttEvent> msg) {
237237
default:
238238
break;
239239
}
240-
listeners.forEach(listener -> listener.onChange(value));
240+
listeners.forEach(listener -> {
241+
try {
242+
listener.onChange(value);
243+
} catch (Throwable e) {
244+
log.error("Failed to process event : {}", value.getKey(), e);
245+
}
246+
});
241247
} catch (Throwable ex) {
242248
log.error("refresh cache error", ex);
243249
}

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.mqtt.proxy.web.admin;
1515

16-
import io.streamnative.pulsar.handlers.mqtt.common.Connection;
1716
import io.swagger.annotations.Api;
1817
import io.swagger.annotations.ApiOperation;
1918
import io.swagger.annotations.ApiResponse;
2019
import io.swagger.annotations.ApiResponses;
2120
import java.util.Collection;
22-
import java.util.stream.Collectors;
2321
import javax.ws.rs.GET;
2422
import javax.ws.rs.Path;
2523
import javax.ws.rs.Produces;
@@ -45,9 +43,8 @@ public class Devices extends WebResource {
4543
@ApiResponse(code = 500, message = "Internal server error")})
4644
public void getList(@Suspended final AsyncResponse asyncResponse) {
4745
try {
48-
final Collection<Connection> allConnections = service().getConnectionManager().getAllConnections();
49-
asyncResponse.resume(allConnections.stream().map(e ->
50-
e.getClientId()).collect(Collectors.toList()));
46+
final Collection<String> allConnections = service().getConnectionManager().getAllConnectionsId();
47+
asyncResponse.resume(allConnections);
5148
} catch (Exception e) {
5249
log.error("[{}] Failed to list devices {}", clientAppId(), e);
5350
asyncResponse.resume(new RestException(e));

0 commit comments

Comments
 (0)