Skip to content

Commit e0f9d0e

Browse files
committed
Add default for noChecks and check if adapter is stopped
1 parent 4110ded commit e0f9d0e

File tree

2 files changed

+69
-23
lines changed

2 files changed

+69
-23
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
8585
private volatile ParsedConfig parsedConfig;
8686
private volatile ModuleServices moduleServices;
8787

88+
// Flag to prevent scheduling after stop
89+
private volatile boolean stopped = false;
90+
8891
public OpcUaProtocolAdapter(
8992
final @NotNull ProtocolAdapterInformation adapterInformation,
9093
final @NotNull ProtocolAdapterInput<OpcUaSpecificAdapterConfig> input) {
@@ -109,6 +112,10 @@ public synchronized void start(
109112
final @NotNull ProtocolAdapterStartInput input,
110113
final @NotNull ProtocolAdapterStartOutput output) {
111114
log.info("Starting OPC UA protocol adapter {}", adapterId);
115+
116+
// Reset stopped flag
117+
stopped = false;
118+
112119
final ParsedConfig parsedConfig;
113120
final var result = ParsedConfig.fromConfig(config);
114121
if (result instanceof Failure<ParsedConfig, String>(final String failure)) {
@@ -161,10 +168,16 @@ public synchronized void stop(
161168
final @NotNull ProtocolAdapterStopOutput output) {
162169
log.info("Stopping OPC UA protocol adapter {}", adapterId);
163170

171+
// Set stopped flag to prevent new scheduling
172+
stopped = true;
173+
164174
// Cancel any pending retries and health checks
165175
cancelRetry();
166176
cancelHealthCheck();
167177

178+
// Shutdown schedulers immediately to prevent new tasks
179+
shutdownSchedulers();
180+
168181
// Clear stored configuration to prevent reconnection after stop
169182
this.parsedConfig = null;
170183
this.moduleServices = null;
@@ -184,6 +197,12 @@ public synchronized void stop(
184197
* Requires that start() has been called previously to initialize parsedConfig and moduleServices.
185198
*/
186199
private void reconnect() {
200+
// Check if adapter has been stopped
201+
if (stopped) {
202+
log.debug("Skipping reconnection for adapter '{}' - adapter has been stopped", adapterId);
203+
return;
204+
}
205+
187206
log.info("Reconnecting OPC UA adapter '{}'", adapterId);
188207

189208
// Verify we have the necessary configuration
@@ -233,8 +252,20 @@ public ModuleServices moduleServices() {
233252
* Schedules periodic health check that monitors connection health and triggers reconnection if needed.
234253
*/
235254
private void scheduleHealthCheck() {
255+
// Check if adapter has been stopped
256+
if (stopped) {
257+
log.debug("Skipping health check scheduling for adapter '{}' - adapter has been stopped", adapterId);
258+
return;
259+
}
260+
236261
final int healthCheckIntervalSeconds = config.getHealthCheckInterval();
237262
final ScheduledFuture<?> future = healthCheckScheduler.scheduleAtFixedRate(() -> {
263+
// Check if adapter was stopped before health check executes
264+
if (stopped) {
265+
log.debug("Health check skipped for adapter '{}' - adapter was stopped", adapterId);
266+
return;
267+
}
268+
238269
final OpcUaClientConnection conn = opcUaClientConnection.get();
239270
if (conn == null) {
240271
log.debug("Health check skipped - no active connection for adapter '{}'", adapterId);
@@ -275,6 +306,32 @@ private void cancelHealthCheck() {
275306
}
276307
}
277308

309+
/**
310+
* Shuts down both retry and health check schedulers.
311+
* Uses immediate shutdown to cancel all pending tasks.
312+
*/
313+
private void shutdownSchedulers() {
314+
// Shutdown retry scheduler - use shutdownNow() to cancel pending tasks immediately
315+
if (!retryScheduler.isShutdown()) {
316+
retryScheduler.shutdownNow();
317+
try {
318+
retryScheduler.awaitTermination(5, TimeUnit.SECONDS);
319+
} catch (final InterruptedException e) {
320+
Thread.currentThread().interrupt();
321+
}
322+
}
323+
324+
// Shutdown health check scheduler - use shutdownNow() to cancel pending tasks immediately
325+
if (!healthCheckScheduler.isShutdown()) {
326+
healthCheckScheduler.shutdownNow();
327+
try {
328+
healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS);
329+
} catch (final InterruptedException e) {
330+
Thread.currentThread().interrupt();
331+
}
332+
}
333+
}
334+
278335
@Override
279336
public void destroy() {
280337
log.info("Destroying OPC UA protocol adapter {}", adapterId);
@@ -283,27 +340,8 @@ public void destroy() {
283340
cancelRetry();
284341
cancelHealthCheck();
285342

286-
// Shutdown retry scheduler
287-
retryScheduler.shutdown();
288-
try {
289-
if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
290-
retryScheduler.shutdownNow();
291-
}
292-
} catch (final InterruptedException e) {
293-
Thread.currentThread().interrupt();
294-
retryScheduler.shutdownNow();
295-
}
296-
297-
// Shutdown health check scheduler
298-
healthCheckScheduler.shutdown();
299-
try {
300-
if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
301-
healthCheckScheduler.shutdownNow();
302-
}
303-
} catch (final InterruptedException e) {
304-
Thread.currentThread().interrupt();
305-
healthCheckScheduler.shutdownNow();
306-
}
343+
// Shutdown schedulers (if not already shutdown in stop())
344+
shutdownSchedulers();
307345

308346
final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null);
309347
if (conn != null) {
@@ -491,10 +529,16 @@ private void scheduleRetry(
491529
final @NotNull ParsedConfig parsedConfig,
492530
final @NotNull ProtocolAdapterStartInput input) {
493531

532+
// Check if adapter has been stopped
533+
if (stopped) {
534+
log.debug("Skipping retry scheduling for adapter '{}' - adapter has been stopped", adapterId);
535+
return;
536+
}
537+
494538
final int retryIntervalSeconds = config.getRetryInterval();
495539
final ScheduledFuture<?> future = retryScheduler.schedule(() -> {
496540
// Check if adapter was stopped before retry executes
497-
if (this.parsedConfig == null || this.moduleServices == null) {
541+
if (stopped || this.parsedConfig == null || this.moduleServices == null) {
498542
log.debug("OPC UA adapter '{}' retry cancelled - adapter was stopped", adapterId);
499543
return;
500544
}

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/Tls.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Objects;
2525

2626
import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
27+
import static java.util.Objects.requireNonNullElse;
2728

2829
public record Tls (@JsonProperty("enabled")
2930
@ModuleConfigField(title = "Enable TLS",
@@ -50,6 +51,7 @@ public record Tls (@JsonProperty("enabled")
5051
@Nullable Truststore truststore
5152
) {
5253
@JsonCreator
53-
public Tls{
54+
public Tls {
55+
noChecks = requireNonNullElse(noChecks, false);
5456
}
5557
}

0 commit comments

Comments
 (0)