Skip to content

Commit dc79b7a

Browse files
codepitbullCopilot
andauthored
Rework bridge handling to fix a bug (#1045)
* Rework bridge handling to fix a bug * Switch to synchronized --------- Co-authored-by: Copilot <[email protected]>
1 parent a9bbce1 commit dc79b7a

File tree

2 files changed

+58
-46
lines changed

2 files changed

+58
-46
lines changed

hivemq-edge/src/main/java/com/hivemq/bridge/BridgeService.java

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,20 @@
2828
import com.hivemq.edge.model.HiveMQEdgeRemoteEvent;
2929
import com.hivemq.metrics.HiveMQMetrics;
3030
import com.hivemq.util.Checkpoints;
31+
import jakarta.inject.Inject;
32+
import jakarta.inject.Singleton;
3133
import org.jetbrains.annotations.NotNull;
3234
import org.jetbrains.annotations.Nullable;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
3537

36-
import jakarta.inject.Inject;
37-
import jakarta.inject.Singleton;
38-
import java.util.Collections;
3938
import java.util.HashMap;
4039
import java.util.HashSet;
4140
import java.util.List;
4241
import java.util.Map;
43-
import java.util.Optional;
4442
import java.util.concurrent.ConcurrentHashMap;
4543
import java.util.concurrent.ExecutorService;
44+
import java.util.function.Function;
4645
import java.util.stream.Collectors;
4746

4847
@Singleton
@@ -57,8 +56,8 @@ public class BridgeService {
5756

5857
private final @NotNull Map<String, Throwable> bridgeNameToLastError = new ConcurrentHashMap<>(0);
5958

60-
private volatile @NotNull Map<String, MqttBridgeAndClient> activeBridgeNamesToClient = Collections.synchronizedMap(new HashMap<>());
61-
private volatile @NotNull Map<String, MqttBridge> allKnownBridgeConfigs = Collections.synchronizedMap(new HashMap<>());
59+
private final @NotNull Map<String, MqttBridgeAndClient> activeBridgeNamesToClient = new HashMap<>();
60+
private final @NotNull Map<String, MqttBridge> allKnownBridgeConfigs = new HashMap<>();
6261

6362
@Inject
6463
public BridgeService(
@@ -73,7 +72,7 @@ public BridgeService(
7372
this.bridgeMqttClientFactory = bridgeMqttClientFactory;
7473
this.executorService = executorService;
7574
this.remoteService = remoteService;
76-
metricRegistry.registerGauge(HiveMQMetrics.BRIDGES_CURRENT.name(), () -> allKnownBridgeConfigs.size());
75+
metricRegistry.registerGauge(HiveMQMetrics.BRIDGES_CURRENT.name(), allKnownBridgeConfigs::size);
7776
shutdownHooks.add(new BridgeShutdownHook(this));
7877
bridgeConfig.registerConsumer(this::updateBridges);
7978
}
@@ -82,24 +81,36 @@ public BridgeService(
8281
* Synchronizes ALL bridges from the config into runtime instances
8382
*/
8483
public synchronized void updateBridges(final @NotNull List<MqttBridge> bridges) {
85-
final Map<String, MqttBridge> newMapOfKnownBridges = Collections.synchronizedMap(new HashMap<>());
86-
final Map<String, MqttBridgeAndClient> newMapOfKActiveBridges = Collections.synchronizedMap(new HashMap<>());
87-
bridges.forEach(config -> newMapOfKnownBridges.put(config.getId(), config));
84+
85+
final var bridgeIdToConfig = bridges.stream().collect(Collectors.toMap(MqttBridge::getId, Function.identity()));
86+
87+
final var newBridgeIds = bridges.stream().map(MqttBridge::getId).collect(Collectors.toSet());
88+
89+
final var toRemove = new HashSet<>(allKnownBridgeConfigs.keySet());
90+
toRemove.removeAll(newBridgeIds);
91+
92+
final var toAdd = new HashSet<>(newBridgeIds);
93+
toAdd.removeAll(allKnownBridgeConfigs.keySet());
94+
95+
final var toUpdate = new HashSet<>(allKnownBridgeConfigs.keySet());
96+
toUpdate.removeAll(toAdd);
97+
toUpdate.removeAll(toRemove);
98+
99+
88100

89101
final long start = System.currentTimeMillis();
90102
if (log.isDebugEnabled()) {
91103
log.debug("Updating {} active bridges connections from {} configured connections",
92104
activeBridgeNamesToClient.size(),
93-
newMapOfKnownBridges.size());
105+
toUpdate.size() + toAdd.size());
94106
}
95107

96108
// first stop bridges as they might use the same clientId in case the id of a bridge was changed
97109
//remove any orphaned connections
98-
var missingBridges = new HashSet<>(allKnownBridgeConfigs.keySet());
99-
missingBridges.removeAll(newMapOfKnownBridges.keySet());
100110

101-
missingBridges.forEach(bridgeId -> {
111+
toRemove.forEach(bridgeId -> {
102112
final var active = activeBridgeNamesToClient.remove(bridgeId);
113+
allKnownBridgeConfigs.remove(bridgeId);
103114
if(active != null) {
104115
log.info("Removing bridge {}", bridgeId);
105116
internalStopBridge(active, true, List.of());
@@ -108,52 +119,51 @@ public synchronized void updateBridges(final @NotNull List<MqttBridge> bridges)
108119
}
109120
});
110121

111-
newMapOfKnownBridges.forEach((bridgeId, bridge) -> {
122+
toUpdate.forEach(bridgeId -> {
112123
final var active = activeBridgeNamesToClient.get(bridgeId);
124+
final var newBridge = bridgeIdToConfig.get(bridgeId);
113125
if(active != null) {
114-
if(active.bridge().equals(bridge)) {
126+
if(active.bridge().equals(newBridge)) {
115127
log.debug("Not restarting bridge {} because config is unchanged", bridgeId);
116128
} else {
117129
log.info("Restarting bridge {} because config has changed", bridgeId);
130+
allKnownBridgeConfigs.put(bridgeId, newBridge);
118131
internalStopBridge(active, true, List.of());
119-
newMapOfKActiveBridges.put(
120-
bridge.getId(),
121-
new MqttBridgeAndClient(bridge, internalStartBridge(bridge)));
122-
};
123-
};
132+
activeBridgeNamesToClient.put(
133+
bridgeId,
134+
new MqttBridgeAndClient(newBridge, internalStartBridge(newBridge)));
135+
}
136+
}
124137
});
125138

126-
newMapOfKnownBridges.forEach((bridgeId, bridge) -> {
127-
if (!activeBridgeNamesToClient.containsKey(bridgeId)) {
128-
log.info("Adding bridge {}", bridgeId);
129-
newMapOfKActiveBridges.put(
130-
bridge.getId(),
131-
new MqttBridgeAndClient(bridge, internalStartBridge(bridge)));
132-
}
139+
toAdd.forEach(bridgeId -> {
140+
final var newBridge = bridgeIdToConfig.get(bridgeId);
141+
log.info("Adding bridge {}", bridgeId);
142+
allKnownBridgeConfigs.put(bridgeId, newBridge);
143+
activeBridgeNamesToClient.put(
144+
bridgeId,
145+
new MqttBridgeAndClient(newBridge, internalStartBridge(newBridge)));
133146
});
134147

135148
if (log.isTraceEnabled()) {
136149
log.trace("Updating bridges complete in {}ms", (System.currentTimeMillis() - start));
137150
}
138-
139-
this.activeBridgeNamesToClient = newMapOfKActiveBridges;
140-
this.allKnownBridgeConfigs = newMapOfKnownBridges;
141151
}
142152

143153

144154
public @Nullable Throwable getLastError(final @NotNull String bridgeName) {
145155
return bridgeNameToLastError.get(bridgeName);
146156
}
147157

148-
public boolean isConnected(final @NotNull String bridgeName) {
158+
public synchronized boolean isConnected(final @NotNull String bridgeName) {
149159
final var mqttBridgeAndClient = activeBridgeNamesToClient.get(bridgeName);
150160
if(mqttBridgeAndClient != null) {
151161
return mqttBridgeAndClient.mqttClient().isConnected();
152162
}
153163
return false;
154164
}
155165

156-
public boolean isRunning(final @NotNull String bridgeName) {
166+
public synchronized boolean isRunning(final @NotNull String bridgeName) {
157167
return activeBridgeNamesToClient.containsKey(bridgeName);
158168
}
159169

@@ -175,28 +185,30 @@ public synchronized void stopBridge(
175185
}
176186

177187
public synchronized boolean restartBridge(
178-
final @NotNull String bridgId, final @Nullable MqttBridge newBridgeConfig) {
179-
final var bridgeToClient = activeBridgeNamesToClient.remove(bridgId);
188+
final @NotNull String bridgeId, final @Nullable MqttBridge newBridgeConfig) {
189+
final var bridgeToClient = activeBridgeNamesToClient.get(bridgeId);
180190
if (bridgeToClient != null) {
181-
log.info("Restarting bridge '{}'", bridgeToClient);
191+
log.info("Restarting bridge '{}'", bridgeId);
182192
final List<String> unchangedForwarders = newForwarderIds(newBridgeConfig);
183-
stopBridge(bridgId, true, unchangedForwarders);
193+
stopBridge(bridgeId, true, unchangedForwarders);
184194
final var mqttBridgeAndClient = new MqttBridgeAndClient(
185195
newBridgeConfig,
186196
internalStartBridge(newBridgeConfig != null ? newBridgeConfig : bridgeToClient.bridge()));
187197
activeBridgeNamesToClient.put(
188-
bridgId,
198+
bridgeId,
189199
mqttBridgeAndClient);
190-
allKnownBridgeConfigs.put(bridgId, newBridgeConfig);
200+
if(newBridgeConfig != null) {
201+
allKnownBridgeConfigs.put(bridgeId, newBridgeConfig);
202+
}
191203
return true;
192204
} else {
193-
log.debug("Not restarting bridge '{}' since it wasn't active", bridgeToClient);
205+
log.debug("Not restarting bridge '{}' since it wasn't active", bridgeId);
194206
return false;
195207
}
196208
}
197209

198210
public synchronized boolean startBridge(final @NotNull String bridgId) {
199-
var bridge = allKnownBridgeConfigs.get(bridgId);
211+
final var bridge = allKnownBridgeConfigs.get(bridgId);
200212
if (bridge != null && !activeBridgeNamesToClient.containsKey(bridgId)) {
201213
log.info("Starting bridge '{}'", bridgId);
202214
final var mqttBridgeAndClient = new MqttBridgeAndClient(

hivemq-edge/src/main/java/com/hivemq/configuration/reader/BridgeExtractor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ public boolean needsRestartWithConfig(final HiveMQConfigEntity config) {
112112

113113
@Override
114114
public synchronized Configurator.ConfigResult updateConfig(final HiveMQConfigEntity config) {
115-
var bridgeEntities = convertBridgeConfigs(config);
115+
final var bridgeEntities = convertBridgeConfigs(config);
116116

117-
Set<String> bridgeIds = new HashSet<>();
118-
var duplicates = bridgeEntities.stream()
117+
final Set<String> bridgeIds = new HashSet<>();
118+
final var duplicates = bridgeEntities.stream()
119119
.filter(n -> !bridgeIds.add(n.getId()))
120120
.toList();
121121

@@ -207,7 +207,7 @@ public void registerConsumer(final Consumer<List<@NotNull MqttBridge>> consumer)
207207
}
208208

209209
private @NotNull List<LocalSubscription> convertLocalSubscriptions(
210-
final @NotNull String name, @NotNull List<ForwardedTopicEntity> forwardedTopics) {
210+
final @NotNull String name, final @NotNull List<ForwardedTopicEntity> forwardedTopics) {
211211
final ImmutableList.Builder<LocalSubscription> builder = ImmutableList.builder();
212212
for (final ForwardedTopicEntity forwardedTopic : forwardedTopics) {
213213
validateTopicFilters(name, forwardedTopic.getFilters());
@@ -283,7 +283,7 @@ private static List<CustomUserProperty> convertCustomUserProperties(
283283
final @NotNull String name, final @NotNull List<CustomUserPropertyEntity> customUserProperties) {
284284
final ImmutableList.Builder<CustomUserProperty> builder = ImmutableList.builder();
285285

286-
for (CustomUserPropertyEntity customUserProperty : customUserProperties) {
286+
for (final CustomUserPropertyEntity customUserProperty : customUserProperties) {
287287
if (customUserProperty.getKey() != null && customUserProperty.getValue() != null) {
288288
builder.add(CustomUserProperty.of(customUserProperty.getKey(), customUserProperty.getValue()));
289289
} else {

0 commit comments

Comments
 (0)