Skip to content

Commit 0e6372c

Browse files
committed
Test cleanup
1 parent 90599ec commit 0e6372c

File tree

13 files changed

+313
-68
lines changed

13 files changed

+313
-68
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
3737
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
3838
import org.eclipse.milo.opcua.stack.core.UaException;
39-
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
4039
import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
4140
import org.jetbrains.annotations.NotNull;
4241
import org.jetbrains.annotations.Nullable;
@@ -58,7 +57,6 @@
5857

5958
public class OpcUaClientConnection {
6059
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
61-
private static final int HEALTH_CHECK_INTERVAL_SECONDS = 30;
6260

6361
private final @NotNull OpcUaSpecificAdapterConfig config;
6462
private final @NotNull List<OpcuaTag> tags;
@@ -128,25 +126,26 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
128126
config.getUri(),
129127
endpointFilter,
130128
ignore -> {},
131-
new OpcUaClientConfigurator(adapterId, parsedConfig));
129+
new OpcUaClientConfigurator(adapterId, parsedConfig, config));
132130
client.addFaultListener(faultListener);
133131

134132
// Add timeout to connection attempt to prevent hanging forever
135133
// Wrap synchronous connect() call with CompletableFuture timeout
134+
final int connectionTimeoutSeconds = config.getConnectionTimeout();
136135
try {
137136
CompletableFuture.runAsync(() -> {
138137
try {
139138
client.connect();
140139
} catch (final UaException e) {
141140
throw new RuntimeException(e);
142141
}
143-
}).get(30, TimeUnit.SECONDS);
142+
}).get(connectionTimeoutSeconds, TimeUnit.SECONDS);
144143
log.debug("OPC UA client connected successfully for adapter '{}'", adapterId);
145144
} catch (final TimeoutException e) {
146-
log.error("Connection timeout after 30 seconds for OPC UA adapter '{}'", adapterId);
145+
log.error("Connection timeout after {} seconds for OPC UA adapter '{}'", connectionTimeoutSeconds, adapterId);
147146
eventService
148147
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
149-
.withMessage("Connection timeout after 30 seconds for adapter '" + adapterId + "'")
148+
.withMessage("Connection timeout after " + connectionTimeoutSeconds + " seconds for adapter '" + adapterId + "'")
150149
.withSeverity(Event.SEVERITY.ERROR)
151150
.fire();
152151
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
@@ -239,7 +238,7 @@ void destroy() {
239238
if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
240239
healthCheckScheduler.shutdownNow();
241240
}
242-
} catch (InterruptedException e) {
241+
} catch (final InterruptedException e) {
243242
Thread.currentThread().interrupt();
244243
healthCheckScheduler.shutdownNow();
245244
}
@@ -312,9 +311,10 @@ private static void quietlyCloseClient(
312311

313312
/**
314313
* Schedules periodic health checks to detect stale connections.
315-
* Runs every HEALTH_CHECK_INTERVAL_SECONDS to verify session is active.
314+
* Runs at configured health check interval to verify session is active.
316315
*/
317316
private void scheduleHealthCheck() {
317+
final int healthCheckIntervalSeconds = config.getHealthCheckInterval();
318318
final ScheduledFuture<?> future = healthCheckScheduler.scheduleAtFixedRate(() -> {
319319
try {
320320
final ConnectionContext ctx = context.get();
@@ -335,10 +335,10 @@ private void scheduleHealthCheck() {
335335
} else {
336336
log.trace("Health check passed for adapter '{}' - session is active", adapterId);
337337
}
338-
} catch (Exception e) {
338+
} catch (final Exception e) {
339339
log.warn("Health check exception for adapter '{}'", adapterId, e);
340340
}
341-
}, HEALTH_CHECK_INTERVAL_SECONDS, HEALTH_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
341+
}, healthCheckIntervalSeconds, healthCheckIntervalSeconds, TimeUnit.SECONDS);
342342

343343
// Store future and cancel any existing health check
344344
final ScheduledFuture<?> oldFuture = healthCheckFuture.getAndSet(future);
@@ -347,7 +347,7 @@ private void scheduleHealthCheck() {
347347
}
348348

349349
log.debug("Scheduled connection health check every {} seconds for adapter '{}'",
350-
HEALTH_CHECK_INTERVAL_SECONDS, adapterId);
350+
healthCheckIntervalSeconds, adapterId);
351351
}
352352

