Skip to content
6 changes: 6 additions & 0 deletions hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import com.hivemq.http.JaxrsHttpServer;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -174,6 +175,11 @@ public void start(final @Nullable EmbeddedExtension embeddedExtension)

public void stop() {
stopGateway();
}

@VisibleForTesting
public void shutdownProtocolAdapters() {
injector.protocolAdapterManager().shutdown();
try {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} catch (final IllegalStateException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private void performStop(

try {
hiveMQServer.stop();
hiveMQServer.shutdownProtocolAdapters();
} catch (final Exception ex) {
if (desiredState == State.CLOSED) {
log.error("Exception during running shutdown hook.", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ public void start() {
protocolAdapterConfig.registerConsumer(this::refresh);
}

@VisibleForTesting
public void shutdown() {
protocolAdapters.entrySet().stream().forEach(entry -> {
try {
entry.getValue().stopAsync(true).get();
} catch (final InterruptedException | ExecutionException e) {
log.error("Exception happened while shutting down adapter: ", e);
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming an exception is thrown, the rest of protocol adapters are not stopped. Is this behavior expected?

}
});
}

public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
executorService.submit(() -> {
log.info("Refreshing adapters");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -45,10 +44,16 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static com.hivemq.edge.adapters.opcua.Constants.PROTOCOL_ID_OPCUA;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

public class OpcUaClientConnection {
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
Expand All @@ -72,8 +77,7 @@ public class OpcUaClientConnection {
final @NotNull DataPointFactory dataPointFactory,
final @NotNull EventService eventService,
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
final @NotNull OpcUaSpecificAdapterConfig config,
final @NotNull AtomicReference<UInteger> lastSubscriptionId) {
final @NotNull OpcUaSpecificAdapterConfig config) {
this.config = config;
this.tagStreamingService = tagStreamingService;
this.dataPointFactory = dataPointFactory;
Expand Down Expand Up @@ -120,14 +124,59 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
config.getUri(),
endpointFilter,
ignore -> {},
new OpcUaClientConfigurator(adapterId, parsedConfig));
new OpcUaClientConfigurator(adapterId, parsedConfig, config));
client.addFaultListener(faultListener);
client.connect();

// Add timeout to connection attempt to prevent hanging forever
// Wrap synchronous connect() call with CompletableFuture timeout
final int connectionTimeoutSeconds = config.getConnectionTimeout();
try {
CompletableFuture.runAsync(() -> {
try {
client.connect();
} catch (final UaException e) {
throw new RuntimeException(e);
}
}).get(connectionTimeoutSeconds, TimeUnit.SECONDS);
log.debug("OPC UA client connected successfully for adapter '{}'", adapterId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logging statement is in the right place? If client.connect() throws a UaException, a success will still be printed?

} catch (final TimeoutException e) {
log.error("Connection timeout after {} seconds for OPC UA adapter '{}'", connectionTimeoutSeconds, adapterId);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Connection timeout after " + connectionTimeoutSeconds + " seconds for adapter '" + adapterId + "'")
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
quietlyCloseClient(client, false, faultListener, null);
return false;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Connection interrupted for OPC UA adapter '{}'", adapterId, e);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Connection interrupted for adapter '" + adapterId + "'")
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
quietlyCloseClient(client, false, faultListener, null);
return false;
} catch (final ExecutionException e) {
final Throwable cause = e.getCause();
log.error("Connection failed for OPC UA adapter '{}'", adapterId, cause);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Connection failed for adapter '" + adapterId + "': " + cause.getMessage())
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
quietlyCloseClient(client, false, faultListener, null);
return false;
}
} catch (final UaException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch will not be hit.

log.error("Unable to connect and subscribe to the OPC UA server for adapter '{}'", adapterId, e);
log.error("Unable to create OPC UA client for adapter '{}'", adapterId, e);
eventService
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
.withMessage("Unable to connect and subscribe to the OPC UA server for adapter '" + adapterId + "'")
.withMessage("Unable to create OPC UA client for adapter '" + adapterId + "'")
.withSeverity(Event.SEVERITY.ERROR)
.fire();
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
Expand All @@ -153,15 +202,17 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
final var subscription = subscriptionOptional.get();
log.trace("Creating Subscription for OPC UA client");

context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener));
context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener, subscriptionLifecycleHandler));
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
client.addSessionActivityListener(activityListener);

log.info("Client created and connected successfully");
return true;
}

synchronized void stop() {
log.info("Stopping OPC UA client");

final ConnectionContext ctx = context.get();
if(ctx != null) {
quietlyCloseClient(ctx.client(),true, ctx.faultListener(), ctx.activityListener());
Expand All @@ -171,13 +222,48 @@ synchronized void stop() {

void destroy() {
log.info("Destroying OPC UA client");

final ConnectionContext ctx = context.get();
if(ctx != null) {
quietlyCloseClient(ctx.client(), false, ctx.faultListener(), ctx.activityListener());
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
}
}

/**
* Checks if the connection is healthy by verifying both:
* 1. Session is active (client connection exists and session is present)
* 2. Keep-alive messages are being received (subscription is healthy)
*
* @return true if connection is healthy, false otherwise
*/
public boolean isHealthy() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the one I expect! Nice work.

final ConnectionContext ctx = context.get();
if (ctx == null) {
log.debug("Connection health check failed: no connection context");
return false;
}

try {
// Check 1: Session is active
if (ctx.client().getSession().isEmpty()) {
log.debug("Connection health check failed: session inactive for adapter '{}'", adapterId);
return false;
}

// Check 2: Keep-alive is healthy
if (!ctx.subscriptionHandler().isKeepAliveHealthy()) {
log.debug("Connection health check failed: keep-alive timeout for adapter '{}'", adapterId);
return false;
}

return true;
} catch (final UaException e) {
log.debug("Connection health check failed with exception for adapter '{}'", adapterId, e);
return false;
}
}

@NotNull Optional<OpcUaClient> client() {
final ConnectionContext ctx = context.get();
if(ctx != null) {
Expand Down Expand Up @@ -237,6 +323,9 @@ private static void quietlyCloseClient(
}
}

private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
private record ConnectionContext(@NotNull OpcUaClient client,
@NotNull ServiceFaultListener faultListener,
@NotNull SessionActivityListener activityListener,
@NotNull OpcUaSubscriptionLifecycleHandler subscriptionHandler) {
}
}
Loading
Loading