diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml index f402e87a..f463ec82 100644 --- a/debezium-platform-conductor/pom.xml +++ b/debezium-platform-conductor/pom.xml @@ -63,6 +63,8 @@ 4.8.1 -javaagent:${org.mockito:mockito-core:jar} + + 1.56.1 @@ -149,6 +151,11 @@ mssqlserver ${test-containers.version} + + org.testcontainers + qdrant + 1.21.3 + io.rest-assured @@ -231,6 +238,28 @@ io.quarkus quarkus-rest-jackson + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-core + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.qdrant + client + 1.15.0 + + io.quarkus quarkus-hibernate-orm @@ -385,6 +414,12 @@ mssqlserver test + + org.testcontainers + qdrant + 1.21.3 + test + ch.qos.logback diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/QdrantConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/QdrantConnectionValidator.java new file mode 100644 index 00000000..f6ae9d64 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/QdrantConnectionValidator.java @@ -0,0 +1,256 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection.destination; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Named; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ListenableFuture; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.ConnectionValidator; +import io.qdrant.client.QdrantClient; +import io.qdrant.client.QdrantGrpcClient; + +/** + * Implementation of {@link ConnectionValidator} for Qdrant vector database connections. + *

+ * This validator performs validation of Qdrant connection configurations + * including network connectivity and server accessibility. + *

+ * + *

+ * The validation process includes: + *

    + *
  • Connection parameter validation (hostname, port)
  • + *
  • Network connectivity verification
  • + *
  • Server health check by listing collections
  • + *
  • Timeout handling for network issues
  • + *
+ *

+ * + * @author Pranav Tiwari + */ +@ApplicationScoped +@Named("QDRANT") +public class QdrantConnectionValidator implements ConnectionValidator { + + private static final Logger LOGGER = LoggerFactory.getLogger(QdrantConnectionValidator.class); + + private final int defaultConnectionTimeout; + + private static final String HOSTNAME_KEY = "hostname"; + private static final String PORT_KEY = "port"; + private static final String USE_TLS_KEY = "useTls"; + private static final String API_KEY_KEY = "apiKey"; + private static final int DEFAULT_PORT = 6334; + + public QdrantConnectionValidator(@ConfigProperty(name = "destinations.qdrant.connection.timeout") int defaultConnectionTimeout) { + this.defaultConnectionTimeout = defaultConnectionTimeout; + } + + @Override + public ConnectionValidationResult validate(Connection connectionConfig) { + if (connectionConfig == null) { + return ConnectionValidationResult.failed("Connection configuration cannot be null"); + } + + try { + LOGGER.info("Starting Qdrant connection validation for connection: {}", connectionConfig.getName()); + + Map qdrantConfig = connectionConfig.getConfig(); + + ConnectionValidationResult paramValidation = validateConnectionParameters(qdrantConfig); + if (!paramValidation.valid()) { + return ConnectionValidationResult.failed(paramValidation.message()); + } + + String hostname = qdrantConfig.get(HOSTNAME_KEY).toString(); + int port = getPortValue(qdrantConfig); + boolean useTls = getBooleanValue(qdrantConfig, USE_TLS_KEY, false); + String apiKey = getStringValue(qdrantConfig, API_KEY_KEY); + + return performConnectionValidation(hostname, port, useTls, apiKey); + + } + catch (Exception e) { + LOGGER.error("Unexpected error during Qdrant connection validation", e); + return ConnectionValidationResult.failed("Validation failed due to unexpected error: " + e.getMessage()); + } + } + + /** + * Validates the connection parameters for required fields and correct types. + * + * @param config the Qdrant configuration properties + * @return ConnectionValidationResult indicating if parameters are valid + */ + private ConnectionValidationResult validateConnectionParameters(Map config) { + if (!config.containsKey(HOSTNAME_KEY) || + config.get(HOSTNAME_KEY) == null || + config.get(HOSTNAME_KEY).toString().trim().isEmpty()) { + return ConnectionValidationResult.failed("Hostname must be specified"); + } + + if (!config.containsKey(PORT_KEY) || + config.get(PORT_KEY) == null) { + return ConnectionValidationResult.failed("Port must be specified"); + } + + try { + int port = getPortValue(config); + if (port <= 0 || port > 65535) { + return ConnectionValidationResult.failed("Port must be between 1 and 65535"); + } + } + catch (NumberFormatException e) { + return ConnectionValidationResult.failed("Port must be a valid integer"); + } + + return ConnectionValidationResult.successful(); + } + + /** + * Performs the actual connection validation by attempting to connect to Qdrant + * and list collections. + * + * @param hostname the Qdrant server hostname + * @param port the Qdrant server port + * @param useTls whether to use TLS connection + * @param apiKey optional API key for authentication + * @return ConnectionConnectionValidationResult indicating success or failure + */ + private ConnectionValidationResult performConnectionValidation(String hostname, int port, boolean useTls, String apiKey) { + QdrantClient client = null; + + try { + LOGGER.debug("Creating Qdrant client for validation: {}:{}, TLS: {}", hostname, port, useTls); + + QdrantGrpcClient.Builder builder = QdrantGrpcClient.newBuilder(hostname, port, useTls); + + if (apiKey != null && !apiKey.trim().isEmpty()) { + builder.withApiKey(apiKey); + } + + client = new QdrantClient(builder.build()); + + LOGGER.debug("Attempting to list Qdrant collections for connectivity test"); + + ListenableFuture> connectionTest = client.listCollectionsAsync(); + + connectionTest.get(defaultConnectionTimeout, TimeUnit.SECONDS); + + LOGGER.info("Successfully validated Qdrant connection to {}:{}", hostname, port); + return ConnectionValidationResult.successful(); + + } + catch (java.util.concurrent.TimeoutException e) { + LOGGER.warn("Timeout during Qdrant connection validation", e); + return ConnectionValidationResult.failed( + "Connection timeout - please check hostname, port and network connectivity"); + + } + catch (java.util.concurrent.ExecutionException e) { + Throwable cause = e.getCause(); + LOGGER.warn("Failed to connect to Qdrant server", cause); + + if (cause instanceof io.grpc.StatusRuntimeException) { + io.grpc.StatusRuntimeException grpcException = (io.grpc.StatusRuntimeException) cause; + return switch (grpcException.getStatus().getCode()) { + case UNAVAILABLE -> { + if (useTls && cause.getMessage().contains("io exception")) { + yield ConnectionValidationResult.failed( + "TLS connection failed - check certificates and hostname"); + } + yield ConnectionValidationResult.failed( + "Qdrant server is unavailable - please check if the server is running"); + } + case UNAUTHENTICATED -> ConnectionValidationResult.failed( + "Authentication failed - please check API key"); + case PERMISSION_DENIED -> ConnectionValidationResult.failed( + "Permission denied - please check API key permissions"); + case DEADLINE_EXCEEDED -> ConnectionValidationResult.failed( + "Connection timeout - please check network connectivity"); + default -> ConnectionValidationResult.failed( + "gRPC error: " + grpcException.getStatus().getDescription()); + }; + } + + return ConnectionValidationResult.failed("Failed to connect to Qdrant: " + cause.getMessage()); + + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Qdrant connection validation was interrupted", e); + return ConnectionValidationResult.failed("Connection validation was interrupted"); + + } + catch (Exception e) { + LOGGER.error("Unexpected error during Qdrant connection validation", e); + return ConnectionValidationResult.failed("Generic error while connecting to Qdrant server: " + e.getMessage()); + + } + finally { + if (client != null) { + try { + LOGGER.debug("Closing Qdrant client"); + client.close(); + } + catch (Exception e) { + LOGGER.warn("Error closing Qdrant client", e); + } + } + } + } + + /** + * Extracts port value from configuration, using default if not specified. + */ + private int getPortValue(Map config) { + Object portValue = config.get(PORT_KEY); + if (portValue instanceof Integer) { + return (Integer) portValue; + } + else if (portValue instanceof String) { + return Integer.parseInt((String) portValue); + } + else if (portValue instanceof Number) { + return ((Number) portValue).intValue(); + } + return DEFAULT_PORT; + } + + /** + * Extracts boolean value from configuration with default fallback. + */ + private boolean getBooleanValue(Map config, String key, boolean defaultValue) { + Object value = config.get(key); + if (value instanceof Boolean) { + return (Boolean) value; + } + else if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } + return defaultValue; + } + + /** + * Extracts string value from configuration. + */ + private String getStringValue(Map config, String key) { + Object value = config.get(key); + return value != null ? value.toString() : null; + } +} diff --git a/debezium-platform-conductor/src/main/resources/application.yml b/debezium-platform-conductor/src/main/resources/application.yml index 66fd877c..9b6dd5c4 100644 --- a/debezium-platform-conductor/src/main/resources/application.yml +++ b/debezium-platform-conductor/src/main/resources/application.yml @@ -44,6 +44,9 @@ destinations: kafka: connection: timeout: 60 + qdrant: + connection: + timeout: 60 quarkus: rest-client: diff --git a/debezium-platform-conductor/src/main/resources/connection-schemas.json b/debezium-platform-conductor/src/main/resources/connection-schemas.json index 936e1801..534395b8 100644 --- a/debezium-platform-conductor/src/main/resources/connection-schemas.json +++ b/debezium-platform-conductor/src/main/resources/connection-schemas.json @@ -214,7 +214,42 @@ "properties": { "bootstrap.servers": { "type": "list", - "title": "List of “hostname:port” pairs that address one or more (even all) of the brokers." + "title": "List of "hostname:port" pairs that address one or more (even all) of the brokers." + } + } + } + }, + { + "type": "QDRANT", + "schema": { + "title": "Qdrant connection properties", + "description": "Qdrant vector database connection properties", + "type": "object", + "required": [ + "hostname", + "port" + ], + "additionalProperties": { + "type": "string" + }, + "properties": { + "hostname": { + "type": "string", + "title": "The hostname of the Qdrant server" + }, + "port": { + "type": "integer", + "title": "The port of the Qdrant server", + "default": 6334 + }, + "useTls": { + "type": "boolean", + "title": "Whether to use TLS/SSL for connection", + "default": false + }, + "apiKey": { + "type": "string", + "title": "API key for authentication (optional)" } } } diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorAuthIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorAuthIT.java new file mode 100644 index 00000000..157dbcac --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorAuthIT.java @@ -0,0 +1,260 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.qdrant.QdrantContainer; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.data.model.ConnectionEntity; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.destination.QdrantConnectionValidator; +import io.debezium.platform.environment.database.db.QdrantTestResourceAuthenticated; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +/** + * Integration tests for QdrantConnectionValidator using Testcontainers WITH authentication. + * + *

This test class validates the QdrantConnectionValidator functionality against + * a real Qdrant instance running in a Docker container WITH API key authentication + * enabled. It provides comprehensive testing of authenticated connections, security + * validation, and real-world production-like scenarios.

+ * + *

Test scenarios covered:

+ *
    + *
  • Authentication Success: Valid API key connections
  • + *
  • Authentication Failures: Invalid, missing, or empty API keys
  • + *
  • Network Error Handling: Connection failures with authentication context
  • + *
  • TLS with Authentication: Secure connections with API key validation
  • + *
  • Parameter Validation: Required field validation in authenticated context
  • + *
+ * + *

The tests use {@link QdrantTestResourceAuthenticated} which provides a containerized + * Qdrant instance with authentication enabled. The container is configured with a test + * API key that can be accessed via {@code QdrantTestResourceAuthenticated.getApiKey()}.

+ * + *

Test Categories:

+ *
    + *
  • Authentication Tests: Verify correct API key handling and rejection of invalid credentials
  • + *
  • Network Tests: Ensure authentication errors are distinguished from network errors
  • + *
  • Parameter Tests: Validate that parameter validation works in authenticated environments
  • + *
+ * + *

Prerequisites:

+ *
    + *
  • Docker must be running on the test environment
  • + *
  • Testcontainers Qdrant dependency must be available
  • + *
  • Network access to pull Qdrant Docker image (qdrant/qdrant:v1.7.4)
  • + *
  • Container must support authentication configuration via environment variables
  • + *
+ * + *

Security Note: This test uses a predefined test API key + * ("secure-test-api-key-123") which is only suitable for testing environments and should + * never be used in production.

+ * + * @author Pranav Tiwari + * @since 1.0 + */ +@QuarkusTest +@QuarkusTestResource(value = QdrantTestResourceAuthenticated.class, restrictToAnnotatedClass = true) +class QdrantConnectionValidatorAuthIT { + + @Inject + QdrantConnectionValidator connectionValidator; + + @Test + @DisplayName("Should authenticate successfully with correct API key") + void shouldAuthenticateWithCorrectApiKey() { + QdrantContainer container = QdrantTestResourceAuthenticated.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Use the correct API key + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", false, + "apiKey", QdrantTestResourceAuthenticated.getApiKey())); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertTrue(result.valid(), "Connection should succeed with correct API key"); + assertThat(result.message()).isNotNull(); + } + + @Test + @DisplayName("Should fail authentication with incorrect API key") + void shouldFailWithIncorrectApiKey() { + QdrantContainer container = QdrantTestResourceAuthenticated.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Use wrong API key + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", false, + "apiKey", "wrong-api-key")); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection should fail with incorrect API key"); + assertThat(result.message()).containsAnyOf( + "Authentication failed", "UNAUTHENTICATED", "API key", "Permission denied"); + } + + @Test + @DisplayName("Should fail authentication without API key") + void shouldFailWithoutApiKeyWhenRequired() { + QdrantContainer container = QdrantTestResourceAuthenticated.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Don't provide API key + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection should fail without API key when auth is required"); + assertThat(result.message()).containsAnyOf( + "Authentication failed", "UNAUTHENTICATED", "API key", "Permission denied"); + } + + @Test + @DisplayName("Should fail authentication with empty API key") + void shouldFailWithEmptyApiKey() { + QdrantContainer container = QdrantTestResourceAuthenticated.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Provide empty API key + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", false, + "apiKey", "")); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection should fail with empty API key"); + assertThat(result.message()).containsAnyOf( + "Authentication failed", "UNAUTHENTICATED", "API key", "Permission denied"); + } + + @Test + @DisplayName("Should handle network errors with authentication") + void shouldHandleNetworkErrorsWithAuth() { + QdrantContainer container = QdrantTestResourceAuthenticated.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Test with wrong port but correct API key + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", 9999, // Wrong port + "useTls", false, + "apiKey", QdrantTestResourceAuthenticated.getApiKey())); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection should fail with wrong port"); + // Should be a connection error, not an auth error + assertThat(result.message()).containsAnyOf("timeout", "unavailable", "Failed to connect"); + } + + @Test + @DisplayName("Should handle TLS with authentication") + void shouldHandleTlsWithAuth() { + QdrantContainer container = QdrantTestResourceAuthenticated.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Test with TLS enabled and correct API key + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", true, + "apiKey", QdrantTestResourceAuthenticated.getApiKey())); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + // This might fail due to TLS configuration, but should handle API key correctly + assertThat(result).isNotNull(); + assertThat(result.message()).isNotNull(); + // If it fails, it should be due to TLS, not authentication + if (!result.valid()) { + assertThat(result.message()).doesNotContainIgnoringCase("authentication"); + } + } + + // ========== PARAMETER VALIDATION TESTS WITH AUTH ========== + // These tests verify that parameter validation works even with auth enabled + + @Test + @DisplayName("Should fail validation when hostname is missing (with auth)") + void shouldFailValidationWithoutHostnameWithAuth() { + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "port", 6334, + "useTls", false, + "apiKey", QdrantTestResourceAuthenticated.getApiKey())); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is missing (with auth)") + void shouldFailValidationWithoutPortWithAuth() { + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", "localhost", + "useTls", false, + "apiKey", QdrantTestResourceAuthenticated.getApiKey())); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when connection config is null (with auth)") + void shouldFailValidationWithNullConnectionWithAuth() { + ConnectionValidationResult result = connectionValidator.validate(null); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Connection configuration cannot be null", result.message()); + } +} diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorIT.java new file mode 100644 index 00000000..a6f3a10b --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorIT.java @@ -0,0 +1,268 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.qdrant.QdrantContainer; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.data.model.ConnectionEntity; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.destination.QdrantConnectionValidator; +import io.debezium.platform.environment.database.db.QdrantTestResource; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +/** + * Integration tests for QdrantConnectionValidator using Testcontainers WITHOUT authentication. + * + *

