Skip to content

Commit 90599ec

Browse files
committed
Disconnect monitoring
1 parent 485695b commit 90599ec

File tree

2 files changed

+86
-1
lines changed

2 files changed

+86
-1
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
import java.util.Optional;
4848
import java.util.concurrent.CompletableFuture;
4949
import java.util.concurrent.ExecutionException;
50+
import java.util.concurrent.Executors;
51+
import java.util.concurrent.ScheduledExecutorService;
52+
import java.util.concurrent.ScheduledFuture;
5053
import java.util.concurrent.TimeUnit;
5154
import java.util.concurrent.TimeoutException;
5255
import java.util.concurrent.atomic.AtomicReference;
@@ -55,6 +58,7 @@
5558

5659
public class OpcUaClientConnection {
5760
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
61+
private static final int HEALTH_CHECK_INTERVAL_SECONDS = 30;
5862

5963
private final @NotNull OpcUaSpecificAdapterConfig config;
6064
private final @NotNull List<OpcuaTag> tags;
@@ -66,6 +70,8 @@ public class OpcUaClientConnection {
6670
private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
6771

6872
private final @NotNull AtomicReference<ConnectionContext> context = new AtomicReference<>();
73+
private final @NotNull ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
74+
private final @NotNull AtomicReference<ScheduledFuture<?>> healthCheckFuture = new AtomicReference<>();
6975

7076
OpcUaClientConnection(
7177
final @NotNull String adapterId,
@@ -202,12 +208,20 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
202208
context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener));
203209
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
204210
client.addSessionActivityListener(activityListener);
211+
212+
// Schedule periodic health check to detect stale connections
213+
scheduleHealthCheck();
214+
205215
log.info("Client created and connected successfully");
206216
return true;
207217
}
208218

209219
synchronized void stop() {
210220
log.info("Stopping OPC UA client");
221+
222+
// Cancel health check
223+
cancelHealthCheck();
224+
211225
final ConnectionContext ctx = context.get();
212226
if(ctx != null) {
213227
quietlyCloseClient(ctx.client(),true, ctx.faultListener(), ctx.activityListener());
@@ -217,6 +231,19 @@ synchronized void stop() {
217231

218232
void destroy() {
219233
log.info("Destroying OPC UA client");
234+
235+
// Cancel health check and shutdown scheduler
236+
cancelHealthCheck();
237+
healthCheckScheduler.shutdown();
238+
try {
239+
if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
240+
healthCheckScheduler.shutdownNow();
241+
}
242+
} catch (InterruptedException e) {
243+
Thread.currentThread().interrupt();
244+
healthCheckScheduler.shutdownNow();
245+
}
246+
220247
final ConnectionContext ctx = context.get();
221248
if(ctx != null) {
222249
quietlyCloseClient(ctx.client(), false, ctx.faultListener(), ctx.activityListener());
@@ -283,6 +310,57 @@ private static void quietlyCloseClient(
283310
}
284311
}
285312

313+
/**
314+
* Schedules periodic health checks to detect stale connections.
315+
* Runs every HEALTH_CHECK_INTERVAL_SECONDS to verify session is active.
316+
*/
317+
private void scheduleHealthCheck() {
318+
final ScheduledFuture<?> future = healthCheckScheduler.scheduleAtFixedRate(() -> {
319+
try {
320+
final ConnectionContext ctx = context.get();
321+
if (ctx == null) {
322+
log.trace("Health check skipped - client not connected for adapter '{}'", adapterId);
323+
return;
324+
}
325+
326+
final OpcUaClient client = ctx.client();
327+
if (client.getSession().isEmpty()) {
328+
log.warn("Health check failed for adapter '{}' - session is not active", adapterId);
329+
eventService
330+
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
331+
.withMessage("Connection health check failed for adapter '" + adapterId + "' - session inactive")
332+
.withSeverity(Event.SEVERITY.WARN)
333+
.fire();
334+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
335+
} else {
336+
log.trace("Health check passed for adapter '{}' - session is active", adapterId);
337+
}
338+
} catch (Exception e) {
339+
log.warn("Health check exception for adapter '{}'", adapterId, e);
340+
}
341+
}, HEALTH_CHECK_INTERVAL_SECONDS, HEALTH_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
342+
343+
// Store future and cancel any existing health check
344+
final ScheduledFuture<?> oldFuture = healthCheckFuture.getAndSet(future);
345+
if (oldFuture != null && !oldFuture.isDone()) {
346+
oldFuture.cancel(false);
347+
}
348+
349+
log.debug("Scheduled connection health check every {} seconds for adapter '{}'",
350+
HEALTH_CHECK_INTERVAL_SECONDS, adapterId);
351+
}
352+
353+
/**
354+
* Cancels any pending health check.
355+
*/
356+
private void cancelHealthCheck() {
357+
final ScheduledFuture<?> future = healthCheckFuture.getAndSet(null);
358+
if (future != null && !future.isDone()) {
359+
future.cancel(false);
360+
log.debug("Cancelled health check for adapter '{}'", adapterId);
361+
}
362+
}
363+
286364
private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
287365
}
288366
}

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.hivemq.edge.adapters.opcua.Constants;
1919
import org.eclipse.milo.opcua.sdk.client.OpcUaClientConfigBuilder;
2020
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
21+
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
2122
import org.jetbrains.annotations.NotNull;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
@@ -54,8 +55,14 @@ public void accept(final @NotNull OpcUaClientConfigBuilder configBuilder) {
5455
.setApplicationName(LocalizedText.english(Constants.OPCUA_APPLICATION_NAME))
5556
.setApplicationUri(applicationUri)
5657
.setProductUri(Constants.OPCUA_PRODUCT_URI)
57-
.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId);
58+
.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId)
59+
// Configure timeouts to prevent silent disconnects
60+
.setSessionTimeout(UInteger.valueOf(120_000)) // 2 minutes - session renewal interval
61+
.setRequestTimeout(UInteger.valueOf(30_000)) // 30 seconds - request timeout
62+
.setKeepAliveInterval(UInteger.valueOf(10_000)) // 10 seconds - keep-alive ping interval
63+
.setKeepAliveFailuresAllowed(UInteger.valueOf(3)); // 3 failures = 30s before disconnect
5864

65+
log.info("Configured OPC UA timeouts: session=120s, request=30s, keepAlive=10s, failuresAllowed=3");
5966
log.info("TLS is enabled: {}", parsedConfig.tlsEnabled());
6067
if (parsedConfig.tlsEnabled()) {
6168
if (log.isDebugEnabled()) {

0 commit comments

Comments
 (0)