353353
/**

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464

6565
public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
6666
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaProtocolAdapter.class);
67-
private static final int RETRY_DELAY_SECONDS = 30;
6867

6968
private final @NotNull ProtocolAdapterInformation adapterInformation;
7069
private final @NotNull ProtocolAdapterState protocolAdapterState;
@@ -328,7 +327,7 @@ public void createTagSchema(
328327

329328
/**
330329
* Attempts to establish connection to OPC UA server.
331-
* On failure, schedules automatic retry after RETRY_DELAY_SECONDS.
330+
* On failure, schedules automatic retry after configured retry interval.
332331
*/
333332
private void attemptConnection(
334333
final @NotNull OpcUaClientConnection conn,
@@ -345,12 +344,13 @@ private void attemptConnection(
345344
this.opcUaClientConnection.set(null);
346345
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
347346

347+
final int retryIntervalSeconds = config.getRetryInterval();
348348
if (throwable != null) {
349349
log.warn("OPC UA adapter '{}' connection failed, will retry in {} seconds",
350-
adapterId, RETRY_DELAY_SECONDS, throwable);
350+
adapterId, retryIntervalSeconds, throwable);
351351
} else {
352352
log.warn("OPC UA adapter '{}' connection returned false, will retry in {} seconds",
353-
adapterId, RETRY_DELAY_SECONDS);
353+
adapterId, retryIntervalSeconds);
354354
}
355355

356356
// Schedule retry attempt
@@ -360,12 +360,13 @@ private void attemptConnection(
360360
}
361361

