Skip to content

Commit 00db2f3

Browse files
committed
coordinate adapter shutdown
1 parent 9609fc6 commit 00db2f3

File tree

4 files changed

+185
-102
lines changed

4 files changed

+185
-102
lines changed

hivemq-edge/src/main/java/com/hivemq/common/executors/ioc/ExecutorsModule.java

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,55 +35,65 @@ public abstract class ExecutorsModule {
3535

3636
private static final @NotNull String GROUP_NAME = "hivemq-edge-group";
3737
private static final @NotNull String SCHEDULED_WORKER_GROUP_NAME = "hivemq-edge-scheduled-group";
38+
private static final int SCHEDULED_WORKER_GROUP_THREAD_COUNT = 4;
3839
private static final @NotNull String CACHED_WORKER_GROUP_NAME = "hivemq-edge-cached-group";
3940
private static final @NotNull ThreadGroup coreGroup = new ThreadGroup(GROUP_NAME);
4041

4142
@Provides
4243
@Singleton
4344
static @NotNull ScheduledExecutorService scheduledExecutor() {
44-
final ScheduledExecutorService executor =
45-
Executors.newScheduledThreadPool(4, new HiveMQEdgeThreadFactory(SCHEDULED_WORKER_GROUP_NAME));
46-
registerShutdownHook(executor, SCHEDULED_WORKER_GROUP_NAME);
45+
final var executor = Executors.newScheduledThreadPool(SCHEDULED_WORKER_GROUP_THREAD_COUNT,
46+
new HiveMQEdgeThreadFactory(SCHEDULED_WORKER_GROUP_NAME));
47+
// Shutdown hook removed - ProtocolAdapterManager now handles coordinated shutdown
4748
return executor;
4849
}
4950

5051
@Provides
5152
@Singleton
5253
static @NotNull ExecutorService executorService() {
53-
return registerShutdownHook(Executors.newCachedThreadPool(new HiveMQEdgeThreadFactory(CACHED_WORKER_GROUP_NAME)),
54-
CACHED_WORKER_GROUP_NAME);
54+
// Shutdown hook removed - ProtocolAdapterManager now handles coordinated shutdown
55+
return Executors.newCachedThreadPool(new HiveMQEdgeThreadFactory(CACHED_WORKER_GROUP_NAME));
5556
}
5657

57-
private static @NotNull ExecutorService registerShutdownHook(
58+
/**
59+
* Utility method for shutting down an executor service gracefully.
60+
* This is called by ProtocolAdapterManager's shutdown hook to ensure
61+
* executors are shut down AFTER adapters have stopped.
62+
*
63+
* @param executor the executor to shut down
64+
* @param name the name of the executor for logging
65+
* @param timeoutSeconds how long to wait for termination
66+
*/
67+
public static void shutdownExecutor(
5868
final @NotNull ExecutorService executor,
59-
final @NotNull String name) {
60-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
61-
if (log.isDebugEnabled()) {
62-
log.debug("Shutting down executor service: {}", name);
63-
}
64-
// Only initiate shutdown if not already shutting down
65-
// This allows ProtocolAdapterManager to shut down executors first
66-
if (!executor.isShutdown()) {
67-
executor.shutdown();
68-
}
69-
try {
70-
// Reduced timeout since ProtocolAdapterManager should have already
71-
// initiated shutdown for adapters
72-
if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
73-
log.warn("Executor service {} did not terminate in time, forcing shutdown", name);
74-
executor.shutdownNow();
75-
// Give a final grace period after forced shutdown
76-
if (!executor.awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS)) {
77-
log.error("Executor service {} still has running tasks after forced shutdown", name);
78-
}
79-
}
80-
} catch (final InterruptedException e) {
81-
Thread.currentThread().interrupt();
82-
log.warn("Interrupted while waiting for executor service {} to terminate", name);
69+
final @NotNull String name,
70+
final int timeoutSeconds) {
71+
if (log.isDebugEnabled()) {
72+
log.debug("Shutting down executor service: {}", name);
73+
}
74+
75+
if (!executor.isShutdown()) {
76+
executor.shutdown();
77+
}
78+
79+
try {
80+
if (!executor.awaitTermination(timeoutSeconds, java.util.concurrent.TimeUnit.SECONDS)) {
81+
log.warn("Executor service {} did not terminate in {}s, forcing shutdown", name, timeoutSeconds);
8382
executor.shutdownNow();
83+
// Give a final grace period after forced shutdown
84+
if (!executor.awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS)) {
85+
log.error("Executor service {} still has running tasks after forced shutdown", name);
86+
}
87+
} else {
88+
if (log.isDebugEnabled()) {
89+
log.debug("Executor service {} shut down successfully", name);
90+
}
8491
}
85-
}, "shutdown-hook-" + name));
86-
return executor;
92+
} catch (final InterruptedException e) {
93+
Thread.currentThread().interrupt();
94+
log.warn("Interrupted while waiting for executor service {} to terminate", name);
95+
executor.shutdownNow();
96+
}
8797
}
8898

