Skip to content

Commit 0ea6c26

Browse files
committed
remove race condition
1 parent 4f6738e commit 0ea6c26

File tree

2 files changed

+45
-11
lines changed

2 files changed

+45
-11
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ public void start() {
199199
}
200200

201201
public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
202+
// Don't submit refresh if shutdown initiated or executor is shutting down
203+
if (shutdownInitiated.get() || refreshExecutor.isShutdown()) {
204+
log.debug("Skipping refresh because manager is shutting down");
205+
return;
206+
}
207+
202208
refreshExecutor.submit(() -> {
203209
// Atomically check and clear skip flag (hot-reload in progress)
204210
if (skipRefreshForAdapter.getAndSet(false)) {
@@ -264,18 +270,33 @@ public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
264270
log.error(
265271
"Existing adapters were modified while a refresh was ongoing, adapter with name '{}' was deleted and could not be updated",
266272
name);
273+
return;
267274
}
268-
if (wrapper != null && !protocolAdapterConfigs.get(name).equals(wrapper.getConfig())) {
269-
if (log.isDebugEnabled()) {
270-
log.debug("Updating adapter '{}'", name);
275+
276+
if (!protocolAdapterConfigs.get(name).equals(wrapper.getConfig())) {
277+
final boolean isStarted =
278+
wrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED;
279+
280+
if (!isStarted) {
281+
// Adapter is stopped - update config by recreating wrapper but don't start
282+
if (log.isDebugEnabled()) {
283+
log.debug("Updating config for stopped adapter '{}' without starting", name);
284+
}
285+
deleteAdapterInternal(name);
286+
createAdapterInternal(protocolAdapterConfigs.get(name), versionProvider.getVersion());
287+
} else {
288+
// Adapter is started - do full stop->delete->create->start cycle
289+
if (log.isDebugEnabled()) {
290+
log.debug("Updating adapter '{}'", name);
291+
}
292+
stopAsync(name).thenApply(v -> {
293+
deleteAdapterInternal(name);
294+
return null;
295+
})
296+
.thenCompose(ignored -> startAsync(createAdapterInternal(protocolAdapterConfigs.get(
297+
name), versionProvider.getVersion())))
298+
.get();
271299
}
272-
stopAsync(name).thenApply(v -> {
273-
deleteAdapterInternal(name);
274-
return null;
275-
})
276-
.thenCompose(ignored -> startAsync(createAdapterInternal(protocolAdapterConfigs.get(name),
277-
versionProvider.getVersion())))
278-
.get();
279300
} else {
280301
if (log.isDebugEnabled()) {
281302
log.debug("Not-updating adapter '{}' since the config is unchanged", name);

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,20 @@ public boolean startSouthbound() {
157157
.map(InternalWritingContextImpl::new)
158158
.collect(Collectors.<InternalWritingContext>toList()));
159159
if (started) {
160-
log.info("Southbound started for adapter {}", adapter.getId());
160+
// Allow time for writing capabilities to initialize after hot-reload
161+
// This prevents data loss by ensuring ReactiveWriters complete MQTT subscription
162+
// and are ready to process messages before hot-reload completes.
163+
// TODO: Replace with proper event-based initialization by:
164+
// 1. Making ReactiveWriter.start() return CompletableFuture<Void>
165+
// 2. Having ProtocolAdapterWritingServiceImpl.startWriting() wait on all futures
166+
// 3. Removing this sleep in favor of the future-based approach
167+
try {
168+
Thread.sleep(1000);
169+
log.info("Southbound started for adapter {}", adapter.getId());
170+
} catch (final InterruptedException e) {
171+
Thread.currentThread().interrupt();
172+
log.warn("Interrupted while waiting for southbound initialization after hot-reload for adapter '{}'", getId());
173+
}
161174
transitionSouthboundState(StateEnum.CONNECTED);
162175
} else {
163176
log.error("Southbound start failed for adapter {}", adapter.getId());

0 commit comments

Comments
 (0)