@@ -85,6 +85,9 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
8585 private volatile ParsedConfig parsedConfig ;
8686 private volatile ModuleServices moduleServices ;
8787
88+ // Flag to prevent scheduling after stop
89+ private volatile boolean stopped = false ;
90+
8891 public OpcUaProtocolAdapter (
8992 final @ NotNull ProtocolAdapterInformation adapterInformation ,
9093 final @ NotNull ProtocolAdapterInput <OpcUaSpecificAdapterConfig > input ) {
@@ -109,6 +112,10 @@ public synchronized void start(
109112 final @ NotNull ProtocolAdapterStartInput input ,
110113 final @ NotNull ProtocolAdapterStartOutput output ) {
111114 log .info ("Starting OPC UA protocol adapter {}" , adapterId );
115+
116+ // Reset stopped flag
117+ stopped = false ;
118+
112119 final ParsedConfig parsedConfig ;
113120 final var result = ParsedConfig .fromConfig (config );
114121 if (result instanceof Failure <ParsedConfig , String >(final String failure )) {
@@ -161,10 +168,16 @@ public synchronized void stop(
161168 final @ NotNull ProtocolAdapterStopOutput output ) {
162169 log .info ("Stopping OPC UA protocol adapter {}" , adapterId );
163170
171+ // Set stopped flag to prevent new scheduling
172+ stopped = true ;
173+
164174 // Cancel any pending retries and health checks
165175 cancelRetry ();
166176 cancelHealthCheck ();
167177
178+ // Shutdown schedulers immediately to prevent new tasks
179+ shutdownSchedulers ();
180+
168181 // Clear stored configuration to prevent reconnection after stop
169182 this .parsedConfig = null ;
170183 this .moduleServices = null ;
@@ -184,6 +197,12 @@ public synchronized void stop(
184197 * Requires that start() has been called previously to initialize parsedConfig and moduleServices.
185198 */
186199 private void reconnect () {
200+ // Check if adapter has been stopped
201+ if (stopped ) {
202+ log .debug ("Skipping reconnection for adapter '{}' - adapter has been stopped" , adapterId );
203+ return ;
204+ }
205+
187206 log .info ("Reconnecting OPC UA adapter '{}'" , adapterId );
188207
189208 // Verify we have the necessary configuration
@@ -233,8 +252,20 @@ public ModuleServices moduleServices() {
233252 * Schedules periodic health check that monitors connection health and triggers reconnection if needed.
234253 */
235254 private void scheduleHealthCheck () {
255+ // Check if adapter has been stopped
256+ if (stopped ) {
257+ log .debug ("Skipping health check scheduling for adapter '{}' - adapter has been stopped" , adapterId );
258+ return ;
259+ }
260+
236261 final int healthCheckIntervalSeconds = config .getHealthCheckInterval ();
237262 final ScheduledFuture <?> future = healthCheckScheduler .scheduleAtFixedRate (() -> {
263+ // Check if adapter was stopped before health check executes
264+ if (stopped ) {
265+ log .debug ("Health check skipped for adapter '{}' - adapter was stopped" , adapterId );
266+ return ;
267+ }
268+
238269 final OpcUaClientConnection conn = opcUaClientConnection .get ();
239270 if (conn == null ) {
240271 log .debug ("Health check skipped - no active connection for adapter '{}'" , adapterId );
@@ -275,6 +306,32 @@ private void cancelHealthCheck() {
275306 }
276307 }
277308
309+ /**
310+ * Shuts down both retry and health check schedulers.
311+ * Uses immediate shutdown to cancel all pending tasks.
312+ */
313+ private void shutdownSchedulers () {
314+ // Shutdown retry scheduler - use shutdownNow() to cancel pending tasks immediately
315+ if (!retryScheduler .isShutdown ()) {
316+ retryScheduler .shutdownNow ();
317+ try {
318+ retryScheduler .awaitTermination (5 , TimeUnit .SECONDS );
319+ } catch (final InterruptedException e ) {
320+ Thread .currentThread ().interrupt ();
321+ }
322+ }
323+
324+ // Shutdown health check scheduler - use shutdownNow() to cancel pending tasks immediately
325+ if (!healthCheckScheduler .isShutdown ()) {
326+ healthCheckScheduler .shutdownNow ();
327+ try {
328+ healthCheckScheduler .awaitTermination (5 , TimeUnit .SECONDS );
329+ } catch (final InterruptedException e ) {
330+ Thread .currentThread ().interrupt ();
331+ }
332+ }
333+ }
334+
278335 @ Override
279336 public void destroy () {
280337 log .info ("Destroying OPC UA protocol adapter {}" , adapterId );
@@ -283,27 +340,8 @@ public void destroy() {
283340 cancelRetry ();
284341 cancelHealthCheck ();
285342
286- // Shutdown retry scheduler
287- retryScheduler .shutdown ();
288- try {
289- if (!retryScheduler .awaitTermination (5 , TimeUnit .SECONDS )) {
290- retryScheduler .shutdownNow ();
291- }
292- } catch (final InterruptedException e ) {
293- Thread .currentThread ().interrupt ();
294- retryScheduler .shutdownNow ();
295- }
296-
297- // Shutdown health check scheduler
298- healthCheckScheduler .shutdown ();
299- try {
300- if (!healthCheckScheduler .awaitTermination (5 , TimeUnit .SECONDS )) {
301- healthCheckScheduler .shutdownNow ();
302- }
303- } catch (final InterruptedException e ) {
304- Thread .currentThread ().interrupt ();
305- healthCheckScheduler .shutdownNow ();
306- }
343+ // Shutdown schedulers (if not already shutdown in stop())
344+ shutdownSchedulers ();
307345
308346 final OpcUaClientConnection conn = opcUaClientConnection .getAndSet (null );
309347 if (conn != null ) {
@@ -491,10 +529,16 @@ private void scheduleRetry(
491529 final @ NotNull ParsedConfig parsedConfig ,
492530 final @ NotNull ProtocolAdapterStartInput input ) {
493531
532+ // Check if adapter has been stopped
533+ if (stopped ) {
534+ log .debug ("Skipping retry scheduling for adapter '{}' - adapter has been stopped" , adapterId );
535+ return ;
536+ }
537+
494538 final int retryIntervalSeconds = config .getRetryInterval ();
495539 final ScheduledFuture <?> future = retryScheduler .schedule (() -> {
496540 // Check if adapter was stopped before retry executes
497- if (this .parsedConfig == null || this .moduleServices == null ) {
541+ if (stopped || this .parsedConfig == null || this .moduleServices == null ) {
498542 log .debug ("OPC UA adapter '{}' retry cancelled - adapter was stopped" , adapterId );
499543 return ;
500544 }
0 commit comments