diff --git a/hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java b/hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java index aad3bf4dcf..f5be957ebc 100644 --- a/hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java +++ b/hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java @@ -34,6 +34,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import com.hivemq.http.JaxrsHttpServer; +import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,6 +175,11 @@ public void start(final @Nullable EmbeddedExtension embeddedExtension) public void stop() { stopGateway(); + } + + @VisibleForTesting + public void shutdownProtocolAdapters() { + injector.protocolAdapterManager().shutdown(); try { Runtime.getRuntime().removeShutdownHook(shutdownThread); } catch (final IllegalStateException ignored) { diff --git a/hivemq-edge/src/main/java/com/hivemq/embedded/internal/EmbeddedHiveMQImpl.java b/hivemq-edge/src/main/java/com/hivemq/embedded/internal/EmbeddedHiveMQImpl.java index 09cda3fe67..f23f9c3799 100644 --- a/hivemq-edge/src/main/java/com/hivemq/embedded/internal/EmbeddedHiveMQImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/embedded/internal/EmbeddedHiveMQImpl.java @@ -213,6 +213,7 @@ private void performStop( try { hiveMQServer.stop(); + hiveMQServer.shutdownProtocolAdapters(); } catch (final Exception ex) { if (desiredState == State.CLOSED) { log.error("Exception during running shutdown hook.", ex); diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java index 7fc4c32645..c3558dd382 100644 --- a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java @@ -152,6 +152,18 @@ public void start() { protocolAdapterConfig.registerConsumer(this::refresh); } + @VisibleForTesting + public void shutdown() { + protocolAdapters.entrySet().stream().forEach(entry -> { + try { + entry.getValue().stopAsync(true).get(); + } catch (final InterruptedException | ExecutionException e) { + log.error("Exception happened while shutting down adapter: ", e); + throw new RuntimeException(e); + } + }); + } + public void refresh(final @NotNull List configs) { executorService.submit(() -> { log.info("Refreshing adapters"); diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java index 6984a6ac93..5ddddad3d7 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java @@ -36,7 +36,6 @@ import org.eclipse.milo.opcua.sdk.client.SessionActivityListener; import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription; import org.eclipse.milo.opcua.stack.core.UaException; -import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -45,10 +44,16 @@ import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static com.hivemq.edge.adapters.opcua.Constants.PROTOCOL_ID_OPCUA; -import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; public class OpcUaClientConnection { private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class); @@ -72,8 +77,7 @@ public class OpcUaClientConnection { final @NotNull DataPointFactory dataPointFactory, final @NotNull EventService eventService, final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService, - final @NotNull OpcUaSpecificAdapterConfig config, - final @NotNull AtomicReference lastSubscriptionId) { + final @NotNull OpcUaSpecificAdapterConfig config) { this.config = config; this.tagStreamingService = tagStreamingService; this.dataPointFactory = dataPointFactory; @@ -120,14 +124,59 @@ synchronized boolean start(final ParsedConfig parsedConfig) { config.getUri(), endpointFilter, ignore -> {}, - new OpcUaClientConfigurator(adapterId, parsedConfig)); + new OpcUaClientConfigurator(adapterId, parsedConfig, config)); client.addFaultListener(faultListener); - client.connect(); + + // Add timeout to connection attempt to prevent hanging forever + // Wrap synchronous connect() call with CompletableFuture timeout + final int connectionTimeoutSeconds = config.getConnectionTimeout(); + try { + CompletableFuture.runAsync(() -> { + try { + client.connect(); + } catch (final UaException e) { + throw new RuntimeException(e); + } + }).get(connectionTimeoutSeconds, TimeUnit.SECONDS); + log.debug("OPC UA client connected successfully for adapter '{}'", adapterId); + } catch (final TimeoutException e) { + log.error("Connection timeout after {} seconds for OPC UA adapter '{}'", connectionTimeoutSeconds, adapterId); + eventService + .createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA) + .withMessage("Connection timeout after " + connectionTimeoutSeconds + " seconds for adapter '" + adapterId + "'") + .withSeverity(Event.SEVERITY.ERROR) + .fire(); + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); + quietlyCloseClient(client, false, faultListener, null); + return false; + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Connection interrupted for OPC UA adapter '{}'", adapterId, e); + eventService + .createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA) + .withMessage("Connection interrupted for adapter '" + adapterId + "'") + .withSeverity(Event.SEVERITY.ERROR) + .fire(); + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); + quietlyCloseClient(client, false, faultListener, null); + return false; + } catch (final ExecutionException e) { + final Throwable cause = e.getCause(); + log.error("Connection failed for OPC UA adapter '{}'", adapterId, cause); + eventService + .createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA) + .withMessage("Connection failed for adapter '" + adapterId + "': " + cause.getMessage()) + .withSeverity(Event.SEVERITY.ERROR) + .fire(); + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); + quietlyCloseClient(client, false, faultListener, null); + return false; + } } catch (final UaException e) { - log.error("Unable to connect and subscribe to the OPC UA server for adapter '{}'", adapterId, e); + log.error("Unable to create OPC UA client for adapter '{}'", adapterId, e); eventService .createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA) - .withMessage("Unable to connect and subscribe to the OPC UA server for adapter '" + adapterId + "'") + .withMessage("Unable to create OPC UA client for adapter '" + adapterId + "'") .withSeverity(Event.SEVERITY.ERROR) .fire(); protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); @@ -153,15 +202,17 @@ synchronized boolean start(final ParsedConfig parsedConfig) { final var subscription = subscriptionOptional.get(); log.trace("Creating Subscription for OPC UA client"); - context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener)); + context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener, subscriptionLifecycleHandler)); protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED); client.addSessionActivityListener(activityListener); + log.info("Client created and connected successfully"); return true; } synchronized void stop() { log.info("Stopping OPC UA client"); + final ConnectionContext ctx = context.get(); if(ctx != null) { quietlyCloseClient(ctx.client(),true, ctx.faultListener(), ctx.activityListener()); @@ -171,6 +222,7 @@ synchronized void stop() { void destroy() { log.info("Destroying OPC UA client"); + final ConnectionContext ctx = context.get(); if(ctx != null) { quietlyCloseClient(ctx.client(), false, ctx.faultListener(), ctx.activityListener()); @@ -178,6 +230,40 @@ void destroy() { } } + /** + * Checks if the connection is healthy by verifying both: + * 1. Session is active (client connection exists and session is present) + * 2. Keep-alive messages are being received (subscription is healthy) + * + * @return true if connection is healthy, false otherwise + */ + public boolean isHealthy() { + final ConnectionContext ctx = context.get(); + if (ctx == null) { + log.debug("Connection health check failed: no connection context"); + return false; + } + + try { + // Check 1: Session is active + if (ctx.client().getSession().isEmpty()) { + log.debug("Connection health check failed: session inactive for adapter '{}'", adapterId); + return false; + } + + // Check 2: Keep-alive is healthy + if (!ctx.subscriptionHandler().isKeepAliveHealthy()) { + log.debug("Connection health check failed: keep-alive timeout for adapter '{}'", adapterId); + return false; + } + + return true; + } catch (final UaException e) { + log.debug("Connection health check failed with exception for adapter '{}'", adapterId, e); + return false; + } + } + @NotNull Optional client() { final ConnectionContext ctx = context.get(); if(ctx != null) { @@ -237,6 +323,9 @@ private static void quietlyCloseClient( } } - private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) { + private record ConnectionContext(@NotNull OpcUaClient client, + @NotNull ServiceFaultListener faultListener, + @NotNull SessionActivityListener activityListener, + @NotNull OpcUaSubscriptionLifecycleHandler subscriptionHandler) { } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index f0f5bc6611..7d7361e4ef 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -27,6 +27,7 @@ import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopOutput; import com.hivemq.adapter.sdk.api.schema.TagSchemaCreationInput; import com.hivemq.adapter.sdk.api.schema.TagSchemaCreationOutput; +import com.hivemq.adapter.sdk.api.services.ModuleServices; import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService; import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState; import com.hivemq.adapter.sdk.api.writing.WritingContext; @@ -46,7 +47,6 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; -import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -55,6 +55,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -72,7 +76,17 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter { private final @NotNull DataPointFactory dataPointFactory; private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService; private final @NotNull OpcUaSpecificAdapterConfig config; - private final @NotNull AtomicReference lastSubscriptionId = new AtomicReference<>(); + private final @NotNull ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(); + private final @NotNull AtomicReference> retryFuture = new AtomicReference<>(); + private final @NotNull ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); + private final @NotNull AtomicReference> healthCheckFuture = new AtomicReference<>(); + + // Stored for reconnection - set during start() + private volatile ParsedConfig parsedConfig; + private volatile ModuleServices moduleServices; + + // Flag to prevent scheduling after stop + private volatile boolean stopped = false; public OpcUaProtocolAdapter( final @NotNull ProtocolAdapterInformation adapterInformation, @@ -98,15 +112,22 @@ public synchronized void start( final @NotNull ProtocolAdapterStartInput input, final @NotNull ProtocolAdapterStartOutput output) { log.info("Starting OPC UA protocol adapter {}", adapterId); + + // Reset stopped flag + stopped = false; + final ParsedConfig parsedConfig; final var result = ParsedConfig.fromConfig(config); - if (result instanceof final Failure failure) { - log.error("Failed to parse configuration for OPC UA client: {}", failure.failure()); - output.failStart(new IllegalStateException(failure.failure()), + if (result instanceof Failure(final String failure)) { + log.error("Failed to parse configuration for OPC UA client: {}", failure); + output.failStart(new IllegalStateException(failure), "Failed to parse configuration for OPC UA client"); return; - } else if (result instanceof final Success success) { - parsedConfig = success.result(); + } else if (result instanceof Success(final ParsedConfig result1)) { + parsedConfig = result1; + // Store for reconnection + this.parsedConfig = result1; + this.moduleServices = input.moduleServices(); } else { output.failStart(new IllegalStateException("Unexpected result type: " + result.getClass().getName()), "Failed to parse configuration for OPC UA client"); @@ -121,22 +142,23 @@ public synchronized void start( dataPointFactory, input.moduleServices().eventService(), protocolAdapterMetricsService, - config, - lastSubscriptionId))) { + config))) { protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED); - CompletableFuture.supplyAsync(() -> conn.start(parsedConfig)).whenComplete((success, throwable) -> { - if (!success || throwable != null) { - this.opcUaClientConnection.set(null); - protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); - log.error("Failed to start OPC UA client", throwable); - } - }); + // Attempt initial connection asynchronously + attemptConnection(conn, parsedConfig, input); + + // Adapter starts successfully even if connection isn't established yet + // Hardware may come online later and automatic retry will connect log.info("Successfully started OPC UA protocol adapter {}", adapterId); output.startedSuccessfully(); } else { - log.warn("Tried starting already started OPC UA protocol adapter {}", adapterId); + log.error("Cannot start OPC UA protocol adapter '{}' - adapter is already started", adapterId); + output.failStart( + new IllegalStateException("Adapter already started"), + "Cannot start already started adapter. Please stop the adapter first." + ); } } @@ -145,6 +167,21 @@ public synchronized void stop( final @NotNull ProtocolAdapterStopInput input, final @NotNull ProtocolAdapterStopOutput output) { log.info("Stopping OPC UA protocol adapter {}", adapterId); + + // Set stopped flag to prevent new scheduling + stopped = true; + + // Cancel any pending retries and health checks + cancelRetry(); + cancelHealthCheck(); + + // Shutdown schedulers immediately to prevent new tasks + shutdownSchedulers(); + + // Clear stored configuration to prevent reconnection after stop + this.parsedConfig = null; + this.moduleServices = null; + final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null); if (conn != null) { conn.stop(); @@ -154,9 +191,158 @@ public synchronized void stop( output.stoppedSuccessfully(); } + /** + * Triggers reconnection by stopping the current connection and creating a new one. + * Used for runtime reconnection when health check detects issues. + * Requires that start() has been called previously to initialize parsedConfig and moduleServices. + */ + private void reconnect() { + // Check if adapter has been stopped + if (stopped) { + log.debug("Skipping reconnection for adapter '{}' - adapter has been stopped", adapterId); + return; + } + + log.info("Reconnecting OPC UA adapter '{}'", adapterId); + + // Verify we have the necessary configuration + if (parsedConfig == null || moduleServices == null) { + log.error("Cannot reconnect OPC UA adapter '{}' - adapter has not been started yet", adapterId); + return; + } + + // Cancel any pending retries and health checks + cancelRetry(); + cancelHealthCheck(); + + // Stop and clean up current connection + final OpcUaClientConnection oldConn = opcUaClientConnection.getAndSet(null); + if (oldConn != null) { + oldConn.stop(); + log.debug("Stopped old connection for OPC UA adapter '{}'", adapterId); + } + + // Create new connection + final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId, + tagList, + protocolAdapterState, + moduleServices.protocolAdapterTagStreamingService(), + dataPointFactory, + moduleServices.eventService(), + protocolAdapterMetricsService, + config); + + // Set as current connection and attempt connection with retry logic + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED); + if (opcUaClientConnection.compareAndSet(null, newConn)) { + // Create a minimal ProtocolAdapterStartInput for attemptConnection + final ProtocolAdapterStartInput input = new ProtocolAdapterStartInput() { + @Override + public ModuleServices moduleServices() { + return moduleServices; + } + }; + attemptConnection(newConn, parsedConfig, input); + } else { + log.warn("OPC UA adapter '{}' reconnect failed - another connection was created concurrently", adapterId); + } + } + + /** + * Schedules periodic health check that monitors connection health and triggers reconnection if needed. + */ + private void scheduleHealthCheck() { + // Check if adapter has been stopped + if (stopped) { + log.debug("Skipping health check scheduling for adapter '{}' - adapter has been stopped", adapterId); + return; + } + + final int healthCheckIntervalSeconds = config.getHealthCheckInterval(); + final ScheduledFuture future = healthCheckScheduler.scheduleAtFixedRate(() -> { + // Check if adapter was stopped before health check executes + if (stopped) { + log.debug("Health check skipped for adapter '{}' - adapter was stopped", adapterId); + return; + } + + final OpcUaClientConnection conn = opcUaClientConnection.get(); + if (conn == null) { + log.debug("Health check skipped - no active connection for adapter '{}'", adapterId); + return; + } + + if (!conn.isHealthy()) { + if (config.isAutoReconnect()) { + log.warn("Health check failed for adapter '{}' - triggering automatic reconnection", adapterId); + reconnect(); + } else { + log.warn("Health check failed for adapter '{}' - automatic reconnection is disabled", adapterId); + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); + } + } else { + log.debug("Health check passed for adapter '{}'", adapterId); + } + }, healthCheckIntervalSeconds, healthCheckIntervalSeconds, TimeUnit.SECONDS); + + // Store future so it can be cancelled if needed + final ScheduledFuture oldFuture = healthCheckFuture.getAndSet(future); + if (oldFuture != null && !oldFuture.isDone()) { + oldFuture.cancel(false); + } + + log.debug("Scheduled connection health check every {} seconds for adapter '{}'", + healthCheckIntervalSeconds, adapterId); + } + + /** + * Cancels any pending health check. + */ + private void cancelHealthCheck() { + final ScheduledFuture future = healthCheckFuture.getAndSet(null); + if (future != null && !future.isDone()) { + future.cancel(false); + log.debug("Cancelled health check for adapter '{}'", adapterId); + } + } + + /** + * Shuts down both retry and health check schedulers. + * Uses immediate shutdown to cancel all pending tasks. + */ + private void shutdownSchedulers() { + // Shutdown retry scheduler - use shutdownNow() to cancel pending tasks immediately + if (!retryScheduler.isShutdown()) { + retryScheduler.shutdownNow(); + try { + retryScheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // Shutdown health check scheduler - use shutdownNow() to cancel pending tasks immediately + if (!healthCheckScheduler.isShutdown()) { + healthCheckScheduler.shutdownNow(); + try { + healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @Override public void destroy() { log.info("Destroying OPC UA protocol adapter {}", adapterId); + + // Cancel any pending retries and health checks + cancelRetry(); + cancelHealthCheck(); + + // Shutdown schedulers (if not already shutdown in stop()) + shutdownSchedulers(); + final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null); if (conn != null) { CompletableFuture.runAsync(() -> { @@ -300,4 +486,99 @@ public void createTagSchema( public @NotNull ProtocolAdapterState getProtocolAdapterState() { return protocolAdapterState; } + + /** + * Attempts to establish connection to OPC UA server. + * On failure, schedules automatic retry after configured retry interval. + */ + private void attemptConnection( + final @NotNull OpcUaClientConnection conn, + final @NotNull ParsedConfig parsedConfig, + final @NotNull ProtocolAdapterStartInput input) { + + CompletableFuture.supplyAsync(() -> conn.start(parsedConfig)).whenComplete((success, throwable) -> { + if (success && throwable == null) { + // Connection succeeded - cancel any pending retries and start health check + cancelRetry(); + scheduleHealthCheck(); + log.info("OPC UA adapter '{}' connected successfully", adapterId); + } else { + // Connection failed - clean up and schedule retry + this.opcUaClientConnection.set(null); + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); + + final int retryIntervalSeconds = config.getRetryInterval(); + if (throwable != null) { + log.warn("OPC UA adapter '{}' connection failed, will retry in {} seconds", + adapterId, retryIntervalSeconds, throwable); + } else { + log.warn("OPC UA adapter '{}' connection returned false, will retry in {} seconds", + adapterId, retryIntervalSeconds); + } + + // Schedule retry attempt + scheduleRetry(parsedConfig, input); + } + }); + } + + /** + * Schedules a retry attempt after configured retry interval. + */ + private void scheduleRetry( + final @NotNull ParsedConfig parsedConfig, + final @NotNull ProtocolAdapterStartInput input) { + + // Check if adapter has been stopped + if (stopped) { + log.debug("Skipping retry scheduling for adapter '{}' - adapter has been stopped", adapterId); + return; + } + + final int retryIntervalSeconds = config.getRetryInterval(); + final ScheduledFuture future = retryScheduler.schedule(() -> { + // Check if adapter was stopped before retry executes + if (stopped || this.parsedConfig == null || this.moduleServices == null) { + log.debug("OPC UA adapter '{}' retry cancelled - adapter was stopped", adapterId); + return; + } + + log.info("Retrying connection for OPC UA adapter '{}'", adapterId); + + // Create new connection object for retry + final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId, + tagList, + protocolAdapterState, + this.moduleServices.protocolAdapterTagStreamingService(), + dataPointFactory, + this.moduleServices.eventService(), + protocolAdapterMetricsService, + config); + + // Set as current connection and attempt + protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED); + if (opcUaClientConnection.compareAndSet(null, newConn)) { + attemptConnection(newConn, this.parsedConfig, input); + } else { + log.debug("OPC UA adapter '{}' retry skipped - connection already exists", adapterId); + } + }, retryIntervalSeconds, TimeUnit.SECONDS); + + // Store future so it can be cancelled if needed + final ScheduledFuture oldFuture = retryFuture.getAndSet(future); + if (oldFuture != null && !oldFuture.isDone()) { + oldFuture.cancel(false); + } + } + + /** + * Cancels any pending retry attempts. + */ + private void cancelRetry() { + final ScheduledFuture future = retryFuture.getAndSet(null); + if (future != null && !future.isDone()) { + future.cancel(false); + log.debug("Cancelled pending retry for OPC UA adapter '{}'", adapterId); + } + } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java index d4ee44174f..6f96e25544 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java @@ -16,8 +16,10 @@ package com.hivemq.edge.adapters.opcua.client; import com.hivemq.edge.adapters.opcua.Constants; +import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig; import org.eclipse.milo.opcua.sdk.client.OpcUaClientConfigBuilder; import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,10 +33,12 @@ public class OpcUaClientConfigurator implements Consumer Constants.OPCUA_SESSION_NAME_PREFIX + adapterId); + .setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId) + // Configure timeouts to prevent silent disconnects + .setSessionTimeout(UInteger.valueOf(sessionTimeoutMs)) + .setRequestTimeout(UInteger.valueOf(requestTimeoutMs)) + .setKeepAliveInterval(UInteger.valueOf(keepAliveIntervalMs)) + .setKeepAliveFailuresAllowed(UInteger.valueOf(config.getKeepAliveFailuresAllowed())); + log.info("Configured OPC UA timeouts: session={}s, request={}s, keepAlive={}s, failuresAllowed={}", + config.getSessionTimeout(), config.getRequestTimeout(), config.getKeepAliveInterval(), config.getKeepAliveFailuresAllowed()); log.info("TLS is enabled: {}", parsedConfig.tlsEnabled()); if (parsedConfig.tlsEnabled()) { if (log.isDebugEnabled()) { diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/ParsedConfig.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/ParsedConfig.java index 669bd4f551..cd58c570df 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/ParsedConfig.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/ParsedConfig.java @@ -19,6 +19,7 @@ import com.hivemq.edge.adapters.opcua.config.BasicAuth; import com.hivemq.edge.adapters.opcua.config.Keystore; import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig; +import com.hivemq.edge.adapters.opcua.config.TlsChecks; import com.hivemq.edge.adapters.opcua.config.Truststore; import com.hivemq.edge.adapters.opcua.config.X509Auth; import com.hivemq.edge.adapters.opcua.security.CertificateTrustListManager; @@ -57,7 +58,7 @@ public static Result fromConfig(final OpcUaSpecificAdapter CertificateValidator certValidator = null; if (tlsEnabled) { final var truststore = adapterConfig.getTls().truststore(); - final var certOptional = getTrustedCerts(truststore).map(ParsedConfig::createServerCertificateValidator); + final var certOptional = getTrustedCerts(truststore).map(trustedCerts -> createServerCertificateValidator(trustedCerts, adapterConfig.getTls().tlsChecks())); if (certOptional.isEmpty()) { return Failure.of("Failed to create certificate validator, check truststore configuration"); } @@ -124,10 +125,18 @@ public static Result fromConfig(final OpcUaSpecificAdapter return Optional.of(KeystoreUtil.getCertificatesFromDefaultTruststore()); } - private static @NotNull CertificateValidator createServerCertificateValidator(final @NotNull List trustedCerts) { - return new DefaultClientCertificateValidator(new CertificateTrustListManager(trustedCerts), - Set.of(ValidationCheck.VALIDITY, ValidationCheck.REVOCATION, ValidationCheck.REVOCATION_LISTS), - new MemoryCertificateQuarantine()); + private static @NotNull CertificateValidator createServerCertificateValidator(final @NotNull List trustedCerts, final @NotNull TlsChecks tlsChecks) { + return switch (tlsChecks) { + case NONE -> new DefaultClientCertificateValidator(new CertificateTrustListManager(trustedCerts), + ValidationCheck.NO_OPTIONAL_CHECKS, + new MemoryCertificateQuarantine()); + case STANDARD -> new DefaultClientCertificateValidator(new CertificateTrustListManager(trustedCerts), + Set.of(ValidationCheck.APPLICATION_URI, ValidationCheck.VALIDITY, ValidationCheck.REVOCATION, ValidationCheck.REVOCATION_LISTS), + new MemoryCertificateQuarantine()); + case ALL -> new DefaultClientCertificateValidator(new CertificateTrustListManager(trustedCerts), + ValidationCheck.ALL_OPTIONAL_CHECKS, + new MemoryCertificateQuarantine()); + }; } private static @NotNull Optional getKeyPairWithChain(final @NotNull Keystore keystore) { diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/BidirectionalOpcUaSpecificAdapterConfig.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/BidirectionalOpcUaSpecificAdapterConfig.java index c5b1947411..eaeaa06304 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/BidirectionalOpcUaSpecificAdapterConfig.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/BidirectionalOpcUaSpecificAdapterConfig.java @@ -31,7 +31,15 @@ public BidirectionalOpcUaSpecificAdapterConfig( @JsonProperty("auth") final @Nullable Auth auth, @JsonProperty("tls") final @Nullable Tls tls, @JsonProperty(value = "opcuaToMqtt") final @Nullable OpcUaToMqttConfig opcuaToMqttConfig, - @JsonProperty("security") final @Nullable Security security) { - super(uri, overrideUri, applicationUri, auth, tls, opcuaToMqttConfig, security); + @JsonProperty("security") final @Nullable Security security, + @JsonProperty("sessionTimeout") final @Nullable Integer sessionTimeout, + @JsonProperty("requestTimeout") final @Nullable Integer requestTimeout, + @JsonProperty("keepAliveInterval") final @Nullable Integer keepAliveInterval, + @JsonProperty("keepAliveFailuresAllowed") final @Nullable Integer keepAliveFailuresAllowed, + @JsonProperty("connectionTimeout") final @Nullable Integer connectionTimeout, + @JsonProperty("healthCheckInterval") final @Nullable Integer healthCheckInterval, + @JsonProperty("retryInterval") final @Nullable Integer retryInterval, + @JsonProperty("autoReconnect") final @Nullable Boolean autoReconnect) { + super(uri, overrideUri, applicationUri, auth, tls, opcuaToMqttConfig, security, sessionTimeout, requestTimeout, keepAliveInterval, keepAliveFailuresAllowed, connectionTimeout, healthCheckInterval, retryInterval, autoReconnect); } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java index 4e6538540f..b780987fca 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/OpcUaSpecificAdapterConfig.java @@ -58,8 +58,7 @@ public class OpcUaSpecificAdapterConfig implements ProtocolSpecificAdapterConfig @JsonProperty("applicationUri") @ModuleConfigField(title = "Application URI Override", description = "Overrides the Application URI used for OPC UA client identification. If not specified, the URI from the certificate SAN extension is used, or the default URI 'urn:hivemq:edge:client' as fallback.", - format = ModuleConfigField.FieldType.URI, - required = false) + format = ModuleConfigField.FieldType.URI) private final @Nullable String applicationUri; @JsonProperty("auth") @@ -77,6 +76,68 @@ public class OpcUaSpecificAdapterConfig implements ProtocolSpecificAdapterConfig description = "The configuration for a data stream from OPC UA to MQTT") private final @NotNull OpcUaToMqttConfig opcuaToMqttConfig; + @JsonProperty("sessionTimeout") + @ModuleConfigField(title = "Session Timeout (seconds)", + description = "OPC UA session timeout in seconds. Session will be renewed at this interval.", + numberMin = 10, + numberMax = 3600, + defaultValue = "120") + private final int sessionTimeout; + + @JsonProperty("requestTimeout") + @ModuleConfigField(title = "Request Timeout (seconds)", + description = "Timeout for OPC UA requests in seconds", + numberMin = 5, + numberMax = 300, + defaultValue = "30") + private final int requestTimeout; + + @JsonProperty("keepAliveInterval") + @ModuleConfigField(title = "Keep-Alive Interval (seconds)", + description = "Interval between OPC UA keep-alive pings in seconds", + numberMin = 1, + numberMax = 60, + defaultValue = "10") + private final int keepAliveInterval; + + @JsonProperty("keepAliveFailuresAllowed") + @ModuleConfigField(title = "Keep-Alive Failures Allowed", + description = "Number of consecutive keep-alive failures before connection is considered dead", + numberMin = 1, + numberMax = 10, + defaultValue = "3") + private final int keepAliveFailuresAllowed; + + @JsonProperty("connectionTimeout") + @ModuleConfigField(title = "Connection Timeout (seconds)", + description = "Timeout for establishing connection to OPC UA server in seconds", + numberMin = 5, + numberMax = 300, + defaultValue = "30") + private final int connectionTimeout; + + @JsonProperty("healthCheckInterval") + @ModuleConfigField(title = "Health Check Interval (seconds)", + description = "Interval between connection health checks in seconds", + numberMin = 10, + numberMax = 300, + defaultValue = "30") + private final int healthCheckInterval; + + @JsonProperty("retryInterval") + @ModuleConfigField(title = "Retry Interval (seconds)", + description = "Interval between connection retry attempts in seconds", + numberMin = 5, + numberMax = 300, + defaultValue = "30") + private final int retryInterval; + + @JsonProperty("autoReconnect") + @ModuleConfigField(title = "Automatic Reconnection", + description = "Enable automatic reconnection when health check detects connection issues", + defaultValue = "true") + private final boolean autoReconnect; + @JsonCreator public OpcUaSpecificAdapterConfig( @JsonProperty(value = "uri", required = true) final @NotNull String uri, @@ -85,15 +146,33 @@ public OpcUaSpecificAdapterConfig( @JsonProperty("auth") final @Nullable Auth auth, @JsonProperty("tls") final @Nullable Tls tls, @JsonProperty(value = "opcuaToMqtt") final @Nullable OpcUaToMqttConfig opcuaToMqttConfig, - @JsonProperty("security") final @Nullable Security security) { + @JsonProperty("security") final @Nullable Security security, + @JsonProperty("sessionTimeout") final @Nullable Integer sessionTimeout, + @JsonProperty("requestTimeout") final @Nullable Integer requestTimeout, + @JsonProperty("keepAliveInterval") final @Nullable Integer keepAliveInterval, + @JsonProperty("keepAliveFailuresAllowed") final @Nullable Integer keepAliveFailuresAllowed, + @JsonProperty("connectionTimeout") final @Nullable Integer connectionTimeout, + @JsonProperty("healthCheckInterval") final @Nullable Integer healthCheckInterval, + @JsonProperty("retryInterval") final @Nullable Integer retryInterval, + @JsonProperty("autoReconnect") final @Nullable Boolean autoReconnect) { this.uri = uri; this.overrideUri = requireNonNullElse(overrideUri, false); this.applicationUri = (applicationUri != null && !applicationUri.isBlank()) ? applicationUri : null; this.auth = auth; - this.tls = requireNonNullElse(tls, new Tls(false, null, null)); + this.tls = requireNonNullElse(tls, new Tls(false, TlsChecks.NONE, null, null)); this.opcuaToMqttConfig = Objects.requireNonNullElseGet(opcuaToMqttConfig, () -> new OpcUaToMqttConfig(1, 1000)); this.security = requireNonNullElse(security, new Security(Constants.DEFAULT_SECURITY_POLICY)); + + // Timeout configurations with sensible defaults + this.sessionTimeout = requireNonNullElse(sessionTimeout, 120); + this.requestTimeout = requireNonNullElse(requestTimeout, 30); + this.keepAliveInterval = requireNonNullElse(keepAliveInterval, 10); + this.keepAliveFailuresAllowed = requireNonNullElse(keepAliveFailuresAllowed, 3); + this.connectionTimeout = requireNonNullElse(connectionTimeout, 30); + this.healthCheckInterval = requireNonNullElse(healthCheckInterval, 30); + this.retryInterval = requireNonNullElse(retryInterval, 30); + this.autoReconnect = requireNonNullElse(autoReconnect, true); } @@ -125,6 +204,38 @@ public OpcUaSpecificAdapterConfig( return applicationUri; } + public int getSessionTimeout() { + return sessionTimeout; + } + + public int getRequestTimeout() { + return requestTimeout; + } + + public int getKeepAliveInterval() { + return keepAliveInterval; + } + + public int getKeepAliveFailuresAllowed() { + return keepAliveFailuresAllowed; + } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public int getHealthCheckInterval() { + return healthCheckInterval; + } + + public int getRetryInterval() { + return retryInterval; + } + + public boolean isAutoReconnect() { + return autoReconnect; + } + @Override public boolean equals(final @Nullable Object o) { if (o == null || getClass() != o.getClass()) { @@ -132,6 +243,14 @@ public boolean equals(final @Nullable Object o) { } final OpcUaSpecificAdapterConfig that = (OpcUaSpecificAdapterConfig) o; return getOverrideUri().equals(that.getOverrideUri()) && + sessionTimeout == that.sessionTimeout && + requestTimeout == that.requestTimeout && + keepAliveInterval == that.keepAliveInterval && + keepAliveFailuresAllowed == that.keepAliveFailuresAllowed && + connectionTimeout == that.connectionTimeout && + healthCheckInterval == that.healthCheckInterval && + retryInterval == that.retryInterval && + autoReconnect == that.autoReconnect && Objects.equals(id, that.id) && Objects.equals(getUri(), that.getUri()) && Objects.equals(getApplicationUri(), that.getApplicationUri()) && @@ -143,6 +262,7 @@ public boolean equals(final @Nullable Object o) { @Override public int hashCode() { - return Objects.hash(getOverrideUri(), id, getUri(), getApplicationUri(), getAuth(), getTls(), getSecurity(), getOpcuaToMqttConfig()); + return Objects.hash(getOverrideUri(), id, getUri(), getApplicationUri(), getAuth(), getTls(), getSecurity(), getOpcuaToMqttConfig(), + sessionTimeout, requestTimeout, keepAliveInterval, keepAliveFailuresAllowed, connectionTimeout, healthCheckInterval, retryInterval, autoReconnect); } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/Tls.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/Tls.java index fe748e8770..3a055c5e3f 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/Tls.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/Tls.java @@ -21,9 +21,8 @@ import com.hivemq.adapter.sdk.api.annotations.ModuleConfigField; import org.jetbrains.annotations.Nullable; -import java.util.Objects; - import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; +import static java.util.Objects.requireNonNullElse; public record Tls (@JsonProperty("enabled") @ModuleConfigField(title = "Enable TLS", @@ -31,6 +30,12 @@ public record Tls (@JsonProperty("enabled") defaultValue = "false") boolean enabled, + @JsonProperty("noChecks") + @ModuleConfigField(title = "Disable certificate validation", + description = "Allows to disable the validation of a certificate", + defaultValue = "STANDARD") + @Nullable TlsChecks tlsChecks, + @JsonProperty("keystore") @JsonInclude(NON_NULL) @ModuleConfigField(title = "Keystore", @@ -43,7 +48,10 @@ public record Tls (@JsonProperty("enabled") description = "Truststore which contains the trusted server certificates or trusted intermediates.") @Nullable Truststore truststore ) { + @JsonCreator - public Tls{ + public Tls { + tlsChecks = requireNonNullElse(tlsChecks, TlsChecks.STANDARD); } + } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/TlsChecks.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/TlsChecks.java new file mode 100644 index 0000000000..1416a2c628 --- /dev/null +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/TlsChecks.java @@ -0,0 +1,49 @@ +package com.hivemq.edge.adapters.opcua.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public enum TlsChecks { + + @JsonProperty("NONE") + NONE("NONE"), + + @JsonProperty("STANDARD") + STANDARD("STANDARD"), + + @JsonProperty("ALL") + ALL("ALL"); + + private final @NotNull String tlsChecks; + + TlsChecks(final @NotNull String tlsChecks) { + this.tlsChecks = tlsChecks; + } + + @Override + public String toString() { + return tlsChecks; + } + + /** + * Jackson creator method for deserialization. + * + * @param value the string value from JSON + * @return the corresponding MsgSecurityMode + */ + @JsonCreator + public static @Nullable TlsChecks fromString(final @Nullable String value) { + if (value == null || value.isBlank()) { + return null; + } + for (final var mode : values()) { + if (mode.name().equalsIgnoreCase(value) || + mode.name().replace("_", "").equalsIgnoreCase(value)) { + return mode; + } + } + return null; + } +} diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java index b70de49ea4..3430c6ea1b 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java @@ -50,6 +50,7 @@ public class OpcUaSubscriptionLifecycleHandler implements OpcUaSubscription.SubscriptionListener { private static final Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class); + private static final long KEEP_ALIVE_TIMEOUT_MS = 30_000; // 30 seconds private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService; private final @NotNull ProtocolAdapterTagStreamingService tagStreamingService; @@ -62,6 +63,9 @@ public class OpcUaSubscriptionLifecycleHandler implements OpcUaSubscription.Subs private final @NotNull DataPointFactory dataPointFactory; final @NotNull OpcUaSpecificAdapterConfig config; + // Track last keep-alive timestamp for health monitoring + private volatile long lastKeepAliveTimestamp = System.currentTimeMillis(); + public OpcUaSubscriptionLifecycleHandler( final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService, final @NotNull ProtocolAdapterTagStreamingService tagStreamingService, @@ -189,7 +193,23 @@ public static boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription @Override public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) { + lastKeepAliveTimestamp = System.currentTimeMillis(); protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT); + + subscription.getSubscriptionId().ifPresent(subscriptionId -> { + log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId); + }); + } + + /** + * Checks if keep-alive messages are being received within the expected timeout. + * Can be used for health monitoring to detect subscription issues. + * + * @return true if last keep-alive was received within KEEP_ALIVE_TIMEOUT_MS, false otherwise + */ + public boolean isKeepAliveHealthy() { + final long timeSinceLastKeepAlive = System.currentTimeMillis() - lastKeepAliveTimestamp; + return timeSinceLastKeepAlive < KEEP_ALIVE_TIMEOUT_MS; } @Override diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/FakeEventService.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/FakeEventService.java index b3f6e20bfe..75a51036a5 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/FakeEventService.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/FakeEventService.java @@ -39,6 +39,7 @@ public void fireEvent(final @NotNull Event event) { eventStore.add(event); } + @Override public @NotNull EventBuilder createAdapterEvent(final @NotNull String adapterId, final @NotNull String protocolId) { return new EventBuilderImpl(this::fireEvent).withTimestamp(System.currentTimeMillis()) .withSource(TypeIdentifierImpl.create(TypeIdentifier.Type.ADAPTER, adapterId)) @@ -68,7 +69,7 @@ public void fireEvent(final @NotNull Event event) { } @Override - public List readEvents(final @Nullable Long sinceTimestamp, final @Nullable Integer limit) { + public @NotNull List readEvents(final @Nullable Long sinceTimestamp, final @Nullable Integer limit) { return List.copyOf(eventStore); } } diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaEndpointFilterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaEndpointFilterTest.java index 3d4515a13b..eac9192b90 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaEndpointFilterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaEndpointFilterTest.java @@ -20,6 +20,7 @@ import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig; import com.hivemq.edge.adapters.opcua.config.SecPolicy; import com.hivemq.edge.adapters.opcua.config.Tls; +import com.hivemq.edge.adapters.opcua.config.TlsChecks; import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode; import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription; import org.jetbrains.annotations.NotNull; @@ -56,7 +57,15 @@ public void whenSingleEndpointConfigSet_thenPickCorrectEndpoint() { false, null, null, - new Tls(true, new Keystore("path", null, null), null), + new Tls(true, TlsChecks.NONE, new Keystore("path", "pass", "passPriv"), null), + null, + null, + null, + null, + null, + null, + null, + null, null, null); @@ -65,14 +74,17 @@ public void whenSingleEndpointConfigSet_thenPickCorrectEndpoint() { final Optional result = opcUaEndpointFilter.apply(convertToEndpointDescription(allUris, MessageSecurityMode.SignAndEncrypt)); - assertThat(result.isPresent()).isTrue(); - assertThat(BASIC256SHA256.getSecurityPolicy().getUri()).isEqualTo(result.get().getSecurityPolicyUri()); + assertThat(result) + .isPresent() + .get() + .extracting(EndpointDescription::getSecurityPolicyUri) + .isEqualTo(BASIC256SHA256.getSecurityPolicy().getUri()); } @Test public void whenSingleEndpointConfigSetAndNoKeystorePresent_thenPickNoEndpoint() { final OpcUaSpecificAdapterConfig config = - new OpcUaSpecificAdapterConfig("opc.tcp://127.0.0.1:49320", false, null, null, null, null, null); + new OpcUaSpecificAdapterConfig("opc.tcp://127.0.0.1:49320", false, null, null, null, null, null, null, null, null, null, null, null, null, null); final String configUri = convertToUri(BASIC256SHA256); final OpcUaEndpointFilter opcUaEndpointFilter = new OpcUaEndpointFilter("id", configUri, null, config); @@ -86,7 +98,7 @@ public void whenSingleEndpointConfigSetAndNoKeystorePresent_thenPickNoEndpoint() public void whenSingleEndpointConfigSetAndNotAvailOnServer_thenPickNoEndpoint() { final String configUri = convertToUri(BASIC256SHA256); final OpcUaSpecificAdapterConfig config = - new OpcUaSpecificAdapterConfig("opc.tcp://127.0.0.1:49320", false, null, null, null, null, null); + new OpcUaSpecificAdapterConfig("opc.tcp://127.0.0.1:49320", false, null, null, null, null, null, null, null, null, null, null, null, null, null); final OpcUaEndpointFilter opcUaEndpointFilter = new OpcUaEndpointFilter("id", configUri, null, config); final Optional result = @@ -98,14 +110,17 @@ public void whenSingleEndpointConfigSetAndNotAvailOnServer_thenPickNoEndpoint() @Test public void whenDefaultEndpointConfigSet_thenPickMatchingEndpoint() { final OpcUaSpecificAdapterConfig config = - new OpcUaSpecificAdapterConfig("opc.tcp://127.0.0.1:49320", false, null, null, null, null, null); + new OpcUaSpecificAdapterConfig("opc.tcp://127.0.0.1:49320", false, null, null, null, null, null, null, null, null, null, null, null, null, null); final OpcUaEndpointFilter opcUaEndpointFilter = new OpcUaEndpointFilter("id", convertToUri( DEFAULT_SECURITY_POLICY), null, config); final Optional result = opcUaEndpointFilter.apply(convertToEndpointDescription(allUris, MessageSecurityMode.None)); - assertThat(result.isPresent()).isTrue(); - assertThat(NONE.getSecurityPolicy().getUri()).isEqualTo(result.get().getSecurityPolicyUri()); + assertThat(result) + .isPresent() + .get() + .extracting(EndpointDescription::getSecurityPolicyUri) + .isEqualTo(NONE.getSecurityPolicy().getUri()); } @Test @@ -115,7 +130,15 @@ public void whenMessageSecurityModeSpecified_thenFilterByMode() { false, null, null, - new Tls(true, new Keystore("path", null, null), null), + new Tls(true, TlsChecks.NONE, new Keystore("path", "pass", "passPriv"), null), + null, + null, + null, + null, + null, + null, + null, + null, null, null); @@ -133,16 +156,22 @@ public void whenMessageSecurityModeSpecified_thenFilterByMode() { final Optional resultSign = filterForSign.apply(endpoints); // Then - Sign endpoint is selected - assertThat(resultSign.isPresent()).isTrue(); - assertThat(resultSign.get().getSecurityMode()).isEqualTo(MessageSecurityMode.Sign); + assertThat(resultSign) + .isPresent() + .get() + .extracting(EndpointDescription::getSecurityMode) + .isEqualTo(MessageSecurityMode.Sign); // When - Filter with SignAndEncrypt mode preference final OpcUaEndpointFilter filterForSignAndEncrypt = new OpcUaEndpointFilter("id", policyUri, MessageSecurityMode.SignAndEncrypt, config); final Optional resultSignAndEncrypt = filterForSignAndEncrypt.apply(endpoints); // Then - SignAndEncrypt endpoint is selected - assertThat(resultSignAndEncrypt.isPresent()).isTrue(); - assertThat(resultSignAndEncrypt.get().getSecurityMode()).isEqualTo(MessageSecurityMode.SignAndEncrypt); + assertThat(resultSignAndEncrypt) + .isPresent() + .get() + .extracting(EndpointDescription::getSecurityMode) + .isEqualTo(MessageSecurityMode.SignAndEncrypt); } @Test @@ -152,7 +181,15 @@ public void whenNoMessageSecurityModeSpecified_thenAcceptAnyMode() { false, null, null, - new Tls(true, new Keystore("path", null, null), null), + new Tls(true, TlsChecks.NONE, new Keystore("path", "pass", "passPriv"), null), + null, + null, + null, + null, + null, + null, + null, + null, null, null); @@ -170,8 +207,11 @@ public void whenNoMessageSecurityModeSpecified_thenAcceptAnyMode() { final Optional result = filter.apply(endpoints); // Then - Any endpoint with matching policy is accepted (backwards compatible) - assertThat(result.isPresent()).isTrue(); - assertThat(result.get().getSecurityPolicyUri()).isEqualTo(policyUri); + assertThat(result) + .isPresent() + .get() + .extracting(EndpointDescription::getSecurityPolicyUri) + .isEqualTo(policyUri); } @Test @@ -181,7 +221,15 @@ public void whenWrongMessageSecurityMode_thenNoEndpointSelected() { false, null, null, - new Tls(true, new Keystore("path", null, null), null), + new Tls(true, TlsChecks.NONE, new Keystore("path", "pass", "passPriv"), null), + null, + null, + null, + null, + null, + null, + null, + null, null, null); diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterAuthTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterAuthTest.java index 999e88a076..2e0c391d17 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterAuthTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterAuthTest.java @@ -31,6 +31,7 @@ import com.hivemq.edge.adapters.opcua.config.SecPolicy; import com.hivemq.edge.adapters.opcua.config.Security; import com.hivemq.edge.adapters.opcua.config.Tls; +import com.hivemq.edge.adapters.opcua.config.TlsChecks; import com.hivemq.edge.adapters.opcua.config.X509Auth; import com.hivemq.edge.adapters.opcua.config.opcua2mqtt.OpcUaToMqttConfig; import com.hivemq.edge.modules.adapters.data.DataPointImpl; @@ -58,12 +59,14 @@ class OpcUaProtocolAdapterAuthTest { private final @NotNull ProtocolAdapterInput protocolAdapterInput = mock(); + private ModuleServices moduleServices; + @BeforeEach void setUp() { when(protocolAdapterInput.getProtocolAdapterState()).thenReturn(new ProtocolAdapterStateImpl(mock(), "id", "protocolId")); - final ModuleServices moduleServices = mock(); + moduleServices = mock(); when(moduleServices.adapterPublishService()).thenReturn(mock(ProtocolAdapterPublishService.class)); when(moduleServices.eventService()).thenReturn(new FakeEventService()); when(moduleServices.adapterPublishService()).thenReturn(mock(ProtocolAdapterPublishService.class)); @@ -99,6 +102,14 @@ public void whenNoAuthAndNoSubscriptions_thenConnectSuccessfully() { null, null, new OpcUaToMqttConfig(1, 1000), + null, + null, + null, + null, + null, + null, + null, + null, null); when(protocolAdapterInput.getConfig()).thenReturn(config); @@ -107,7 +118,7 @@ public void whenNoAuthAndNoSubscriptions_thenConnectSuccessfully() { final OpcUaProtocolAdapter protocolAdapter = new OpcUaProtocolAdapter(OpcUaProtocolAdapterInformation.INSTANCE, protocolAdapterInput); - final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(protocolAdapterInput.moduleServices()); + final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(moduleServices); final ProtocolAdapterStartOutput out = mock(ProtocolAdapterStartOutput.class); protocolAdapter.start(in, out); @@ -127,6 +138,14 @@ public void whenBasicAuthAndNoSubscriptions_thenConnectSuccessfully() { auth, null, null, + null, + null, + null, + null, + null, + null, + null, + null, null); when(protocolAdapterInput.getConfig()).thenReturn(config); @@ -134,7 +153,7 @@ public void whenBasicAuthAndNoSubscriptions_thenConnectSuccessfully() { final OpcUaProtocolAdapter protocolAdapter = new OpcUaProtocolAdapter(OpcUaProtocolAdapterInformation.INSTANCE, protocolAdapterInput); - final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(protocolAdapterInput.moduleServices()); + final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(moduleServices); final ProtocolAdapterStartOutput out = mock(ProtocolAdapterStartOutput.class); protocolAdapter.start(in, out); @@ -145,7 +164,7 @@ public void whenBasicAuthAndNoSubscriptions_thenConnectSuccessfully() { @Timeout(30) public void whenTlsAndNoSubscriptions_thenConnectSuccessfully() { final Security security = new Security(SecPolicy.NONE); - final Tls tls = new Tls(true, null, null); + final Tls tls = new Tls(true, TlsChecks.NONE, null, null); final OpcUaSpecificAdapterConfig config = new OpcUaSpecificAdapterConfig( opcUaServerExtension.getServerUri(), false, @@ -153,13 +172,21 @@ public void whenTlsAndNoSubscriptions_thenConnectSuccessfully() { null, tls, null, - security); + security, + null, + null, + null, + null, + null, + null, + null, + null); when(protocolAdapterInput.getConfig()).thenReturn(config); final OpcUaProtocolAdapter protocolAdapter = new OpcUaProtocolAdapter(OpcUaProtocolAdapterInformation.INSTANCE, protocolAdapterInput); - final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(protocolAdapterInput.moduleServices()); + final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(moduleServices); final ProtocolAdapterStartOutput out = mock(ProtocolAdapterStartOutput.class); protocolAdapter.start(in, out); @@ -174,7 +201,7 @@ public void whenCertAuthAndNoSubscriptions_thenConnectSuccessfully() throws Exce final KeyChain root = KeyChain.createKeyChain("root"); final var keystore = root.wrapInKeyStoreWithPrivateKey("keystore", "root", "password", "password"); - final Tls tls = new Tls(true, new Keystore(keystore.getAbsolutePath(), "password", "password"), null); + final Tls tls = new Tls(true, TlsChecks.NONE, new Keystore(keystore.getAbsolutePath(), "password", "password"), null); final OpcUaSpecificAdapterConfig config = new OpcUaSpecificAdapterConfig( opcUaServerExtension.getServerUri(), false, @@ -182,6 +209,14 @@ public void whenCertAuthAndNoSubscriptions_thenConnectSuccessfully() throws Exce auth, tls, null, + null, + null, + null, + null, + null, + null, + null, + null, null); when(protocolAdapterInput.getConfig()).thenReturn(config); @@ -189,7 +224,7 @@ public void whenCertAuthAndNoSubscriptions_thenConnectSuccessfully() throws Exce final OpcUaProtocolAdapter protocolAdapter = new OpcUaProtocolAdapter(OpcUaProtocolAdapterInformation.INSTANCE, protocolAdapterInput); - final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(protocolAdapterInput.moduleServices()); + final ProtocolAdapterStartInput in = new TestProtocolAdapterStartInput(moduleServices); final ProtocolAdapterStartOutput out = mock(ProtocolAdapterStartOutput.class); protocolAdapter.start(in, out); diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfiguratorTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfiguratorTest.java index 7e6651b5a7..46898ca354 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfiguratorTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfiguratorTest.java @@ -16,6 +16,7 @@ package com.hivemq.edge.adapters.opcua.client; import com.hivemq.edge.adapters.opcua.Constants; +import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig; import org.eclipse.milo.opcua.sdk.client.OpcUaClientConfigBuilder; import org.eclipse.milo.opcua.sdk.client.identity.AnonymousProvider; import org.junit.jupiter.api.Test; @@ -30,6 +31,10 @@ class OpcUaClientConfiguratorTest { private static final String ADAPTER_ID = "test-adapter"; private static final String EXTRACTED_URI = "urn:hivemq:edge:testclient"; + // Minimal config for tests - defaults will be used for timeout values + private static final OpcUaSpecificAdapterConfig TEST_CONFIG = + new OpcUaSpecificAdapterConfig("opc.tcp://test:4840", false, null, null, null, null, null, null, null, null, null, null, null, null, null); + @Test void testAccept_withExtractedUri_usesExtractedUri() { // Given @@ -42,7 +47,7 @@ void testAccept_withExtractedUri_usesExtractedUri() { EXTRACTED_URI // Application URI from certificate ); - final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig); + final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig, TEST_CONFIG); final OpcUaClientConfigBuilder configBuilder = spy(new OpcUaClientConfigBuilder()); // When @@ -68,7 +73,7 @@ void testAccept_withoutExtractedUri_usesDefaultUri() { null // No Application URI from certificate ); - final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig); + final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig, TEST_CONFIG); final OpcUaClientConfigBuilder configBuilder = spy(new OpcUaClientConfigBuilder()); // When @@ -94,7 +99,7 @@ void testAccept_tlsDisabled_usesDefaultUri() { null // No Application URI ); - final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig); + final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig, TEST_CONFIG); final OpcUaClientConfigBuilder configBuilder = spy(new OpcUaClientConfigBuilder()); // When @@ -120,7 +125,7 @@ void testAccept_withExtractedUri_configuresOtherSettings() { EXTRACTED_URI ); - final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig); + final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig, TEST_CONFIG); final OpcUaClientConfigBuilder configBuilder = spy(new OpcUaClientConfigBuilder()); // When @@ -145,7 +150,7 @@ void testAccept_nullApplicationUri_usesDefault() { null // Explicitly null ); - final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig); + final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig, TEST_CONFIG); final OpcUaClientConfigBuilder configBuilder = spy(new OpcUaClientConfigBuilder()); // When @@ -172,7 +177,7 @@ void testAccept_withConfiguredApplicationUri_usesConfiguredUri() { configuredUri // Configured override URI ); - final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig); + final OpcUaClientConfigurator configurator = new OpcUaClientConfigurator(ADAPTER_ID, parsedConfig, TEST_CONFIG); final OpcUaClientConfigBuilder configBuilder = spy(new OpcUaClientConfigBuilder()); // When diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/ParsedConfigTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/ParsedConfigTest.java index 57eee80710..f9de17294f 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/ParsedConfigTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/client/ParsedConfigTest.java @@ -20,6 +20,7 @@ import com.hivemq.edge.adapters.opcua.config.SecPolicy; import com.hivemq.edge.adapters.opcua.config.Security; import com.hivemq.edge.adapters.opcua.config.Tls; +import com.hivemq.edge.adapters.opcua.config.TlsChecks; import com.hivemq.edge.adapters.opcua.config.Truststore; import com.hivemq.edge.adapters.opcua.config.opcua2mqtt.OpcUaToMqttConfig; import org.eclipse.milo.opcua.sdk.client.identity.AnonymousProvider; @@ -357,7 +358,7 @@ private OpcUaSpecificAdapterConfig createAdapterConfig( ? new Truststore(truststorePath, KEYSTORE_PASSWORD) : null; - final Tls tls = new Tls(tlsEnabled, keystore, truststore); + final Tls tls = new Tls(tlsEnabled, TlsChecks.NONE, keystore, truststore); final Security security = new Security(SecPolicy.NONE); final OpcUaToMqttConfig opcUaToMqttConfig = new OpcUaToMqttConfig(1, 1000); @@ -368,7 +369,15 @@ private OpcUaSpecificAdapterConfig createAdapterConfig( null, // no auth tls, opcUaToMqttConfig, - security + security, + null, + null, + null, + null, + null, + null, + null, + null ); } } diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/config/OpcUaProtocolAdapterConfigTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/config/OpcUaProtocolAdapterConfigTest.java index f359a2395a..7d2d5a9aaa 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/config/OpcUaProtocolAdapterConfigTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/config/OpcUaProtocolAdapterConfigTest.java @@ -135,9 +135,7 @@ public void convertConfigObject_defaults_valid() throws Exception { assertThat(config.getAuth()).isNull(); - assertThat(config.getTls()).satisfies(tls -> { - assertThat(tls.enabled()).isFalse(); - }); + assertThat(config.getTls()).satisfies(tls -> assertThat(tls.enabled()).isFalse()); assertThat(config.getOpcuaToMqttConfig()).isNotNull(); assertThat(protocolAdapterConfig.getNorthboundMappings()).satisfiesExactly(mapping -> { @@ -180,10 +178,19 @@ public void unconvertConfigObject_full_valid() { null, new Auth(new BasicAuth("my-username", "my-password"), new X509Auth(true)), new Tls(true, + TlsChecks.NONE, new Keystore("my/keystore/path", "keystore-password", "private-key-password"), new Truststore("my/truststore/path", "truststore-password")), new OpcUaToMqttConfig(1, 1000), - new Security(BASIC128RSA15) + new Security(BASIC128RSA15), + null, + null, + null, + null, + null, + null, + null, + null ); final OpcUaProtocolAdapterFactory opcuaProtocolAdapterFactory = @@ -201,9 +208,7 @@ public void unconvertConfigObject_full_valid() { assertThat(basic.get("username")).isEqualTo("my-username"); assertThat(basic.get("password")).isEqualTo("my-password"); }); - assertThat((Map) authMap.get("x509")).satisfies(basic -> { - assertThat(basic.get("enabled")).isEqualTo(true); - }); + assertThat((Map) authMap.get("x509")).satisfies(basic -> assertThat(basic.get("enabled")).isEqualTo(true)); final Map tlsMap = (Map) config.get("tls"); assertThat(tlsMap.get("enabled")).isEqualTo(true); @@ -228,6 +233,14 @@ public void unconvertConfigObject_default_valid() { null, null, new OpcUaToMqttConfig(1, 1000), + null, + null, + null, + null, + null, + null, + null, + null, null ); @@ -252,7 +265,7 @@ public void unconvertConfigObject_default_valid() { final File path = Path.of(resource.toURI()).toFile(); final HiveMQConfigEntity configEntity = loadConfig(path); - final ProtocolAdapterEntity adapterEntity = configEntity.getProtocolAdapterConfig().get(0); + final ProtocolAdapterEntity adapterEntity = configEntity.getProtocolAdapterConfig().getFirst(); final ProtocolAdapterConfigConverter converter = createConverter(); diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java index ad2ed949d7..409d2fd649 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java @@ -105,6 +105,14 @@ protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcr null, null, opcuaToMqttConfig, + null, + null, + null, + null, + null, + null, + null, + null, null); when(protocolAdapterInput.getConfig()).thenReturn(config);