Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
fbb2172
debezium/dbz#1083: Scaffold Pulsar connection validator
pxcamus Mar 3, 2026
5926e4a
debezium/dbz#1083: Enhance Pulsar connection validator with `PulsarAd…
pxcamus Mar 9, 2026
e07a526
debezium/dbz#1083: Add integration tests for Apache Pulsar connection…
pxcamus Mar 10, 2026
96a691a
debezium/dbz#1083: Add support for multiple Pulsar authentication sch…
pxcamus Mar 11, 2026
f582600
debezium/dbz#1083: Introduce JWT-based authentication for Pulsar with…
pxcamus Mar 17, 2026
47c2a6e
debezium/dbz#1083: Add Javadoc for `PulsarAuthHandler` and `PulsarAdm…
pxcamus Mar 17, 2026
32142a1
debezium/dbz#1083: Add missing license headers to Pulsar-related file…
pxcamus Mar 17, 2026
de633d4
debezium/dbz#1083: Update dependency from to and adjust author anno…
pxcamus Mar 18, 2026
3e5d615
debezium/dbz#1083: Add validation for `authType` in `PulsarAuthHandle…
pxcamus Mar 18, 2026
c2e94bc
debezium/dbz#1083: Add integration tests for Pulsar connection valida…
pxcamus Mar 18, 2026
4055930
debezium/dbz#1083: Extend Pulsar connection validation tests to cover…
pxcamus Mar 19, 2026
8d9830c
debezium/dbz#1083: Add OAuth2-based authentication for Pulsar via `OA…
pxcamus Mar 30, 2026
3be0454
debezium/dbz#1083: Add minor code formatting adjustments, resolve imp…
pxcamus Mar 30, 2026
f2d060c
debezium/dbz#1083: Update license header in `JwtAuthHandler` to refer…
pxcamus Mar 31, 2026
99d1ebf
debezium/dbz#1083: Update error message in `PulsarConnectionValidator…
pxcamus Mar 31, 2026
2e25131
debezium/dbz#1083: Update error messages in Pulsar connection validat…
pxcamus Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions debezium-platform-conductor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<version.impsort>1.13.0</version.impsort>
<version.checkstyle.plugin>3.6.0</version.checkstyle.plugin>
<version.code.formatter>2.29.0</version.code.formatter>
<version.pulsar>4.1.3</version.pulsar>
<version.nats>2.17.6</version.nats>
<version.oras>0.3.1</version.oras>
<version.rabbitmq>5.20.0</version.rabbitmq>
Expand Down Expand Up @@ -439,6 +440,11 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${version.pulsar}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
Expand Down Expand Up @@ -526,6 +532,12 @@
<artifactId>testcontainers-mssqlserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-pulsar</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-rabbitmq</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.connection.destination;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.platform.data.dto.ConnectionValidationResult;
import io.debezium.platform.domain.views.Connection;
import io.debezium.platform.environment.connection.ConnectionValidator;
import io.debezium.platform.environment.connection.destination.pulsar.PulsarAdminProvider;
import io.debezium.platform.environment.connection.destination.pulsar.PulsarAuthHandler;
import io.debezium.platform.environment.connection.destination.pulsar.PulsarAuthHandlerFactory;

@Named("APACHE_PULSAR")
@ApplicationScoped
public class PulsarConnectionValidator implements ConnectionValidator {

private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConnectionValidator.class);

private final PulsarAuthHandlerFactory authHandlerFactory;
private final PulsarAdminProvider pulsarAdminProvider;
private final int defaultConnectionTimeout;

private static final String SERVICE_HTTP_URL_KEY = "serviceHttpUrl";
private static final String AUTH_SCHEME_KEY = "authScheme";
public static final String NO_AUTH_SCHEME = "none";

public PulsarConnectionValidator(@ConfigProperty(name = "destinations.pulsar.connection.timeout") int defaultConnectionTimeout,
PulsarAuthHandlerFactory authHandlerFactory,
PulsarAdminProvider pulsarAdminProvider) {
this.defaultConnectionTimeout = defaultConnectionTimeout;
this.authHandlerFactory = authHandlerFactory;
this.pulsarAdminProvider = pulsarAdminProvider;
}

