Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
00e0d94
Allowed to deactivate certificate validation
codepitbull Oct 30, 2025
0763182
Nullability
codepitbull Oct 30, 2025
2762122
Add noChecks option
codepitbull Nov 3, 2025
0884be3
Add better connection and timeout handling
codepitbull Nov 3, 2025
b4696a9
Disconnect monitoring
codepitbull Nov 3, 2025
275d9d8
Test cleanup
codepitbull Nov 3, 2025
8eb0273
Extract reconnect and isHealthy methods
codepitbull Nov 3, 2025
143659d
Add connection health monitoring and reconnect
codepitbull Nov 3, 2025
4110ded
Finalized the reconnect loop
codepitbull Nov 3, 2025
e0f9d0e
Add default for noChecks and check if adapter is stopped
codepitbull Nov 4, 2025
a22f84d
Allow explicit shutdown of the ProtocolAdapterManager for testing
codepitbull Nov 4, 2025
372de2c
Default value for TLS
codepitbull Nov 4, 2025
8187fee
Adjust min value for connectionTimeout
codepitbull Nov 5, 2025
6b7a593
Move connection options into their own config record
codepitbull Nov 5, 2025
0e400d7
Move to long to prepare for switch to ms
codepitbull Nov 5, 2025
dfa03f4
Now using milliseconds
codepitbull Nov 5, 2025
9dc654c
Small cleanup
codepitbull Nov 5, 2025
be913bd
Fixed property name
codepitbull Nov 5, 2025
bd3c441
Naming cleanup
codepitbull Nov 5, 2025
72781b5
Update modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge…
codepitbull Nov 5, 2025
eecbab5
seconds -> milliseconds
codepitbull Nov 5, 2025
76ed25c
Update modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge…
codepitbull Nov 5, 2025
fc93cb7
Adjust defaults
codepitbull Nov 5, 2025
0eeebe5
Naming
codepitbull Nov 5, 2025
b6a2b77
Merge branch 'master' into feature/disable-cert-validation
codepitbull Nov 5, 2025
ab3065e
UI adjustment
codepitbull Nov 5, 2025
3522a58
Merge branch 'master' into feature/disable-cert-validation
caoccao Nov 6, 2025
ede0cc9
fix: Add a daemon thread to protect attemptStartingConsumers
caoccao Nov 6, 2025
6929af0
fix: Exclude the first call to status listener
caoccao Nov 6, 2025
4e1305a
Fix scheduler restart
codepitbull Nov 6, 2025
71d7014
Make executor create non-daemon-threads
codepitbull Nov 6, 2025
87ca943
Reverted change that caused too many additonal errors
codepitbull Nov 6, 2025
fe3b549
Fixed license headers
codepitbull Nov 6, 2025
2364836
FIx fieldtype
codepitbull Nov 6, 2025
422ee72
prevent operations from executing after stop() has been called
marregui Nov 6, 2025
d3ecd38
Fixed a timeout
codepitbull Nov 6, 2025
816bde1
fix: Fix Modbus stop()
caoccao Nov 6, 2025
2b374f1
clean resources in database adapters
marregui Nov 6, 2025
5248065
synchronise list of northbound consumers
marregui Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import com.hivemq.http.JaxrsHttpServer;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -174,6 +175,11 @@ public void start(final @Nullable EmbeddedExtension embeddedExtension)

public void stop() {
stopGateway();
}

