Skip to content

Commit 9628937

Browse files
committed
strengthen code
1 parent b00680e commit 9628937

File tree

3 files changed

+134
-28
lines changed

3 files changed

+134
-28
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -578,21 +578,11 @@ public int getDepth() {
578578
return adapterNotFoundError(adapterId).get();
579579
}) :
580580
errorResponse(new ConfigWritingDisabled());
581-
582-
//TODO
583-
584-
// case ALREADY_USED_BY_ANOTHER_ADAPTER:
585-
// //noinspection DataFlowIssue cant be null here.
586-
// final @NotNull String tagName = domainTagUpdateResult.getErrorMessage();
587-
// return ErrorResponseUtil.errorResponse(new AlreadyExistsError("The tag '" +
588-
// tagName +
589-
// "' cannot be created since another item already exists with the same id."));
590581
}
591582

592583
@Override
593584
public @NotNull Response getDomainTags() {
594585
final List<com.hivemq.persistence.domain.DomainTag> domainTags = protocolAdapterManager.getDomainTags();
595-
// empty list is also 200 as discussed.
596586
return Response.ok(new DomainTagList().items(domainTags.stream()
597587
.map(com.hivemq.persistence.domain.DomainTag::toModel)
598588
.toList())).build();
@@ -818,6 +808,9 @@ public int getDepth() {
818808
cfg.getSouthboundMappings(),
819809
cfg.getTags()))
820810
.map(newCfg -> {
811+
// Enable skip flag to prevent refresh() from restarting adapter
812+
// The flag will be cleared by refresh() when it checks it
813+
ProtocolAdapterManager.enableSkipNextRefresh();
821814
if (!configExtractor.updateAdapter(newCfg)) {
822815
return adapterCannotBeUpdatedError(adapterId);
823816
}
@@ -865,6 +858,9 @@ public int getDepth() {
865858
converted,
866859
cfg.getTags()))
867860
.map(newCfg -> {
861+
// Enable skip flag to prevent refresh() from restarting adapter
862+
// The flag will be cleared by refresh() when it checks it
863+
ProtocolAdapterManager.enableSkipNextRefresh();
868864
if (!configExtractor.updateAdapter(newCfg)) {
869865
return adapterCannotBeUpdatedError(adapterId);
870866
}

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@
7676
public class ProtocolAdapterManager {
7777
private static final @NotNull Logger log = LoggerFactory.getLogger(ProtocolAdapterManager.class);
7878

79+
// ThreadLocal flag to prevent refresh() from restarting adapters during hot-reload config updates
80+
// AtomicBoolean to skip next refresh() call during hot-reload config updates
81+
// Must be atomic (not ThreadLocal) because refresh() runs in a different thread (refreshExecutor)
82+
private static final @NotNull AtomicBoolean skipRefreshForAdapter = new AtomicBoolean(false);
83+
7984
private final @NotNull Map<String, ProtocolAdapterWrapper> protocolAdapters;
8085
private final @NotNull MetricRegistry metricRegistry;
8186
private final @NotNull ModuleServicesImpl moduleServices;
@@ -173,6 +178,19 @@ private static boolean updateMappingsHotReload(
173178
}
174179
}
175180

181+
/**
182+
* Enables skipping the next refresh operation for hot-reload config updates.
183+
* This prevents the refresh() callback from restarting adapters when the config
184+
* change originates from a hot-reload operation.
185+
*/
186+
public static void enableSkipNextRefresh() {
187+
skipRefreshForAdapter.set(true);
188+
}
189+
190+
public static void disableSkipNextRefresh() {
191+
skipRefreshForAdapter.set(false);
192+
}
193+
176194
public void start() {
177195
if (log.isDebugEnabled()) {
178196
log.debug("Starting adapters");
@@ -182,6 +200,12 @@ public void start() {
182200

183201
public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
184202
refreshExecutor.submit(() -> {
203+
// Atomically check and clear skip flag (hot-reload in progress)
204+
if (skipRefreshForAdapter.getAndSet(false)) {
205+
log.debug("Skipping refresh because hot-reload config update is in progress");
206+
return;
207+
}
208+
185209
log.info("Refreshing adapters");
186210

187211
final Map<String, ProtocolAdapterConfig> protocolAdapterConfigs = configs.stream()
@@ -355,15 +379,15 @@ public boolean updateNorthboundMappingsHotReload(
355379
final @NotNull List<NorthboundMapping> northboundMappings) {
356380
return getProtocolAdapterWrapperByAdapterId(adapterId).map(wrapper -> updateMappingsHotReload(wrapper,
357381
"northbound",
358-
() -> wrapper.updateMappingsHotReload(northboundMappings, null))).orElse(false);
382+
() -> wrapper.updateMappingsHotReload(northboundMappings, null, eventService))).orElse(false);
359383
}
360384

361385
public boolean updateSouthboundMappingsHotReload(
362386
final @NotNull String adapterId,
363387
final @NotNull List<SouthboundMapping> southboundMappings) {
364388
return getProtocolAdapterWrapperByAdapterId(adapterId).map(wrapper -> updateMappingsHotReload(wrapper,
365389
"southbound",
366-
() -> wrapper.updateMappingsHotReload(null, southboundMappings))).orElse(false);
390+
() -> wrapper.updateMappingsHotReload(null, southboundMappings, eventService))).orElse(false);
367391
}
368392

369393
public @NotNull List<DomainTag> getDomainTags() {

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 102 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class ProtocolAdapterWrapper extends ProtocolAdapterFSM {
8080
private final @NotNull TagManager tagManager;
8181
private final @NotNull List<NorthboundTagConsumer> consumers;
8282
private final @NotNull ReentrantLock operationLock;
83+
private final @NotNull Object adapterLock; // protects underlying adapter start/stop calls
8384
private final @NotNull ExecutorService sharedAdapterExecutor;
8485
protected volatile @Nullable Long lastStartAttemptTime;
8586
private @Nullable CompletableFuture<Void> currentStartFuture;
@@ -114,6 +115,7 @@ public ProtocolAdapterWrapper(
114115
this.consumers = new CopyOnWriteArrayList<>();
115116
this.operationLock = new ReentrantLock();
116117
this.sharedAdapterExecutor = sharedAdapterExecutor;
118+
this.adapterLock = new Object();
117119

118120
if (log.isDebugEnabled()) {
119121
registerStateTransitionListener(state -> log.debug(
@@ -375,8 +377,17 @@ public boolean isBatchPolling() {
375377
}
376378

377379
public void addTagHotReload(final @NotNull Tag tag, final @NotNull EventService eventService) {
380+
// Wait for any in-progress operations before proceeding
381+
waitForOperationsToComplete();
382+
378383
operationLock.lock();
379384
try {
385+
if (startOperationInProgress || stopOperationInProgress) {
386+
throw new IllegalStateException("Cannot hot-reload tag for adapter '" +
387+
getId() +
388+
"': operation started during wait");
389+
}
390+
380391
// Update config with new tag regardless of adapter state
381392
final List<Tag> updatedTags = new ArrayList<>(config.getTags());
382393
updatedTags.add(tag);
@@ -407,9 +418,18 @@ public void addTagHotReload(final @NotNull Tag tag, final @NotNull EventService
407418

408419
public void updateMappingsHotReload(
409420
final @Nullable List<NorthboundMapping> northboundMappings,
410-
final @Nullable List<SouthboundMapping> southboundMappings) {
421+
final @Nullable List<SouthboundMapping> southboundMappings,
422+
final @NotNull EventService eventService) {
423+
waitForOperationsToComplete();
424+
411425
operationLock.lock();
412426
try {
427+
if (startOperationInProgress || stopOperationInProgress) {
428+
throw new IllegalStateException("Cannot hot-reload mappings for adapter '" +
429+
getId() +
430+
"': operation started during wait");
431+
}
432+
413433
if (northboundMappings != null) {
414434
config.setNorthboundMappings(northboundMappings);
415435
}
@@ -420,25 +440,54 @@ public void updateMappingsHotReload(
420440
log.debug("Adapter '{}' not started yet, only updating config for mappings", getId());
421441
return;
422442
}
423-
log.debug("Stopping existing consumers for adapter '{}'", getId());
424-
consumers.forEach(tagManager::removeConsumer);
425-
consumers.clear();
443+
444+
// Stop existing consumers and polling
426445
if (northboundMappings != null) {
446+
log.debug("Stopping existing consumers and polling for adapter '{}'", getId());
447+
consumers.forEach(tagManager::removeConsumer);
448+
consumers.clear();
449+
450+
// Stop polling to restart with new mappings
451+
stopPolling();
452+
427453
log.debug("Updating northbound mappings for adapter '{}'", getId());
428454
northboundMappings.stream()
429455
.map(mapping -> northboundConsumerFactory.build(this, mapping, protocolAdapterMetricsService))
430456
.forEach(consumer -> {
431457
tagManager.addConsumer(consumer);
432458
consumers.add(consumer);
433459
});
460+
461+
// Restart polling with new consumers
462+
log.debug("Restarting polling for adapter '{}'", getId());
463+
if (isBatchPolling()) {
464+
log.debug("Schedule batch polling for protocol adapter with id '{}'", getId());
465+
pollingService.schedulePolling(new PerAdapterSampler(this, eventService, tagManager));
466+
}
467+
if (isPolling()) {
468+
config.getTags()
469+
.forEach(tag -> pollingService.schedulePolling(new PerContextSampler(this,
470+
new PollingContextWrapper("unused",
471+
tag.getName(),
472+
MessageHandlingOptions.MQTTMessagePerTag,
473+
false,
474+
false,
475+
List.of(),
476+
1,
477+
-1),
478+
eventService,
479+
tagManager)));
480+
}
434481
}
482+
435483
if (southboundMappings != null && isWriting()) {
436484
log.debug("Updating southbound mappings for adapter '{}'", getId());
437485
final StateEnum currentSouthboundState = currentState().southbound();
438-
if (currentSouthboundState == StateEnum.CONNECTED) {
486+
final boolean wasConnected = (currentSouthboundState == StateEnum.CONNECTED);
487+
if (wasConnected) {
488+
log.debug("Stopping southbound for adapter '{}' before hot-reload", getId());
439489
stopWriting();
440-
}
441-
if (currentSouthboundState == StateEnum.CONNECTED) {
490+
log.debug("Restarting southbound for adapter '{}' after hot-reload", getId());
442491
startSouthbound();
443492
}
444493
}
@@ -448,6 +497,35 @@ public void updateMappingsHotReload(
448497
}
449498
}
450499

500+
private void waitForOperationsToComplete() {
501+
CompletableFuture<Void> futureToWait = null;
502+
operationLock.lock();
503+
try {
504+
if (startOperationInProgress) {
505+
log.debug("Adapter '{}': Waiting for start operation to complete before hot-reload", getId());
506+
futureToWait = currentStartFuture;
507+
} else if (stopOperationInProgress) {
508+
log.debug("Adapter '{}': Waiting for stop operation to complete before hot-reload", getId());
509+
futureToWait = currentStopFuture;
510+
}
511+
} finally {
512+
operationLock.unlock();
513+
}
514+
515+
if (futureToWait != null) {
516+
try {
517+
// Wait with a timeout to prevent indefinite blocking
518+
futureToWait.get(30, TimeUnit.SECONDS);
519+
log.debug("Adapter '{}': Operation completed, proceeding with hot-reload", getId());
520+
} catch (final TimeoutException e) {
521+
log.warn("Adapter '{}': Operation did not complete within 30 seconds, proceeding with hot-reload anyway",
522+
getId());
523+
} catch (final Exception e) {
524+
log.warn("Adapter '{}': Operation completed with error, but proceeding with hot-reload", getId(), e);
525+
}
526+
}
527+
}
528+
451529
private void cleanupConnectionStatusListener() {
452530
final Consumer<ProtocolAdapterState.ConnectionStatus> listenerToClean = connectionStatusListener;
453531
if (listenerToClean != null) {
@@ -548,10 +626,15 @@ private void stopWriting() {
548626
return () -> {
549627
startAdapter(); // start FSM
550628
final ProtocolAdapterStartOutputImpl output = new ProtocolAdapterStartOutputImpl();
551-
try {
552-
adapter.start(new ProtocolAdapterStartInputImpl(moduleServices), output);
553-
} catch (final Throwable t) {
554-
output.getStartFuture().completeExceptionally(t);
629+
synchronized (adapterLock) {
630+
log.debug("Adapter '{}': Calling adapter.start() in thread '{}'",
631+
getId(),
632+
Thread.currentThread().getName());
633+
try {
634+
adapter.start(new ProtocolAdapterStartInputImpl(moduleServices), output);
635+
} catch (final Throwable t) {
636+
output.getStartFuture().completeExceptionally(t);
637+
}
555638
}
556639
return output.getStartFuture();
557640
};
@@ -602,11 +685,14 @@ private void stopProtocolAdapterOnFailedStart() {
602685

603686
// Initiate adapter stop
604687
final var output = new ProtocolAdapterStopOutputImpl();
605-
try {
606-
adapter.stop(new ProtocolAdapterStopInputImpl(), output);
607-
} catch (final Throwable throwable) {
608-
log.error("Adapter '{}': Exception during adapter.stop()", adapter.getId(), throwable);
609-
output.getOutputFuture().completeExceptionally(throwable);
688+
synchronized (adapterLock) {
689+
log.debug("Adapter '{}': Calling adapter.stop() in thread '{}'", getId(), Thread.currentThread().getName());
690+
try {
691+
adapter.stop(new ProtocolAdapterStopInputImpl(), output);
692+
} catch (final Throwable throwable) {
693+
log.error("Adapter '{}': Exception during adapter.stop()", adapter.getId(), throwable);
694+
output.getOutputFuture().completeExceptionally(throwable);
695+
}
610696
}
611697

612698
log.debug("Adapter '{}': Waiting for stop output future", adapter.getId());

0 commit comments

Comments
 (0)