This test class validates the QdrantConnectionValidator functionality against + * a real Qdrant instance running in a Docker container WITHOUT authentication. + * It provides comprehensive testing of basic connection validation, network connectivity, + * and parameter handling in a non-authenticated environment.

+ * + *

Test scenarios covered:

+ *
    + *
  • Successful Connections: Valid hostname and port configurations
  • + *
  • Network Failures: Invalid hostnames, unreachable hosts, wrong ports
  • + *
  • Parameter Validation: Missing or invalid connection parameters
  • + *
  • Timeout Handling: Connection timeouts using non-routable IP addresses
  • + *
  • TLS Configuration: SSL/TLS connection attempts and error handling
  • + *
  • API Key Handling: Optional API key parameter processing (ignored by non-auth server)
  • + *
+ * + *

The tests use {@link QdrantTestResource} which provides a containerized Qdrant + * instance without authentication enabled. This makes it ideal for testing basic + * connection logic, parameter validation, and network error scenarios without the + * complexity of authentication setup.

+ * + *

Test Categories:

+ *
    + *
  • Connection Tests: Verify successful connections to running container
  • + *
  • Network Error Tests: Test various network failure scenarios
  • + *
  • Parameter Tests: Validate required and optional parameter handling
  • + *
  • Timeout Tests: Ensure proper timeout behavior for unreachable hosts
  • + *