362362
/**
363-
* Schedules a retry attempt after RETRY_DELAY_SECONDS.
363+
* Schedules a retry attempt after configured retry interval.
364364
*/
365365
private void scheduleRetry(
366366
final @NotNull ParsedConfig parsedConfig,
367367
final @NotNull ProtocolAdapterStartInput input) {
368368

369+
final int retryIntervalSeconds = config.getRetryInterval();
369370
final ScheduledFuture<?> future = retryScheduler.schedule(() -> {
370371
// Check if adapter was stopped before retry executes
371372
if (opcUaClientConnection.get() == null) {
@@ -391,7 +392,7 @@ private void scheduleRetry(
391392
} else {
392393
log.debug("OPC UA adapter '{}' retry skipped - connection already exists", adapterId);
393394
}
394-
}, RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
395+
}, retryIntervalSeconds, TimeUnit.SECONDS);
395396

396397
// Store future so it can be cancelled if needed
397398
final ScheduledFuture<?> oldFuture = retryFuture.getAndSet(future);

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.hivemq.edge.adapters.opcua.client;
1717

1818
import com.hivemq.edge.adapters.opcua.Constants;
19+
import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig;
1920
import org.eclipse.milo.opcua.sdk.client.OpcUaClientConfigBuilder;
2021
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
2122
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
@@ -32,10 +33,12 @@ public class OpcUaClientConfigurator implements Consumer<OpcUaClientConfigBuilde
3233

3334
private final @NotNull String adapterId;
3435
private final @NotNull ParsedConfig parsedConfig;
36+
private final @NotNull OpcUaSpecificAdapterConfig config;
3537

36-
public OpcUaClientConfigurator(final @NotNull String adapterId, final @NotNull ParsedConfig parsedConfig) {
38+
public OpcUaClientConfigurator(final @NotNull String adapterId, final @NotNull ParsedConfig parsedConfig, final @NotNull OpcUaSpecificAdapterConfig config) {
3739
this.adapterId = adapterId;
3840
this.parsedConfig = parsedConfig;
41+
this.config = config;
3942
}
4043

4144
@Override
@@ -51,18 +54,24 @@ public void accept(final @NotNull OpcUaClientConfigBuilder configBuilder) {
5154
log.info("Using Application URI from certificate: {}", applicationUri);
5255
}
5356

57+
// Convert seconds to milliseconds for SDK configuration
58+
final int sessionTimeoutMs = config.getSessionTimeout() * 1000;
59+
final int requestTimeoutMs = config.getRequestTimeout() * 1000;
60+
final int keepAliveIntervalMs = config.getKeepAliveInterval() * 1000;
61+
5462
configBuilder
5563
.setApplicationName(LocalizedText.english(Constants.OPCUA_APPLICATION_NAME))
5664
.setApplicationUri(applicationUri)
5765
.setProductUri(Constants.OPCUA_PRODUCT_URI)
5866
.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId)
5967
// Configure timeouts to prevent silent disconnects
60-
.setSessionTimeout(UInteger.valueOf(120_000)) // 2 minutes - session renewal interval
61-
.setRequestTimeout(UInteger.valueOf(30_000)) // 30 seconds - request timeout
62-
.setKeepAliveInterval(UInteger.valueOf(10_000)) // 10 seconds - keep-alive ping interval
63-
.setKeepAliveFailuresAllowed(UInteger.valueOf(3)); // 3 failures = 30s before disconnect
68+
.setSessionTimeout(UInteger.valueOf(sessionTimeoutMs))
69+
.setRequestTimeout(UInteger.valueOf(requestTimeoutMs))
70+
.setKeepAliveInterval(UInteger.valueOf(keepAliveIntervalMs))
71+
.setKeepAliveFailuresAllowed(UInteger.valueOf(config.getKeepAliveFailuresAllowed()));
6472

65-
log.info("Configured OPC UA timeouts: session=120s, request=30s, keepAlive=10s, failuresAllowed=3");
73+
log.info("Configured OPC UA timeouts: session={}s, request={}s, keepAlive={}s, failuresAllowed={}",
74+
config.getSessionTimeout(), config.getRequestTimeout(), config.getKeepAliveInterval(), config.getKeepAliveFailuresAllowed());
6675
log.info("TLS is enabled: {}", parsedConfig.tlsEnabled());
6776
if (parsedConfig.tlsEnabled()) {
6877
if (log.isDebugEnabled()) {

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/BidirectionalOpcUaSpecificAdapterConfig.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,14 @@ public BidirectionalOpcUaSpecificAdapterConfig(
3131
@JsonProperty("auth") final @Nullable Auth auth,
3232
@JsonProperty("tls") final @Nullable Tls tls,
3333
@JsonProperty(value = "opcuaToMqtt") final @Nullable OpcUaToMqttConfig opcuaToMqttConfig,
34-
@JsonProperty("security") final @Nullable Security security) {
35-
super(uri, overrideUri, applicationUri, auth, tls, opcuaToMqttConfig, security);
34+
@JsonProperty("security") final @Nullable Security security,
35+
@JsonProperty("sessionTimeout") final @Nullable Integer sessionTimeout,
36+
@JsonProperty("requestTimeout") final @Nullable Integer requestTimeout,
37+
@JsonProperty("keepAliveInterval") final @Nullable Integer keepAliveInterval,
38+
@JsonProperty("keepAliveFailuresAllowed") final @Nullable Integer keepAliveFailuresAllowed,
39+
@JsonProperty("connectionTimeout") final @Nullable Integer connectionTimeout,
40+
@JsonProperty("healthCheckInterval") final @Nullable Integer healthCheckInterval,
41+
@JsonProperty("retryInterval") final @Nullable Integer retryInterval) {
42+
super(uri, overrideUri, applicationUri, auth, tls, opcuaToMqttConfig, security, sessionTimeout, requestTimeout, keepAliveInterval, keepAliveFailuresAllowed, connectionTimeout, healthCheckInterval, retryInterval);
3643
}
3744
}

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ public class OpcUaSpecificAdapterConfig implements ProtocolSpecificAdapterConfig
5858
@JsonProperty("applicationUri")
5959
@ModuleConfigField(title = "Application URI Override",
6060
description = "Overrides the Application URI used for OPC UA client identification. If not specified, the URI from the certificate SAN extension is used, or the default URI 'urn:hivemq:edge:client' as fallback.",
61-
format = ModuleConfigField.FieldType.URI,
62-
required = false)
61+
format = ModuleConfigField.FieldType.URI)
6362
private final @Nullable String applicationUri;
6463

6564
@JsonProperty("auth")
@@ -77,6 +76,62 @@ public class OpcUaSpecificAdapterConfig implements ProtocolSpecificAdapterConfig
7776
description = "The configuration for a data stream from OPC UA to MQTT")
7877
private final @NotNull OpcUaToMqttConfig opcuaToMqttConfig;
7978

