Skip to content

Commit 8af0b51

Browse files
committed
stop adapters orderly on shutdown of EDGE
1 parent c04660e commit 8af0b51

File tree

5 files changed

+187
-23
lines changed

5 files changed

+187
-23
lines changed

hivemq-edge/src/main/java/com/hivemq/common/executors/ioc/ExecutorsModule.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,21 @@ public abstract class ExecutorsModule {
6161
if (log.isDebugEnabled()) {
6262
log.debug("Shutting down executor service: {}", name);
6363
}
64-
executor.shutdown();
64+
// Only initiate shutdown if not already shutting down
65+
// This allows ProtocolAdapterManager to shut down executors first
66+
if (!executor.isShutdown()) {
67+
executor.shutdown();
68+
}
6569
try {
66-
if (!executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
70+
// Reduced timeout since ProtocolAdapterManager should have already
71+
// initiated shutdown for adapters
72+
if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
6773
log.warn("Executor service {} did not terminate in time, forcing shutdown", name);
6874
executor.shutdownNow();
75+
// Give a final grace period after forced shutdown
76+
if (!executor.awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS)) {
77+
log.error("Executor service {} still has running tasks after forced shutdown", name);
78+
}
6979
}
7080
} catch (final InterruptedException e) {
7181
Thread.currentThread().interrupt();

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
import java.util.concurrent.ExecutorService;
5858
import java.util.concurrent.Executors;
5959
import java.util.concurrent.Future;
60+
import java.util.concurrent.TimeUnit;
61+
import java.util.concurrent.TimeoutException;
62+
import java.util.concurrent.atomic.AtomicBoolean;
6063
import java.util.function.Function;
6164
import java.util.function.Supplier;
6265
import java.util.stream.Collectors;
@@ -85,6 +88,7 @@ public class ProtocolAdapterManager {
8588
private final @NotNull ProtocolAdapterExtractor protocolAdapterConfig;
8689
private final @NotNull ExecutorService executorService;
8790
private final @NotNull ExecutorService sharedAdapterExecutor;
91+
private final @NotNull AtomicBoolean shutdownInitiated;
8892

8993
@Inject
9094
public ProtocolAdapterManager(
@@ -118,7 +122,18 @@ public ProtocolAdapterManager(
118122
this.sharedAdapterExecutor = sharedAdapterExecutor;
119123
this.protocolAdapters = new ConcurrentHashMap<>();
120124
this.executorService = Executors.newSingleThreadExecutor();
121-
Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdown));
125+
this.shutdownInitiated = new AtomicBoolean(false);
126+
127+
// Register coordinated shutdown hook that stops adapters BEFORE executors shutdown
128+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
129+
if (shutdownInitiated.compareAndSet(false, true)) {
130+
log.info("Initiating coordinated shutdown of Protocol Adapter Manager");
131+
stopAllAdaptersOnShutdown();
132+
shutdownExecutorsGracefully();
133+
log.info("Protocol Adapter Manager shutdown completed");
134+
}
135+
}, "protocol-adapter-manager-shutdown"));
136+
122137
protocolAdapterWritingService.addWritingChangedCallback(() -> protocolAdapterFactoryManager.writingEnabledChanged(
123138
protocolAdapterWritingService.writingEnabled()));
124139
}
@@ -519,4 +534,96 @@ public boolean writingEnabled() {
519534
configConverter.convertTagDefinitionToJsonNode(tag.getDefinition())))
520535
.toList());
521536
}
537+
538+
/**
539+
* Stop all adapters during shutdown in a coordinated manner.
540+
* This method is called by the shutdown hook BEFORE executors are shut down,
541+
* ensuring adapters can complete their stop operations cleanly.
542+
*/
543+
private void stopAllAdaptersOnShutdown() {
544+
final List<ProtocolAdapterWrapper> adaptersToStop = new ArrayList<>(protocolAdapters.values());
545+
546+
if (adaptersToStop.isEmpty()) {
547+
log.debug("No adapters to stop during shutdown");
548+
return;
549+
}
550+
551+
log.info("Stopping {} protocol adapters during shutdown", adaptersToStop.size());
552+
final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
553+
554+
// Initiate stop for all adapters
555+
for (final ProtocolAdapterWrapper wrapper : adaptersToStop) {
556+
try {
557+
log.debug("Initiating stop for adapter '{}'", wrapper.getId());
558+
final CompletableFuture<Void> stopFuture = wrapper.stopAsync();
559+
stopFutures.add(stopFuture);
560+
} catch (final Exception e) {
561+
log.error("Error initiating stop for adapter '{}' during shutdown", wrapper.getId(), e);
562+
}
563+
}
564+
565+
// Wait for all adapters to stop, with timeout
566+
final CompletableFuture<Void> allStops = CompletableFuture.allOf(stopFutures.toArray(new CompletableFuture[0]));
567+
568+
try {
569+
// Give adapters 20 seconds to stop gracefully
570+
allStops.get(20, TimeUnit.SECONDS);
571+
log.info("All adapters stopped successfully during shutdown");
572+
} catch (final TimeoutException e) {
573+
log.warn("Timeout waiting for adapters to stop during shutdown (waited 20s). Proceeding with executor shutdown.");
574+
// Log which adapters failed to stop
575+
for (int i = 0; i < stopFutures.size(); i++) {
576+
if (!stopFutures.get(i).isDone()) {
577+
log.warn("Adapter '{}' did not complete stop operation within timeout", adaptersToStop.get(i).getId());
578+
}
579+
}
580+
} catch (final InterruptedException e) {
581+
Thread.currentThread().interrupt();
582+
log.error("Interrupted while waiting for adapters to stop during shutdown", e);
583+
} catch (final ExecutionException e) {
584+
log.error("Error occurred while stopping adapters during shutdown", e.getCause());
585+
}
586+
}
587+
588+
/**
589+
* Shutdown executors gracefully after adapters have stopped.
590+
* This ensures a clean shutdown sequence.
591+
*/
592+
private void shutdownExecutorsGracefully() {
593+
log.debug("Shutting down protocol adapter manager executors");
594+
595+
// Shutdown the single-threaded executor used for adapter refresh
596+
executorService.shutdown();
597+
try {
598+
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
599+
log.warn("Executor service did not terminate in time, forcing shutdown");
600+
executorService.shutdownNow();
601+
}
602+
} catch (final InterruptedException e) {
603+
Thread.currentThread().interrupt();
604+
log.warn("Interrupted while waiting for executor service to terminate");
605+
executorService.shutdownNow();
606+
}
607+
608+
// Shutdown the shared adapter executor
609+
// Note: This may also be shut down by ExecutorsModule shutdown hook,
610+
// but calling shutdown() multiple times is safe (idempotent)
611+
sharedAdapterExecutor.shutdown();
612+
try {
613+
if (!sharedAdapterExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
614+
log.warn("Shared adapter executor did not terminate in time, forcing shutdown");
615+
sharedAdapterExecutor.shutdownNow();
616+
// Wait a bit more after forced shutdown
617+
if (!sharedAdapterExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
618+
log.error("Shared adapter executor still has running tasks after forced shutdown");
619+
}
620+
}
621+
} catch (final InterruptedException e) {
622+
Thread.currentThread().interrupt();
623+
log.warn("Interrupted while waiting for shared adapter executor to terminate");
624+
sharedAdapterExecutor.shutdownNow();
625+
}
626+
627+
log.debug("Protocol adapter manager executors shutdown completed");
628+
}
522629
}

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,38 +336,54 @@ private void cleanupConnectionStatusListener() {
336336
final var input = new ProtocolAdapterStopInputImpl();
337337
final var output = new ProtocolAdapterStopOutputImpl();
338338

339+
log.debug("Adapter '{}': Creating stop operation future", getId());
340+
339341
final var stopFuture = CompletableFuture.supplyAsync(() -> {
342+
log.debug("Adapter '{}': Stop operation executing in thread '{}'", getId(), Thread.currentThread().getName());
343+
340344
// Signal FSM to stop (calls onStopping() internally)
345+
log.debug("Adapter '{}': Stopping adapter FSM", getId());
341346
stopAdapter();
342347

343348
// Clean up listeners to prevent memory leaks
349+
log.debug("Adapter '{}': Cleaning up connection status listener", getId());
344350
cleanupConnectionStatusListener();
345351

346352
// Remove consumers - must be done within async context
353+
log.debug("Adapter '{}': Removing {} consumers", getId(), consumers.size());
347354
consumers.forEach(tagManager::removeConsumer);
348355

356+
log.debug("Adapter '{}': Stopping polling", getId());
349357
stopPolling(protocolAdapterPollingService);
358+
359+
log.debug("Adapter '{}': Stopping writing", getId());
350360
stopWriting(protocolAdapterWritingService);
351361

352362
try {
363+
log.debug("Adapter '{}': Calling adapter.stop()", getId());
353364
adapter.stop(input, output);
354365
} catch (final Throwable throwable) {
366+
log.error("Adapter '{}': Exception during adapter.stop()", getId(), throwable);
355367
output.getOutputFuture().completeExceptionally(throwable);
356368
}
369+
log.debug("Adapter '{}': Waiting for stop output future", getId());
357370
return output.getOutputFuture();
358371
}, sharedAdapterExecutor) // Use shared executor to reduce thread overhead
359372
.thenCompose(Function.identity()).whenComplete((result, throwable) -> {
373+
log.debug("Adapter '{}': Stop operation completed, starting cleanup", getId());
374+
360375
// Always call destroy() to ensure all resources are properly released
361376
// This prevents resource leaks from underlying client libraries
362377
try {
363378
log.info("Destroying adapter with id '{}' to release all resources", getId());
364379
adapter.destroy();
380+
log.debug("Adapter '{}': destroy() completed successfully", getId());
365381
} catch (final Exception destroyException) {
366382
log.error("Error destroying adapter with id {}", adapter.getId(), destroyException);
367383
}
368384

369385
if (throwable == null) {
370-
log.info("Stopped adapter with id {}", adapter.getId());
386+
log.info("Stopped adapter with id '{}' successfully", adapter.getId());
371387
} else {
372388
log.error("Error stopping adapter with id {}", adapter.getId(), throwable);
373389
}
@@ -376,6 +392,7 @@ private void cleanupConnectionStatusListener() {
376392
operationLock.lock();
377393
try {
378394
currentStopFuture = null;
395+
log.debug("Adapter '{}': Cleared currentStopFuture reference", getId());
379396
} finally {
380397
operationLock.unlock();
381398
}

modules/hivemq-edge-module-etherip/src/main/java/com/hivemq/edge/adapters/etherip/EipPollingProtocolAdapter.java

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838

3939
import java.util.List;
4040
import java.util.Map;
41+
import java.util.concurrent.locks.Lock;
42+
import java.util.concurrent.locks.ReentrantLock;
4143
import java.util.stream.Collectors;
4244

4345

@@ -50,7 +52,8 @@ public class EipPollingProtocolAdapter implements BatchPollingProtocolAdapter {
5052
private final @NotNull ProtocolAdapterState protocolAdapterState;
5153
protected final @NotNull AdapterFactories adapterFactories;
5254
private final @NotNull String adapterId;
53-
private volatile @Nullable EtherNetIP etherNetIP;
55+
private final @NotNull Lock connectionLock;
56+
private @Nullable EtherNetIP etherNetIP; // GuardedBy connectionLock
5457
private final @NotNull PublishChangedDataOnlyHandler lastSamples = new PublishChangedDataOnlyHandler();
5558
private final @NotNull DataPointFactory dataPointFactory;
5659

@@ -69,6 +72,7 @@ public EipPollingProtocolAdapter(
6972
.collect(Collectors.toMap(tag -> tag.getDefinition().getAddress(), tag -> tag));
7073
this.protocolAdapterState = input.getProtocolAdapterState();
7174
this.adapterFactories = input.adapterFactories();
75+
this.connectionLock = new ReentrantLock();
7276
}
7377

7478
@Override
@@ -80,35 +84,53 @@ public EipPollingProtocolAdapter(
8084
public void start(
8185
final @NotNull ProtocolAdapterStartInput input, final @NotNull ProtocolAdapterStartOutput output) {
8286
// any setup which should be done before the adapter starts polling comes here.
87+
connectionLock.lock();
8388
try {
84-
final EtherNetIP etherNetIP = new EtherNetIP(adapterConfig.getHost(), adapterConfig.getSlot());
85-
etherNetIP.connectTcp();
86-
this.etherNetIP = etherNetIP;
87-
output.startedSuccessfully();
89+
if (etherNetIP != null) {
90+
log.warn("Adapter {} is already started, ignoring start request", adapterId);
91+
output.startedSuccessfully();
92+
return;
93+
}
94+
95+
final EtherNetIP newConnection = new EtherNetIP(adapterConfig.getHost(), adapterConfig.getSlot());
96+
newConnection.connectTcp();
97+
this.etherNetIP = newConnection;
8898
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
99+
output.startedSuccessfully();
89100
} catch (final Exception e) {
101+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
90102
output.failStart(e, null);
103+
} finally {
104+
connectionLock.unlock();
91105
}
92106
}
93107

94108
@Override
95109
public void stop(
96110
final @NotNull ProtocolAdapterStopInput protocolAdapterStopInput,
97111
final @NotNull ProtocolAdapterStopOutput protocolAdapterStopOutput) {
112+
connectionLock.lock();
98113
try {
99114
final EtherNetIP etherNetIPTemp = etherNetIP;
100115
etherNetIP = null;
116+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
117+
101118
if (etherNetIPTemp != null) {
102-
etherNetIPTemp.close();
103-
protocolAdapterStopOutput.stoppedSuccessfully();
104-
log.info("Stopped");
119+
try {
120+
etherNetIPTemp.close();
121+
log.info("Stopped adapter {}", adapterId);
122+
} catch (final Exception e) {
123+
log.warn("Error closing EtherNetIP connection for adapter {}", adapterId, e);
124+
}
105125
} else {
106-
protocolAdapterStopOutput.stoppedSuccessfully();
107-
log.info("Stopped without an open connection");
126+
log.info("Stopped adapter {} without an open connection", adapterId);
108127
}
128+
protocolAdapterStopOutput.stoppedSuccessfully();
109129
} catch (final Exception e) {
110130
protocolAdapterStopOutput.failStop(e, "Unable to stop Ethernet IP connection");
111-
log.error("Unable to stop", e);
131+
log.error("Unable to stop adapter {}", adapterId, e);
132+
} finally {
133+
connectionLock.unlock();
112134
}
113135
}
114136

@@ -121,10 +143,16 @@ public void stop(
121143
@Override
122144
public void poll(
123145
final @NotNull BatchPollingInput pollingInput, final @NotNull BatchPollingOutput pollingOutput) {
124-
final var client = etherNetIP;
125-
if (client == null) {
126-
pollingOutput.fail("Polling failed because adapter wasn't started.");
127-
return;
146+
final EtherNetIP client;
147+
connectionLock.lock();
148+
try {
149+
client = etherNetIP;
150+
if (client == null) {
151+
pollingOutput.fail("Polling failed because adapter wasn't started.");
152+
return;
153+
}
154+
} finally {
155+
connectionLock.unlock();
128156
}
129157

130158
final var tagAddresses = tags.values().stream().map(v -> v.getDefinition().getAddress()).toArray(String[]::new);
@@ -148,14 +176,14 @@ public void poll(
148176
pollingOutput.finish();
149177
} catch (final CipException e) {
150178
if (e.getStatusCode() == 0x04) {
151-
log.warn("A Tag doesn't exist on device.", e);
179+
log.warn("A Tag doesn't exist on device for adapter {}", adapterId, e);
152180
pollingOutput.fail(e, "Tag doesn't exist on device");
153181
} else {
154-
log.warn("Problem accessing tag on device.", e);
182+
log.warn("Problem accessing tag on device for adapter {}", adapterId, e);
155183
pollingOutput.fail(e, "Problem accessing tag on device.");
156184
}
157185
} catch (final Exception e) {
158-
log.warn("An exception occurred while reading tags '{}'.", tagAddresses, e);
186+
log.warn("An exception occurred while reading tags '{}' for adapter {}", tagAddresses, adapterId, e);
159187
pollingOutput.fail(e, "An exception occurred while reading tags '" + tagAddresses + "'.");
160188
}
161189
}

modules/hivemq-edge-module-etherip/src/main/java/com/hivemq/edge/adapters/etherip/PublishChangedDataOnlyHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ public boolean replaceIfValueIsNew(final @NotNull String tagName, final @NotNull
3939
}
4040
});
4141

42-
return newValue != computedValue;
42+
// Return true if the value was actually replaced (i.e., computedValue is the new value)
43+
// When values are equal, compute() returns the old value, so references will differ
44+
return computedValue == newValue;
4345
}
4446

4547
public void clear() {

0 commit comments

Comments
 (0)