@VisibleForTesting
public void shutdownProtocolAdapters() {
injector.protocolAdapterManager().shutdown();
try {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} catch (final IllegalStateException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private void performStop(

try {
hiveMQServer.stop();
hiveMQServer.shutdownProtocolAdapters();
} catch (final Exception ex) {
if (desiredState == State.CLOSED) {
log.error("Exception during running shutdown hook.", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ public void start() {
protocolAdapterConfig.registerConsumer(this::refresh);
}

@VisibleForTesting
public void shutdown() {
protocolAdapters.entrySet().stream().forEach(entry -> {
try {
entry.getValue().stopAsync(true).get();
} catch (final InterruptedException | ExecutionException e) {
log.error("Exception happened while shutting down adapter: ", e);
}
});
}

public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
executorService.submit(() -> {
log.info("Refreshing adapters");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -45,10 +44,13 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static com.hivemq.edge.adapters.opcua.Constants.PROTOCOL_ID_OPCUA;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

public class OpcUaClientConnection {
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
Expand All @@ -72,8 +74,7 @@ public class OpcUaClientConnection {
final @NotNull DataPointFactory dataPointFactory,
final @NotNull EventService eventService,
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
final @NotNull OpcUaSpecificAdapterConfig config,
final @NotNull AtomicReference<UInteger> lastSubscriptionId) {
final @NotNull OpcUaSpecificAdapterConfig config) {
this.config = config;
this.tagStreamingService = tagStreamingService;
this.dataPointFactory = dataPointFactory;
Expand Down Expand Up @@ -120,14 +121,59 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
config.getUri(),
endpointFilter,
ignore -> {},
new OpcUaClientConfigurator(adapterId, parsedConfig));
new OpcUaClientConfigurator(adapterId, parsedConfig, config));
client.addFaultListener(faultListener);
client.connect();

// Add timeout to connection attempt to prevent hanging forever
// Wrap synchronous connect() call with CompletableFuture timeout
final long connectionTimeoutSecondsMs = config.getConnectionOptions().connectionTimeoutMs();
try {
CompletableFuture.runAsync(() -> {
try {
client.connect();
} catch (final UaException e) {
throw new RuntimeException(e);
}
}).get(connectionTimeoutSecondsMs, TimeUnit.MILLISECONDS);
log.debug("OPC UA client connected successfully for adapter '{}'", adapterId);
} catch (final TimeoutException e) {
log.error("Connection timeout after {} milliseconds for OPC UA adapter '{}'", connectionTimeoutSecondsMs, adapterId);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Connection timeout after " + connectionTimeoutSecondsMs + " milliseconds for adapter '" + adapterId + "'")
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
quietlyCloseClient(client, false, faultListener, null);
return false;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Connection interrupted for OPC UA adapter '{}'", adapterId, e);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Connection interrupted for adapter '" + adapterId + "'")
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
quietlyCloseClient(client, false, faultListener, null);
return false;
} catch (final ExecutionException e) {
final Throwable cause = e.getCause();
log.error("Connection failed for OPC UA adapter '{}'", adapterId, cause);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Connection failed for adapter '" + adapterId + "': " + cause.getMessage())
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
quietlyCloseClient(client, false, faultListener, null);
return false;
}
} catch (final UaException e) {
log.error("Unable to connect and subscribe to the OPC UA server for adapter '{}'", adapterId, e);
log.error("Unable to create OPC UA client for adapter '{}'", adapterId, e);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Unable to connect and subscribe to the OPC UA server for adapter '" + adapterId + "'")
.withMessage("Unable to create OPC UA client for adapter '" + adapterId + "'")
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
Expand All @@ -153,15 +199,17 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
final var subscription = subscriptionOptional.get();
log.trace("Creating Subscription for OPC UA client");

context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener));
context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener, subscriptionLifecycleHandler));
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
client.addSessionActivityListener(activityListener);

log.info("Client created and connected successfully");
return true;
}

synchronized void stop() {
log.info("Stopping OPC UA client");

final ConnectionContext ctx = context.get();
if(ctx != null) {
quietlyCloseClient(ctx.client(),true, ctx.faultListener(), ctx.activityListener());
Expand All @@ -171,13 +219,48 @@ synchronized void stop() {

void destroy() {
log.info("Destroying OPC UA client");

final ConnectionContext ctx = context.get();
if(ctx != null) {
quietlyCloseClient(ctx.client(), false, ctx.faultListener(), ctx.activityListener());
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
}
}

/**
* Checks if the connection is healthy by verifying both:
* 1. Session is active (client connection exists and session is present)
* 2. Keep-alive messages are being received (subscription is healthy)
*
* @return true if connection is healthy, false otherwise
*/
public boolean isHealthy() {
final ConnectionContext ctx = context.get();
if (ctx == null) {
log.debug("Connection health check failed: no connection context");
return false;
}

try {
// Check 1: Session is active
if (ctx.client().getSession().isEmpty()) {
log.debug("Connection health check failed: session inactive for adapter '{}'", adapterId);
return false;
}

// Check 2: Keep-alive is healthy
if (!ctx.subscriptionHandler().isKeepAliveHealthy()) {
log.debug("Connection health check failed: keep-alive timeout for adapter '{}'", adapterId);
return false;
}

return true;
} catch (final UaException e) {
log.debug("Connection health check failed with exception for adapter '{}'", adapterId, e);
return false;
}
}

@NotNull Optional<OpcUaClient> client() {
final ConnectionContext ctx = context.get();
if(ctx != null) {
Expand Down Expand Up @@ -237,6 +320,9 @@ private static void quietlyCloseClient(
}
}

private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
private record ConnectionContext(@NotNull OpcUaClient client,
@NotNull ServiceFaultListener faultListener,
@NotNull SessionActivityListener activityListener,
@NotNull OpcUaSubscriptionLifecycleHandler subscriptionHandler) {
}
}
Loading
Loading