+ * + *

Prerequisites:

+ *
    + *
  • Docker must be running on the test environment
  • + *
  • Testcontainers Qdrant dependency must be available
  • + *
  • Network access to pull Qdrant Docker image (qdrant/qdrant:v1.7.4)
  • + *
+ * + *

These tests are faster and more reliable than authenticated tests since they don't + * require complex container configuration, making them ideal for continuous integration + * and rapid feedback during development.

+ * + * @author Pranav Tiwari + * @since 1.0 + */ +@QuarkusTest +@QuarkusTestResource(value = QdrantTestResource.class, restrictToAnnotatedClass = true) +class QdrantConnectionValidatorIT { + + @Inject + QdrantConnectionValidator connectionValidator; + + @Test + @DisplayName("Should successfully validate connection with valid Qdrant configuration") + void shouldValidateSuccessfulConnection() { + QdrantContainer container = QdrantTestResource.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), // gRPC port + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertTrue(result.valid(), "Connection validation should succeed"); + assertThat(result.message()).isNotNull(); + } + + @Test + @DisplayName("Should fail validation with wrong hostname") + void shouldFailValidationWithWrongHostname() { + QdrantContainer container = QdrantTestResource.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", "non-existent-host", + "port", container.getMappedPort(6334), + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertThat(result.message()).containsAnyOf("timeout", "unavailable", "Failed to connect"); + } + + @Test + @DisplayName("Should fail validation with wrong port") + void shouldFailValidationWithWrongPort() { + QdrantContainer container = QdrantTestResource.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", 9999, // Wrong port + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertThat(result.message()).containsAnyOf("timeout", "unavailable", "Failed to connect"); + } + + @Test + @DisplayName("Should handle API key gracefully when server doesn't require authentication") + void shouldHandleApiKeyWithoutServerAuth() { + QdrantContainer container = QdrantTestResource.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Test with API key (container doesn't require authentication, so should succeed) + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", false, + "apiKey", "unused-api-key")); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + // Should succeed since the container doesn't enforce authentication + assertTrue(result.valid(), "Connection validation should succeed with API key when server doesn't require auth"); + } + + @Test + @DisplayName("Should handle TLS configuration") + void shouldHandleTlsConfiguration() { + QdrantContainer container = QdrantTestResource.getContainer(); + + Awaitility.await() + .atMost(300, TimeUnit.SECONDS) + .until(container::isRunning); + + // Test with TLS enabled (should fail since container doesn't use TLS) + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", container.getHost(), + "port", container.getMappedPort(6334), + "useTls", true)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + // This might fail because the container is not configured with TLS + // The important thing is that the validator handles the TLS parameter + assertThat(result).isNotNull(); + assertThat(result.message()).isNotNull(); + } + + // ========== PARAMETER VALIDATION TESTS ========== + + @Test + @DisplayName("Should fail validation when hostname is missing") + void shouldFailValidationWithoutHostname() { + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "port", 6334, + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is missing") + void shouldFailValidationWithoutPort() { + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", "localhost", + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when hostname is empty") + void shouldFailValidationWithEmptyHostname() { + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", "", + "port", 6334, + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is invalid") + void shouldFailValidationWithInvalidPort() { + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", "localhost", + "port", -1, + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be between 1 and 65535", result.message()); + } + + @Test + @DisplayName("Should fail validation when connection config is null") + void shouldFailValidationWithNullConnection() { + ConnectionValidationResult result = connectionValidator.validate(null); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Connection configuration cannot be null", result.message()); + } + + @Test + @DisplayName("Should handle timeout scenarios gracefully") + void shouldHandleTimeoutScenarios() { + // Use a non-routable IP address to simulate timeout + Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.QDRANT, Map.of( + "hostname", "10.255.255.1", // Non-routable IP + "port", 6334, + "useTls", false)); + + ConnectionValidationResult result = connectionValidator.validate(connectionConfig); + + assertFalse(result.valid(), "Connection validation should fail"); + assertThat(result.message()).containsAnyOf( + "timeout", "Connection timeout", "Failed to connect", "unavailable"); + } +} \ No newline at end of file diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorTest.java new file mode 100644 index 00000000..985b6122 --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/QdrantConnectionValidatorTest.java @@ -0,0 +1,318 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.connection; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.data.model.ConnectionEntity; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.destination.QdrantConnectionValidator; + +/** + * Unit tests for QdrantConnectionValidator. + * + *

This test class focuses on parameter validation, error handling scenarios, + * and business logic testing without requiring an actual Qdrant instance. It uses + * mock connections and invalid configurations to test various edge cases and + * validation rules.

+ * + *

Test coverage includes:

+ *
    + *
  • Parameter Validation: Required fields (hostname, port) and their constraints
  • + *
  • Invalid Configuration Handling: Malformed URLs, invalid ports, empty values
  • + *
  • Timeout Scenarios: Using non-routable IP addresses (10.255.255.1)
  • + *
  • API Key Validation: Authentication parameter handling and validation
  • + *
  • Connection Failure Scenarios: Network errors and error message validation
  • + *
  • Edge Cases: Null connections, boundary values, special characters
  • + *
+ * + *

Test Categories:

+ *
    + *
  • Parameter Tests: Validate required and optional parameter handling
  • + *
  • Validation Tests: Test input validation and constraint checking
  • + *
  • Error Handling Tests: Verify proper error messages and failure modes
  • + *
  • Timeout Tests: Test connection timeout behavior with unreachable hosts
  • + *
+ * + *

These tests are fast-running and don't require Docker or external dependencies, + * making them ideal for continuous integration and rapid feedback during development. + * They use the {@link TestConnectionView} helper class to create mock connection + * configurations for testing various scenarios.

+ * + *

Key Testing Techniques:

+ *
    + *
  • Non-routable IP addresses (10.255.255.1) for reliable timeout testing
  • + *
  • Boundary value testing for port numbers (0, 65536, negative values)
  • + *
  • Null and empty value testing for required parameters
  • + *
  • Invalid data type testing (strings for numeric fields)
  • + *
+ * + * @author Pranav Tiwari + * @since 1.0 + */ +class QdrantConnectionValidatorTest { + + public static final int DEFAULT_30_SECONDS_TIMEOUT = 30; + + private QdrantConnectionValidator validator; + + @BeforeEach + void setUp() { + validator = new QdrantConnectionValidator(DEFAULT_30_SECONDS_TIMEOUT); + } + + @Test + @DisplayName("Should fail validation when hostname is not provided") + void shouldFailValidationWithoutHostname() { + Map config = new HashMap<>(); + config.put("port", 6334); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when hostname is empty") + void shouldFailValidationWithEmptyHostname() { + Map config = new HashMap<>(); + config.put("hostname", ""); + config.put("port", 6334); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when hostname is null") + void shouldFailValidationWithNullHostname() { + Map config = new HashMap<>(); + config.put("hostname", null); + config.put("port", 6334); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when hostname is whitespace only") + void shouldFailValidationWithWhitespaceHostname() { + Map config = new HashMap<>(); + config.put("hostname", " "); + config.put("port", 6334); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Hostname must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is not provided") + void shouldFailValidationWithoutPort() { + Map config = new HashMap<>(); + config.put("hostname", "localhost"); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is null") + void shouldFailValidationWithNullPort() { + Map config = new HashMap<>(); + config.put("hostname", "localhost"); + config.put("port", null); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is invalid (negative)") + void shouldFailValidationWithNegativePort() { + Map config = new HashMap<>(); + config.put("hostname", "localhost"); + config.put("port", -1); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be between 1 and 65535", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is invalid (zero)") + void shouldFailValidationWithZeroPort() { + Map config = new HashMap<>(); + config.put("hostname", "localhost"); + config.put("port", 0); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be between 1 and 65535", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is too high") + void shouldFailValidationWithTooHighPort() { + Map config = new HashMap<>(); + config.put("hostname", "localhost"); + config.put("port", 65536); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be between 1 and 65535", result.message()); + } + + @Test + @DisplayName("Should fail validation when port is not a number") + void shouldFailValidationWithInvalidPortType() { + Map config = new HashMap<>(); + config.put("hostname", "localhost"); + config.put("port", "not-a-number"); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Port must be a valid integer", result.message()); + } + + @Test + @DisplayName("Should fail validation when connection config is null") + void shouldFailValidationWithNullConnection() { + ConnectionValidationResult result = validator.validate(null); + + assertFalse(result.valid(), "Connection validation should fail"); + assertEquals("Connection configuration cannot be null", result.message()); + } + + @Test + @DisplayName("Should fail validation with invalid hostname") + void shouldFailValidationWithInvalidHostname() { + Map config = new HashMap<>(); + config.put("hostname", "invalid-host-that-does-not-exist"); + config.put("port", 6334); + config.put("useTls", false); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + Assertions.assertThat(result.message()).containsAnyOf( + "timeout", "Failed to connect", "unavailable", "Connection timeout"); + } + + @Test + @DisplayName("Should handle timeout scenarios gracefully") + void shouldHandleTimeoutScenarios() { + Map config = new HashMap<>(); + config.put("hostname", "10.255.255.1"); // Non-routable IP + config.put("port", 6334); + config.put("useTls", false); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Connection validation should fail"); + assertTrue(result.message().contains("timeout") || + result.message().contains("Failed to connect") || + result.message().contains("unavailable"), + "Error message should indicate timeout or connection failure"); + } + + @Test + @DisplayName("Should accept valid port values") + void shouldAcceptValidPortValues() { + // Test with minimum valid port + Map config1 = new HashMap<>(); + config1.put("hostname", "10.255.255.1"); // Will fail connection but pass validation + config1.put("port", 1); + config1.put("useTls", false); + Connection connection1 = new TestConnectionView(ConnectionEntity.Type.QDRANT, config1); + + ConnectionValidationResult result1 = validator.validate(connection1); + // Should fail due to connection, not parameter validation + assertFalse(result1.valid()); + Assertions.assertThat(result1.message()).doesNotContain("Port must be"); + + // Test with maximum valid port + Map config2 = new HashMap<>(); + config2.put("hostname", "10.255.255.1"); // Will fail connection but pass validation + config2.put("port", 65535); + config2.put("useTls", false); + Connection connection2 = new TestConnectionView(ConnectionEntity.Type.QDRANT, config2); + + ConnectionValidationResult result2 = validator.validate(connection2); + // Should fail due to connection, not parameter validation + assertFalse(result2.valid()); + Assertions.assertThat(result2.message()).doesNotContain("Port must be"); + } + + @Test + @DisplayName("Should handle string port values") + void shouldHandleStringPortValues() { + Map config = new HashMap<>(); + config.put("hostname", "10.255.255.1"); // Will fail connection but pass validation + config.put("port", "6334"); // String representation of valid port + config.put("useTls", false); + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + // Should fail due to connection, not parameter validation + assertFalse(result.valid()); + Assertions.assertThat(result.message()).doesNotContain("Port must be"); + } + + @Test + @DisplayName("Should handle optional parameters gracefully") + void shouldHandleOptionalParameters() { + Map config = new HashMap<>(); + config.put("hostname", "10.255.255.1"); // Will fail connection but pass validation + config.put("port", 6334); + // useTls and apiKey are optional, so not providing them should be fine + Connection connection = new TestConnectionView(ConnectionEntity.Type.QDRANT, config); + + ConnectionValidationResult result = validator.validate(connection); + + // Should fail due to connection, not parameter validation + assertFalse(result.valid()); + Assertions.assertThat(result.message()).doesNotContain("must be specified"); + } +} diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/QdrantTestResource.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/QdrantTestResource.java new file mode 100644 index 00000000..d83bdaeb --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/QdrantTestResource.java @@ -0,0 +1,62 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.database.db; + +import java.util.Map; + +import org.testcontainers.qdrant.QdrantContainer; +import org.testcontainers.utility.DockerImageName; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +/** + * Test resource for Qdrant vector database using Testcontainers. + * + *

This class provides a containerized Qdrant instance WITHOUT authentication + * for integration testing. It manages the lifecycle of a Docker container running + * Qdrant server in non-authenticated mode, making it suitable for testing basic + * connection validation scenarios.

+ * + *

Key features:

+ *
    + *
  • No authentication required - server starts without API key validation
  • + *
  • Automatic container management with proper startup and shutdown
  • + *
  • Port mapping and configuration injection for test scenarios
  • + *
  • Compatible with Quarkus test resource lifecycle
  • + *
+ * + *

This resource is ideal for testing connection validation logic, parameter + * handling, and network connectivity without the complexity of authentication + * setup. It uses Qdrant v1.7.4 and exposes the standard HTTP port (6333) for + * client connections.

+ * + * @author Pranav Tiwari + * @since 1.0 + */ +public class QdrantTestResource implements QuarkusTestResourceLifecycleManager { + + private static final QdrantContainer QDRANT = new QdrantContainer( + DockerImageName.parse("qdrant/qdrant:v1.7.4")); + + public static QdrantContainer getContainer() { + return QDRANT; + } + + @Override + public Map start() { + QDRANT.start(); + + // Configure timeout for Qdrant connection validator + return Map.of( + "destinations.qdrant.connection.timeout", "30", + "test.qdrant.auth.enabled", "false"); + } + + @Override + public void stop() { + QDRANT.stop(); + } +} \ No newline at end of file diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/QdrantTestResourceAuthenticated.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/QdrantTestResourceAuthenticated.java new file mode 100644 index 00000000..22c260a9 --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/QdrantTestResourceAuthenticated.java @@ -0,0 +1,78 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.database.db; + +import java.util.Map; + +import org.testcontainers.qdrant.QdrantContainer; +import org.testcontainers.utility.DockerImageName; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +/** + * Test resource for Qdrant vector database using Testcontainers WITH authentication. + * + *

This class provides a containerized Qdrant instance WITH API key authentication + * enabled for integration testing. It manages the lifecycle of a Docker container running + * Qdrant server in authenticated mode, making it suitable for testing secure connection + * validation scenarios that mirror production environments.

+ * + *

Key features:

+ *
    + *
  • API key authentication required - server starts with authentication enabled
  • + *
  • Configurable API key for testing different authentication scenarios
  • + *
  • Automatic container management with proper startup and shutdown
  • + *
  • Port mapping and configuration injection for authenticated test scenarios
  • + *
  • Compatible with Quarkus test resource lifecycle
  • + *
  • CORS enabled for web-based testing scenarios
  • + *
+ * + *

The container is configured with environment variables to enable authentication:

+ *
    + *
  • {@code QDRANT__SERVICE__API_KEY} - Sets the required API key
  • + *
  • {@code QDRANT__SERVICE__ENABLE_CORS} - Enables CORS for web clients
  • + *
+ * + *

The default API key used is "secure-test-api-key-123" which can be accessed + * via {@link #getApiKey()} method. This key is only suitable for testing environments + * and should never be used in production.

+ * + * @author Pranav Tiwari + * @since 1.0 + */ +public class QdrantTestResourceAuthenticated implements QuarkusTestResourceLifecycleManager { + + public static final String API_KEY = "secure-test-api-key-123"; + + private static final QdrantContainer QDRANT = new QdrantContainer( + DockerImageName.parse("qdrant/qdrant:v1.7.4")) + .withEnv("QDRANT__SERVICE__API_KEY", API_KEY) + .withEnv("QDRANT__SERVICE__ENABLE_CORS", "true"); + + public static QdrantContainer getContainer() { + return QDRANT; + } + + public static String getApiKey() { + return API_KEY; + } + + @Override + public Map start() { + QDRANT.start(); + + // Configure timeout and provide API key for tests + return Map.of( + "destinations.qdrant.connection.timeout", "30", + "test.qdrant.auth.enabled", "true", + "test.qdrant.api.key", API_KEY); + } + + @Override + public void stop() { + QDRANT.stop(); + } +} \ No newline at end of file