79+
@JsonProperty("sessionTimeout")
80+
@ModuleConfigField(title = "Session Timeout (seconds)",
81+
description = "OPC UA session timeout in seconds. Session will be renewed at this interval.",
82+
numberMin = 10,
83+
numberMax = 3600,
84+
defaultValue = "120")
85+
private final int sessionTimeout;
86+
87+
@JsonProperty("requestTimeout")
88+
@ModuleConfigField(title = "Request Timeout (seconds)",
89+
description = "Timeout for OPC UA requests in seconds",
90+
numberMin = 5,
91+
numberMax = 300,
92+
defaultValue = "30")
93+
private final int requestTimeout;
94+
95+
@JsonProperty("keepAliveInterval")
96+
@ModuleConfigField(title = "Keep-Alive Interval (seconds)",
97+
description = "Interval between OPC UA keep-alive pings in seconds",
98+
numberMin = 1,
99+
numberMax = 60,
100+
defaultValue = "10")
101+
private final int keepAliveInterval;
102+
103+
@JsonProperty("keepAliveFailuresAllowed")
104+
@ModuleConfigField(title = "Keep-Alive Failures Allowed",
105+
description = "Number of consecutive keep-alive failures before connection is considered dead",
106+
numberMin = 1,
107+
numberMax = 10,
108+
defaultValue = "3")
109+
private final int keepAliveFailuresAllowed;
110+
111+
@JsonProperty("connectionTimeout")
112+
@ModuleConfigField(title = "Connection Timeout (seconds)",
113+
description = "Timeout for establishing connection to OPC UA server in seconds",
114+
numberMin = 5,
115+
numberMax = 300,
116+
defaultValue = "30")
117+
private final int connectionTimeout;
118+
119+
@JsonProperty("healthCheckInterval")
120+
@ModuleConfigField(title = "Health Check Interval (seconds)",
121+
description = "Interval between connection health checks in seconds",
122+
numberMin = 10,
123+
numberMax = 300,
124+
defaultValue = "30")
125+
private final int healthCheckInterval;
126+
127+
@JsonProperty("retryInterval")
128+
@ModuleConfigField(title = "Retry Interval (seconds)",
129+
description = "Interval between connection retry attempts in seconds",
130+
numberMin = 5,
131+
numberMax = 300,
132+
defaultValue = "30")
133+
private final int retryInterval;
134+
80135
@JsonCreator
81136
public OpcUaSpecificAdapterConfig(
82137
@JsonProperty(value = "uri", required = true) final @NotNull String uri,
@@ -85,7 +140,14 @@ public OpcUaSpecificAdapterConfig(
85140
@JsonProperty("auth") final @Nullable Auth auth,
86141
@JsonProperty("tls") final @Nullable Tls tls,
87142
@JsonProperty(value = "opcuaToMqtt") final @Nullable OpcUaToMqttConfig opcuaToMqttConfig,
88-
@JsonProperty("security") final @Nullable Security security) {
143+
@JsonProperty("security") final @Nullable Security security,
144+
@JsonProperty("sessionTimeout") final @Nullable Integer sessionTimeout,
145+
@JsonProperty("requestTimeout") final @Nullable Integer requestTimeout,
146+
@JsonProperty("keepAliveInterval") final @Nullable Integer keepAliveInterval,
147+
@JsonProperty("keepAliveFailuresAllowed") final @Nullable Integer keepAliveFailuresAllowed,
148+
@JsonProperty("connectionTimeout") final @Nullable Integer connectionTimeout,
149+
@JsonProperty("healthCheckInterval") final @Nullable Integer healthCheckInterval,
150+
@JsonProperty("retryInterval") final @Nullable Integer retryInterval) {
89151
this.uri = uri;
90152
this.overrideUri = requireNonNullElse(overrideUri, false);
91153
this.applicationUri = (applicationUri != null && !applicationUri.isBlank()) ? applicationUri : null;
@@ -94,6 +156,15 @@ public OpcUaSpecificAdapterConfig(
94156
this.opcuaToMqttConfig =
95157
Objects.requireNonNullElseGet(opcuaToMqttConfig, () -> new OpcUaToMqttConfig(1, 1000));
96158
this.security = requireNonNullElse(security, new Security(Constants.DEFAULT_SECURITY_POLICY));
159+
160+
// Timeout configurations with sensible defaults
161+
this.sessionTimeout = requireNonNullElse(sessionTimeout, 120);
162+
this.requestTimeout = requireNonNullElse(requestTimeout, 30);
163+
this.keepAliveInterval = requireNonNullElse(keepAliveInterval, 10);
164+
this.keepAliveFailuresAllowed = requireNonNullElse(keepAliveFailuresAllowed, 3);
165+
this.connectionTimeout = requireNonNullElse(connectionTimeout, 30);
166+
this.healthCheckInterval = requireNonNullElse(healthCheckInterval, 30);
167+
this.retryInterval = requireNonNullElse(retryInterval, 30);
97168
}
98169

99170

@@ -125,7 +196,33 @@ public OpcUaSpecificAdapterConfig(
125196
return applicationUri;
126197
}
127198

199+
public int getSessionTimeout() {
200+
return sessionTimeout;
201+
}
202+
203+
public int getRequestTimeout() {
204+
return requestTimeout;
205+
}
206+
207+
public int getKeepAliveInterval() {
208+
return keepAliveInterval;
209+
}
128210

211+
public int getKeepAliveFailuresAllowed() {
212+
return keepAliveFailuresAllowed;
213+
}
214+
215+
public int getConnectionTimeout() {
216+
return connectionTimeout;
217+
}
218+
219+
public int getHealthCheckInterval() {
220+
return healthCheckInterval;
221+
}
222+
223+
public int getRetryInterval() {
224+
return retryInterval;
225+
}
129226

130227
@Override
131228
public boolean equals(final @Nullable Object o) {
@@ -134,6 +231,13 @@ public boolean equals(final @Nullable Object o) {
134231
}
135232
final OpcUaSpecificAdapterConfig that = (OpcUaSpecificAdapterConfig) o;
136233
return getOverrideUri().equals(that.getOverrideUri()) &&
234+
sessionTimeout == that.sessionTimeout &&
235+
requestTimeout == that.requestTimeout &&
236+
keepAliveInterval == that.keepAliveInterval &&
237+
keepAliveFailuresAllowed == that.keepAliveFailuresAllowed &&
238+
connectionTimeout == that.connectionTimeout &&
239+
healthCheckInterval == that.healthCheckInterval &&
240+
retryInterval == that.retryInterval &&
137241
Objects.equals(id, that.id) &&
138242
Objects.equals(getUri(), that.getUri()) &&
139243
Objects.equals(getApplicationUri(), that.getApplicationUri()) &&
@@ -145,6 +249,7 @@ public boolean equals(final @Nullable Object o) {
145249

146250
@Override
147251
public int hashCode() {
148-
return Objects.hash(getOverrideUri(), id, getUri(), getApplicationUri(), getAuth(), getTls(), getSecurity(), getOpcuaToMqttConfig());
252+
return Objects.hash(getOverrideUri(), id, getUri(), getApplicationUri(), getAuth(), getTls(), getSecurity(), getOpcuaToMqttConfig(),
253+
sessionTimeout, requestTimeout, keepAliveInterval, keepAliveFailuresAllowed, connectionTimeout, healthCheckInterval, retryInterval);
149254
}
150255
}

0 commit comments

Comments
 (0)