8999
private static class HiveMQEdgeThreadFactory implements ThreadFactory {

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.concurrent.ExecutorService;
5858
import java.util.concurrent.Executors;
5959
import java.util.concurrent.Future;
60+
import java.util.concurrent.ScheduledExecutorService;
6061
import java.util.concurrent.TimeUnit;
6162
import java.util.concurrent.TimeoutException;
6263
import java.util.concurrent.atomic.AtomicBoolean;
@@ -88,6 +89,7 @@ public class ProtocolAdapterManager {
8889
private final @NotNull ProtocolAdapterExtractor protocolAdapterConfig;
8990
private final @NotNull ExecutorService executorService;
9091
private final @NotNull ExecutorService sharedAdapterExecutor;
92+
private final @NotNull ScheduledExecutorService scheduledExecutor;
9193
private final @NotNull AtomicBoolean shutdownInitiated;
9294

9395
@Inject
@@ -105,7 +107,8 @@ public ProtocolAdapterManager(
105107
final @NotNull NorthboundConsumerFactory northboundConsumerFactory,
106108
final @NotNull TagManager tagManager,
107109
final @NotNull ProtocolAdapterExtractor protocolAdapterConfig,
108-
final @NotNull ExecutorService sharedAdapterExecutor) {
110+
final @NotNull ExecutorService sharedAdapterExecutor,
111+
final @NotNull ScheduledExecutorService scheduledExecutor) {
109112
this.metricRegistry = metricRegistry;
110113
this.moduleServices = moduleServices;
111114
this.remoteService = remoteService;
@@ -120,6 +123,7 @@ public ProtocolAdapterManager(
120123
this.tagManager = tagManager;
121124
this.protocolAdapterConfig = protocolAdapterConfig;
122125
this.sharedAdapterExecutor = sharedAdapterExecutor;
126+
this.scheduledExecutor = scheduledExecutor;
123127
this.protocolAdapters = new ConcurrentHashMap<>();
124128
this.executorService = Executors.newSingleThreadExecutor();
125129
this.shutdownInitiated = new AtomicBoolean(false);
@@ -587,43 +591,38 @@ private void stopAllAdaptersOnShutdown() {
587591

588592
/**
589593
* Shutdown executors gracefully after adapters have stopped.
590-
* This ensures a clean shutdown sequence.
594+
* This ensures a clean shutdown sequence where adapters stop BEFORE
595+
* the executors they depend on are shut down.
596+
* <p>
597+
* Shutdown order:
598+
* 1. Protocol adapter manager's internal executor (used for refresh operations)
599+
* 2. Shared adapter executor (used by all adapters for lifecycle operations)
600+
* 3. Scheduled executor (used for scheduled tasks)
591601
*/
592602
private void shutdownExecutorsGracefully() {
593-
log.debug("Shutting down protocol adapter manager executors");
594-
595-
// Shutdown the single-threaded executor used for adapter refresh
596-
executorService.shutdown();
597-
try {
598-
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
599-
log.warn("Executor service did not terminate in time, forcing shutdown");
600-
executorService.shutdownNow();
601-
}
602-
} catch (final InterruptedException e) {
603-
Thread.currentThread().interrupt();
604-
log.warn("Interrupted while waiting for executor service to terminate");
605-
executorService.shutdownNow();
606-
}
607-
608-
// Shutdown the shared adapter executor
609-
// Note: This may also be shut down by ExecutorsModule shutdown hook,
610-
// but calling shutdown() multiple times is safe (idempotent)
611-
sharedAdapterExecutor.shutdown();
612-
try {
613-
if (!sharedAdapterExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
614-
log.warn("Shared adapter executor did not terminate in time, forcing shutdown");
615-
sharedAdapterExecutor.shutdownNow();
616-
// Wait a bit more after forced shutdown
617-
if (!sharedAdapterExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
618-
log.error("Shared adapter executor still has running tasks after forced shutdown");
619-
}
620-
}
621-
} catch (final InterruptedException e) {
622-
Thread.currentThread().interrupt();
623-
log.warn("Interrupted while waiting for shared adapter executor to terminate");
624-
sharedAdapterExecutor.shutdownNow();
625-
}
626-
627-
log.debug("Protocol adapter manager executors shutdown completed");
603+
log.info("Shutting down protocol adapter manager executors");
604+
605+
// 1. Shutdown the single-threaded executor used for adapter refresh
606+
com.hivemq.common.executors.ioc.ExecutorsModule.shutdownExecutor(
607+
executorService,
608+
"protocol-adapter-manager-executor",
609+
5);
610+
611+
// 2. Shutdown the shared adapter executor
612+
// This is the critical executor that was causing the race condition.
613+
// By shutting it down here AFTER all adapters have stopped, we ensure
614+
// no adapter stop operations are rejected with RejectedExecutionException
615+
com.hivemq.common.executors.ioc.ExecutorsModule.shutdownExecutor(
616+
sharedAdapterExecutor,
617+
"hivemq-edge-cached-group",
618+
10);
619+
620+
// 3. Shutdown the scheduled executor
621+
com.hivemq.common.executors.ioc.ExecutorsModule.shutdownExecutor(
622+
scheduledExecutor,
623+
"hivemq-edge-scheduled-group",
624+
5);
625+
626+
log.info("Protocol adapter manager executors shutdown completed");
628627
}
629628
}

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 85 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.hivemq.adapter.sdk.api.discovery.ProtocolAdapterDiscoveryInput;
2323
import com.hivemq.adapter.sdk.api.discovery.ProtocolAdapterDiscoveryOutput;
2424
import com.hivemq.adapter.sdk.api.events.EventService;
25+
import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopInput;
26+
import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopOutput;
2527
import com.hivemq.adapter.sdk.api.factories.ProtocolAdapterFactory;
2628
import com.hivemq.adapter.sdk.api.polling.PollingProtocolAdapter;
2729
import com.hivemq.adapter.sdk.api.polling.batch.BatchPollingProtocolAdapter;
@@ -338,38 +340,48 @@ private void cleanupConnectionStatusListener() {
338340

339341
log.debug("Adapter '{}': Creating stop operation future", getId());
340342

341-
final var stopFuture = CompletableFuture.supplyAsync(() -> {
342-
log.debug("Adapter '{}': Stop operation executing in thread '{}'", getId(), Thread.currentThread().getName());
343-
344-
// Signal FSM to stop (calls onStopping() internally)
345-
log.debug("Adapter '{}': Stopping adapter FSM", getId());
346-
stopAdapter();
347-
348-
// Clean up listeners to prevent memory leaks
349-
log.debug("Adapter '{}': Cleaning up connection status listener", getId());
350-
cleanupConnectionStatusListener();
351-
352-
// Remove consumers - must be done within async context
353-
log.debug("Adapter '{}': Removing {} consumers", getId(), consumers.size());
354-
consumers.forEach(tagManager::removeConsumer);
355-
356-
log.debug("Adapter '{}': Stopping polling", getId());
357-
stopPolling(protocolAdapterPollingService);
343+
// Defensive check: if the executor is already shut down, execute stop synchronously
344+
// This can happen during JVM shutdown if there's a race between shutdown hooks
345+
if (sharedAdapterExecutor.isShutdown()) {
346+
log.warn("Adapter '{}': Executor is shutdown, executing stop operation synchronously in current thread", getId());
347+
try {
348+
// Execute stop logic directly in calling thread
349+
final CompletableFuture<Void> syncFuture = performStopOperation(input, output)
350+
.whenComplete((result, throwable) -> {
351+
log.debug("Adapter '{}': Synchronous stop operation completed, starting cleanup", getId());
352+
353+
// Always call destroy() to ensure all resources are properly released
354+
try {
355+
log.info("Destroying adapter with id '{}' to release all resources", getId());
356+
adapter.destroy();
357+
log.debug("Adapter '{}': destroy() completed successfully", getId());
358+
} catch (final Exception destroyException) {
359+
log.error("Error destroying adapter with id {}", adapter.getId(), destroyException);
360+
}
361+
362+
if (throwable == null) {
363+
log.info("Stopped adapter with id '{}' successfully", adapter.getId());
364+
} else {
365+
log.error("Error stopping adapter with id {}", adapter.getId(), throwable);
366+
}
367+
368+
// Clear reference to stop future
369+
log.debug("Adapter '{}': Cleared currentStopFuture reference", getId());
370+
currentStopFuture = null;
371+
});
358372

359-
log.debug("Adapter '{}': Stopping writing", getId());
360-
stopWriting(protocolAdapterWritingService);
373+
currentStopFuture = syncFuture;
374+
return syncFuture;
375+
} catch (final Exception e) {
376+
log.error("Adapter '{}': Exception during synchronous stop", getId(), e);
377+
return CompletableFuture.failedFuture(e);
378+
}
379+
}
361380

362-
try {
363-
log.debug("Adapter '{}': Calling adapter.stop()", getId());
364-
adapter.stop(input, output);
365-
} catch (final Throwable throwable) {
366-
log.error("Adapter '{}': Exception during adapter.stop()", getId(), throwable);
367-
output.getOutputFuture().completeExceptionally(throwable);
368-
}
369-
log.debug("Adapter '{}': Waiting for stop output future", getId());
370-
return output.getOutputFuture();
371-
}, sharedAdapterExecutor) // Use shared executor to reduce thread overhead
372-
.thenCompose(Function.identity()).whenComplete((result, throwable) -> {
381+
final var stopFuture = CompletableFuture.supplyAsync(() -> performStopOperation(input, output),
382+
sharedAdapterExecutor) // Use shared executor to reduce thread overhead
383+
.thenCompose(Function.identity())
384+
.whenComplete((result, throwable) -> {
373385
log.debug("Adapter '{}': Stop operation completed, starting cleanup", getId());
374386

375387
// Always call destroy() to ensure all resources are properly released
@@ -566,4 +578,47 @@ private void createAndSubscribeTagConsumer() {
566578
consumers.add(northboundTagConsumer);
567579
});
568580
}
581+
582+
/**
583+
* Performs the actual stop operation for the adapter.
584+
* Extracted into a separate method so it can be called both asynchronously
585+
* (normal case) and synchronously (when the executor is already shut down during JVM shutdown).
586+
*
587+
* @param input the stop input
588+
* @param output the stop output
589+
* @return the completion future from the adapter's stop operation
590+
*/
591+
private @NotNull CompletableFuture<Void> performStopOperation(
592+
final @NotNull ProtocolAdapterStopInput input,
593+
final @NotNull ProtocolAdapterStopOutput output) {
594+
log.debug("Adapter '{}': Stop operation executing in thread '{}'", getId(), Thread.currentThread().getName());
595+
596+
// Signal FSM to stop (calls onStopping() internally)
597+
log.debug("Adapter '{}': Stopping adapter FSM", getId());
598+
stopAdapter();
599+
600+
// Clean up listeners to prevent memory leaks
601+
log.debug("Adapter '{}': Cleaning up connection status listener", getId());
602+
cleanupConnectionStatusListener();
603+
604+
// Remove consumers
605+
log.debug("Adapter '{}': Removing {} consumers", getId(), consumers.size());
606+
consumers.forEach(tagManager::removeConsumer);
607+
608+
log.debug("Adapter '{}': Stopping polling", getId());
609+
stopPolling(protocolAdapterPollingService);
610+
611+
log.debug("Adapter '{}': Stopping writing", getId());
612+
stopWriting(protocolAdapterWritingService);
613+
614+
try {
615+
log.debug("Adapter '{}': Calling adapter.stop()", getId());
616+
adapter.stop(input, output);
617+
} catch (final Throwable throwable) {
618+
log.error("Adapter '{}': Exception during adapter.stop()", getId(), throwable);
619+
((ProtocolAdapterStopOutputImpl) output).getOutputFuture().completeExceptionally(throwable);
620+
}
621+
log.debug("Adapter '{}': Waiting for stop output future", getId());
622+
return ((ProtocolAdapterStopOutputImpl) output).getOutputFuture();
623+
}
569624
}

0 commit comments

Comments
 (0)