@Override
public ConnectionValidationResult validate(Connection connectionConfig) {
if (connectionConfig == null) {
return ConnectionValidationResult.failed("Connection configuration cannot be null");
}

try {
LOGGER.debug("Starting Pulsar connection validation for connection: {}", connectionConfig.getName());

Map<String, Object> pulsarConfig = connectionConfig.getConfig();

if (!pulsarConfig.containsKey(SERVICE_HTTP_URL_KEY) ||
pulsarConfig.get(SERVICE_HTTP_URL_KEY) == null ||
pulsarConfig.get(SERVICE_HTTP_URL_KEY).toString().trim().isEmpty()) {
return ConnectionValidationResult.failed("Service HTTP URL must be specified");
}

// Set a reasonable timeout for validation
pulsarConfig.put("connectionTimeoutMs", defaultConnectionTimeout);

String authScheme = pulsarConfig.getOrDefault(AUTH_SCHEME_KEY, NO_AUTH_SCHEME).toString();
LOGGER.debug("Configuring Pulsar authentication with scheme: {}", authScheme);
PulsarAuthHandler authHandler = authHandlerFactory.getAuthHandler(authScheme);
authHandler.validate(pulsarConfig);

PulsarAdminBuilder builder = pulsarAdminProvider
.builder()
.serviceHttpUrl(pulsarConfig.get(SERVICE_HTTP_URL_KEY).toString());

authHandler.configure(builder, pulsarConfig);

try (PulsarAdmin admin = builder.build()) {
admin.clusters().getClusters();
LOGGER.debug("Pulsar connection validation successful for: {}", connectionConfig.getName());
return ConnectionValidationResult.successful();
}
}
catch (IllegalArgumentException e) {
LOGGER.warn("Invalid Pulsar configuration: {}", e.getMessage());
return ConnectionValidationResult.failed(e.getMessage());
}
catch (PulsarAdminException.TimeoutException e) {
LOGGER.warn("Timeout during Pulsar connection validation", e);
return ConnectionValidationResult.failed(
"Connection timeout - please check the Pulsar admin URL and network connectivity");
}
catch (PulsarAdminException.NotAuthorizedException e) {
LOGGER.warn("Authorization failed during Pulsar connection validation", e);
return ConnectionValidationResult.failed(
"Authorization failed - please check Pulsar credentials and permissions");
}
catch (PulsarAdminException.NotFoundException e) {
LOGGER.warn("Pulsar admin endpoint or resource not found", e);
return ConnectionValidationResult.failed(
"Pulsar admin endpoint not found - please check the service HTTP URL");
}
catch (PulsarAdminException.ConnectException e) {
LOGGER.warn("Unable to connect to Pulsar admin endpoint", e);
return ConnectionValidationResult.failed(
"Unable to connect - please check the Pulsar admin URL and network connectivity");
}
catch (PulsarAdminException e) {
LOGGER.warn("Pulsar-specific error during validation", e);
return ConnectionValidationResult.failed("Pulsar connection error: " + e.getMessage());
}
catch (Exception e) {
LOGGER.error("Unexpected error during Pulsar connection validation", e);
return ConnectionValidationResult.failed("Unexpected error: " + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.connection.destination.pulsar;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;

import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

@Named("BASIC")
@ApplicationScoped
public class BasicAuthHandler implements PulsarAuthHandler {
@Override
public void configure(PulsarAdminBuilder builder, Map<String, Object> config) {
String username = (String) config.get("username");
String password = (String) config.get("password");

AuthenticationBasic auth = new AuthenticationBasic();
String authConfig = String.format(
"{\"userId\":\"%s\",\"password\":\"%s\"}",
username, password);
auth.configure(authConfig);

builder.authentication(auth);
}

@Override
public void validate(Map<String, Object> config) throws IllegalArgumentException {
if (isConfigValueMissing(config, "username") || isConfigValueMissing(config, "password")) {
throw new IllegalArgumentException("invalid or missing credentials for basic auth");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.connection.destination.pulsar;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;

@ApplicationScoped
public class DefaultPulsarAdminProvider implements PulsarAdminProvider {
@Override
public PulsarAdminBuilder builder() {
return PulsarAdmin.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.connection.destination.pulsar;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;

import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("JWT")
@ApplicationScoped
public class JwtAuthHandler implements PulsarAuthHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(JwtAuthHandler.class);

@Override
public void configure(PulsarAdminBuilder builder, Map<String, Object> config) {
String jwtToken = (String) config.get("jwtToken");

AuthenticationToken auth = new AuthenticationToken();
auth.configure(jwtToken);
builder.authentication(auth);
}

@Override
public void validate(Map<String, Object> config) throws IllegalArgumentException {
if (isConfigValueMissing(config, "jwtToken")) {
throw new IllegalArgumentException("JWT token is missing or blank");
}

String jwtToken = config.get("jwtToken").toString();

// Basic JWT validation: 3 parts separated by '.'
String[] parts = jwtToken.split("\\.");
if (parts.length != 3) {
throw new IllegalArgumentException("JWT token must consist of exactly 3 Base64URL-encoded parts separated by '.'");
}

// Check parts are not empty
for (String part : parts) {
if (part.isEmpty()) {
throw new IllegalArgumentException("JWT token parts must not be empty");
}
}

// We do not have access to the key to verify the signature (yet), so doing basic checks
byte[] headerBytes = decodeBase64Url(parts[0], "header");
byte[] payloadBytes = decodeBase64Url(parts[1], "payload");

JSONObject header = parseJson(headerBytes, "header");
JSONObject claims = parseJson(payloadBytes, "payload");

validateHeader(header);
// Check if exp claim exists and is valid
// bin/pulsar tokens create ... --expiry-time 1y
validateClaims(claims);
// TODO: Implement signature verification when key management is available

}

private byte[] decodeBase64Url(String part, String partName) throws IllegalArgumentException {
try {
return Base64.getUrlDecoder().decode(part);
}
catch (IllegalArgumentException e) {
LOGGER.warn("JWT token {} part is not valid Base64URL: {}", partName, e.getMessage());
throw new IllegalArgumentException("JWT token " + partName + " must be a valid Base64URL-encoded value", e);
}
}

private JSONObject parseJson(byte[] bytes, String partName) throws IllegalArgumentException {
try {
return new JSONObject(new String(bytes, StandardCharsets.UTF_8));
}
catch (JSONException e) {
LOGGER.warn("JWT token {} part decoded successfully but is not valid JSON: {}", partName, e.getMessage());
throw new IllegalArgumentException("JWT token " + partName + " must decode to a valid JSON object", e);
}
}

private void validateHeader(JSONObject header) throws IllegalArgumentException {
if (!header.has("alg")) {
LOGGER.warn("JWT token header is missing the required 'alg' field");
throw new IllegalArgumentException("JWT token header must contain the 'alg' field");
}
}

private void validateClaims(JSONObject claims) throws IllegalArgumentException {
if (claims.has("exp")) {
try {
long expTimestamp = claims.getLong("exp");
if (expTimestamp * 1000 <= System.currentTimeMillis()) {
LOGGER.warn("JWT token has expired (exp={})", expTimestamp);
throw new IllegalArgumentException("JWT token has expired");
}
}
catch (JSONException e) {
LOGGER.warn("JWT token 'exp' claim is not a valid number: {}", e.getMessage());
throw new IllegalArgumentException("JWT token 'exp' claim must be a valid numeric timestamp", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.connection.destination.pulsar;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;

import org.apache.pulsar.client.admin.PulsarAdminBuilder;

@Named("NONE")
@ApplicationScoped
public class NoAuthHandler implements PulsarAuthHandler {
@Override
public void configure(PulsarAdminBuilder builder, Map<String, Object> config) {
// Nothing to configure
}

@Override
public void validate(Map<String, Object> config) throws IllegalArgumentException {
// Nothing to validate
}
}
Loading
Loading