Skip to content

Commit cd64622

Browse files
committed
Add connection health monitoring and reconnect
1 parent 48a296b commit cd64622

File tree

2 files changed

+62
-76
lines changed

2 files changed

+62
-76
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ public class OpcUaClientConnection {
6868
private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
6969

7070
private final @NotNull AtomicReference<ConnectionContext> context = new AtomicReference<>();
71-
private final @NotNull ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
72-
private final @NotNull AtomicReference<ScheduledFuture<?>> healthCheckFuture = new AtomicReference<>();
7371

7472
OpcUaClientConnection(
7573
final @NotNull String adapterId,
@@ -208,19 +206,13 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
208206
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
209207
client.addSessionActivityListener(activityListener);
210208

211-
// Schedule periodic health check to detect stale connections
212-
scheduleHealthCheck();
213-
214209
log.info("Client created and connected successfully");
215210
return true;
216211
}
217212

218213
synchronized void stop() {
219214
log.info("Stopping OPC UA client");
220215

221-
// Cancel health check
222-
cancelHealthCheck();
223-
224216
final ConnectionContext ctx = context.get();
225217
if(ctx != null) {
226218
quietlyCloseClient(ctx.client(),true, ctx.faultListener(), ctx.activityListener());
@@ -231,18 +223,6 @@ synchronized void stop() {
231223
void destroy() {
232224
log.info("Destroying OPC UA client");
233225

234-
// Cancel health check and shutdown scheduler
235-
cancelHealthCheck();
236-
healthCheckScheduler.shutdown();
237-
try {
238-
if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
239-
healthCheckScheduler.shutdownNow();
240-
}
241-
} catch (final InterruptedException e) {
242-
Thread.currentThread().interrupt();
243-
healthCheckScheduler.shutdownNow();
244-
}
245-
246226
final ConnectionContext ctx = context.get();
247227
if(ctx != null) {
248228
quietlyCloseClient(ctx.client(), false, ctx.faultListener(), ctx.activityListener());
@@ -343,58 +323,6 @@ private static void quietlyCloseClient(
343323
}
344324
}
345325

346-
/**
347-
* Schedules periodic health checks to detect stale connections.
348-
* Runs at configured health check interval to verify session is active.
349-
*/
350-
private void scheduleHealthCheck() {
351-
final int healthCheckIntervalSeconds = config.getHealthCheckInterval();
352-
final ScheduledFuture<?> future = healthCheckScheduler.scheduleAtFixedRate(() -> {
353-
try {
354-
final ConnectionContext ctx = context.get();
355-
if (ctx == null) {
356-
log.trace("Health check skipped - client not connected for adapter '{}'", adapterId);
357-
return;
358-
}
359-
360-
final OpcUaClient client = ctx.client();
361-
if (client.getSession().isEmpty()) {
362-
log.warn("Health check failed for adapter '{}' - session is not active", adapterId);
363-
eventService
364-
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
365-
.withMessage("Connection health check failed for adapter '" + adapterId + "' - session inactive")
366-
.withSeverity(Event.SEVERITY.WARN)
367-
.fire();
368-
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
369-
} else {
370-
log.trace("Health check passed for adapter '{}' - session is active", adapterId);
371-
}
372-
} catch (final Exception e) {
373-
log.warn("Health check exception for adapter '{}'", adapterId, e);
374-
}
375-
}, healthCheckIntervalSeconds, healthCheckIntervalSeconds, TimeUnit.SECONDS);
376-
377-
// Store future and cancel any existing health check
378-
final ScheduledFuture<?> oldFuture = healthCheckFuture.getAndSet(future);
379-
if (oldFuture != null && !oldFuture.isDone()) {
380-
oldFuture.cancel(false);
381-
}
382-
383-
log.debug("Scheduled connection health check every {} seconds for adapter '{}'",
384-
healthCheckIntervalSeconds, adapterId);
385-
}
386-
387-
/**
388-
* Cancels any pending health check.
389-
*/
390-
private void cancelHealthCheck() {
391-
final ScheduledFuture<?> future = healthCheckFuture.getAndSet(null);
392-
if (future != null && !future.isDone()) {
393-
future.cancel(false);
394-
log.debug("Cancelled health check for adapter '{}'", adapterId);
395-
}
396-
}
397-
398326
private record ConnectionContext(@NotNull OpcUaClient client,
399327
@NotNull ServiceFaultListener faultListener,
400328
@NotNull SessionActivityListener activityListener,

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
7878
private final @NotNull OpcUaSpecificAdapterConfig config;
7979
private final @NotNull ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor();
8080
private final @NotNull AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
81+
private final @NotNull ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
82+
private final @NotNull AtomicReference<ScheduledFuture<?>> healthCheckFuture = new AtomicReference<>();
8183

8284
// Stored for reconnection - set during start()
8385
private volatile ParsedConfig parsedConfig;
@@ -159,8 +161,9 @@ public synchronized void stop(
159161
final @NotNull ProtocolAdapterStopOutput output) {
160162
log.info("Stopping OPC UA protocol adapter {}", adapterId);
161163

162-
// Cancel any pending retries
164+
// Cancel any pending retries and health checks
163165
cancelRetry();
166+
cancelHealthCheck();
164167

165168
// Clear stored configuration to prevent reconnection after stop
166169
this.parsedConfig = null;
@@ -189,8 +192,9 @@ private void reconnect() {
189192
return;
190193
}
191194

192-
// Cancel any pending retries
195+
// Cancel any pending retries and health checks
193196
cancelRetry();
197+
cancelHealthCheck();
194198

195199
// Stop and clean up current connection
196200
final OpcUaClientConnection oldConn = opcUaClientConnection.getAndSet(null);
@@ -225,12 +229,54 @@ public ModuleServices moduleServices() {
225229
}
226230
}
227231

232+
/**
233+
* Schedules periodic health check that monitors connection health and triggers reconnection if needed.
234+
*/
235+
private void scheduleHealthCheck() {
236+
final int healthCheckIntervalSeconds = config.getHealthCheckInterval();
237+
final ScheduledFuture<?> future = healthCheckScheduler.scheduleAtFixedRate(() -> {
238+
final OpcUaClientConnection conn = opcUaClientConnection.get();
239+
if (conn == null) {
240+
log.debug("Health check skipped - no active connection for adapter '{}'", adapterId);
241+
return;
242+
}
243+
244+
if (!conn.isHealthy()) {
245+
log.warn("Health check failed for adapter '{}' - triggering reconnection", adapterId);
246+
reconnect();
247+
} else {
248+
log.debug("Health check passed for adapter '{}'", adapterId);
249+
}
250+
}, healthCheckIntervalSeconds, healthCheckIntervalSeconds, TimeUnit.SECONDS);
251+
252+
// Store future so it can be cancelled if needed
253+
final ScheduledFuture<?> oldFuture = healthCheckFuture.getAndSet(future);
254+
if (oldFuture != null && !oldFuture.isDone()) {
255+
oldFuture.cancel(false);
256+
}
257+
258+
log.debug("Scheduled connection health check every {} seconds for adapter '{}'",
259+
healthCheckIntervalSeconds, adapterId);
260+
}
261+
262+
/**
263+
* Cancels any pending health check.
264+
*/
265+
private void cancelHealthCheck() {
266+
final ScheduledFuture<?> future = healthCheckFuture.getAndSet(null);
267+
if (future != null && !future.isDone()) {
268+
future.cancel(false);
269+
log.debug("Cancelled health check for adapter '{}'", adapterId);
270+
}
271+
}
272+
228273
@Override
229274
public void destroy() {
230275
log.info("Destroying OPC UA protocol adapter {}", adapterId);
231276

232-
// Cancel any pending retries
277+
// Cancel any pending retries and health checks
233278
cancelRetry();
279+
cancelHealthCheck();
234280

235281
// Shutdown retry scheduler
236282
retryScheduler.shutdown();
@@ -243,6 +289,17 @@ public void destroy() {
243289
retryScheduler.shutdownNow();
244290
}
245291

292+
// Shutdown health check scheduler
293+
healthCheckScheduler.shutdown();
294+
try {
295+
if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
296+
healthCheckScheduler.shutdownNow();
297+
}
298+
} catch (final InterruptedException e) {
299+
Thread.currentThread().interrupt();
300+
healthCheckScheduler.shutdownNow();
301+
}
302+
246303
final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null);
247304
if (conn != null) {
248305
CompletableFuture.runAsync(() -> {
@@ -398,8 +455,9 @@ private void attemptConnection(
398455

399456
CompletableFuture.supplyAsync(() -> conn.start(parsedConfig)).whenComplete((success, throwable) -> {
400457
if (success && throwable == null) {
401-
// Connection succeeded - cancel any pending retries
458+
// Connection succeeded - cancel any pending retries and start health check
402459
cancelRetry();
460+
scheduleHealthCheck();
403461
log.info("OPC UA adapter '{}' connected successfully", adapterId);
404462
} else {
405463
// Connection failed - clean up and schedule retry

0 commit comments

Comments
 (0)