Skip to content

DSP-24890 Adds OIDC support #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ pipeline {
}

environment {
OS_VERSION = 'ubuntu/bionic64/java-driver'
OS_VERSION = 'ubuntu/focal64/java-driver'
JABBA_SHELL = '/usr/lib/jabba/jabba.sh'
JABBA_VERSION = '1.8'
JABBA_VERSION = '[email protected]'
CCM_ENVIRONMENT_SHELL = '/usr/local/bin/ccm_environment.sh'
// always run long tests when generating the distribution tarball
RUN_LONG_TESTS = "${params.RUN_LONG_TESTS || params.GENERATE_DISTRO}"
Expand Down
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.datastax.db</groupId>
<artifactId>db-advanced-auth-client-plugin</artifactId>
Copy link
Preview

Copilot AI Jul 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency lacks a <version> element. Ensure you declare its version here or in a dependencyManagement section to maintain build reproducibility.

Suggested change
<artifactId>db-advanced-auth-client-plugin</artifactId>
<artifactId>db-advanced-auth-client-plugin</artifactId>
<version>1.0.0</version>

Copilot uses AI. Check for mistakes.

Copy link
Preview

Copilot AI Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new db-advanced-auth-client-plugin dependency is missing a <version> declaration, which may cause build failures. Please add the appropriate ${datastax.advanced-auth-client-plugin.version} or explicit version.

Suggested change
<artifactId>db-advanced-auth-client-plugin</artifactId>
<artifactId>db-advanced-auth-client-plugin</artifactId>
<version>${datastax.advanced-auth-client-plugin.version}</version>

Copilot uses AI. Check for mistakes.

</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,7 +43,13 @@ public class AuthenticatorConfig extends AbstractConfig {
public static final String PASSWORD_OPT = "auth.password";
public static final String KEYTAB_OPT = "auth.gssapi.keyTab";
public static final String PRINCIPAL_OPT = "auth.gssapi.principal";
public static final String SERVICE_OPT = "auth.gssapi.service";
public static final String SERVICE_OPT = "auth.gssapi.cservice";
public static final String OIDC_ISSUER_OPT = "auth.oidc.issuer";
public static final String OIDC_CLIENT_ID_OPT = "auth.oidc.client_id";
public static final String OIDC_CLIENT_SECRET_OPT = "auth.oidc.client_secret";
public static final String OIDC_USE_TLS_OPT = "auth.oidc.use_tls";
public static final String OIDC_TRUSTSTORE_PATH_OPT = "auth.oidc.truststore_path";
public static final String OIDC_TRUSTSTORE_PASSWORD_OPT = "auth.oidc.truststore_password";

private static final Logger log = LoggerFactory.getLogger(AuthenticatorConfig.class);
public static final ConfigDef CONFIG_DEF =
Expand All @@ -48,7 +58,7 @@ public class AuthenticatorConfig extends AbstractConfig {
PROVIDER_OPT,
ConfigDef.Type.STRING,
"None",
ConfigDef.ValidString.in("None", "PLAIN", "GSSAPI"),
ConfigDef.ValidString.in("None", "PLAIN", "GSSAPI", "OIDC"),
ConfigDef.Importance.HIGH,
"Authentication provider")
.define(
Expand Down Expand Up @@ -80,7 +90,43 @@ public class AuthenticatorConfig extends AbstractConfig {
ConfigDef.Type.STRING,
"dse",
ConfigDef.Importance.HIGH,
"SASL service name to use for GSSAPI provider authentication");
"SASL service name to use for GSSAPI provider authentication")
.define(
OIDC_ISSUER_OPT,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"OIDC Authentication server url")
.define(
OIDC_CLIENT_ID_OPT,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"OIDC Client Id to use for authentication")
.define(
OIDC_CLIENT_SECRET_OPT,
ConfigDef.Type.PASSWORD,
"",
ConfigDef.Importance.HIGH,
"OIDC Client Secret to use for authentication")
.define(
OIDC_USE_TLS_OPT,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.HIGH,
"Set to false to disable TLS encrypted connections. Defaults to true.")
.define(
OIDC_TRUSTSTORE_PATH_OPT,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
"Path to the truststore file containing the OIDC issuer's certificate.")
.define(
OIDC_TRUSTSTORE_PASSWORD_OPT,
ConfigDef.Type.PASSWORD,
null,
ConfigDef.Importance.HIGH,
"Truststore file password containing the OIDC issuer's certificate.");

@Nullable private final Path keyTabPath;

Expand All @@ -101,9 +147,28 @@ public AuthenticatorConfig(Map<String, String> authSettings) {
if (getService().isEmpty()) {
throw new ConfigException(SERVICE_OPT, "<empty>", "is required");
}

assertAccessibleFile(keyTabPath, KEYTAB_OPT);
}

if (provider == Provider.OIDC) {
if (getString(OIDC_ISSUER_OPT).isEmpty()) {
throw new ConfigException(OIDC_ISSUER_OPT, "<empty>", "is required");
}

try {
new URI(getString(OIDC_ISSUER_OPT));
} catch (URISyntaxException e) {
throw new ConfigException(
OIDC_ISSUER_OPT, getString(OIDC_ISSUER_OPT), "is not a valid URI: " + e.getMessage());
}

if (getString(OIDC_CLIENT_ID_OPT).isEmpty()) {
throw new ConfigException(OIDC_CLIENT_ID_OPT, "<empty>", "is required");
}
if (getPassword(OIDC_CLIENT_SECRET_OPT).value().isEmpty()) {
throw new ConfigException(OIDC_CLIENT_SECRET_OPT, "<empty>", "is required");
}
}
}

/**
Expand Down Expand Up @@ -139,6 +204,13 @@ private static Map<String, String> sanitizeAuthSettings(Map<String, String> auth
mutated.put(PRINCIPAL_OPT, getPrincipalFromKeyTab(keyTabPath.toString()));
}
}

if ("OIDC".equals(provider)) {
if (!mutated.containsKey(OIDC_USE_TLS_OPT) || mutated.get(OIDC_USE_TLS_OPT).isEmpty()) {
mutated.put(OIDC_USE_TLS_OPT, "true");
}
}

return ImmutableMap.<String, String>builder().putAll(mutated).build();
}

Expand Down Expand Up @@ -191,7 +263,7 @@ public Provider getProvider() {
return Provider.valueOf(providerString);
} catch (IllegalArgumentException e) {
throw new ConfigException(
PROVIDER_OPT, providerString, "valid values are None, PLAIN, GSSAPI");
PROVIDER_OPT, providerString, "valid values are None, PLAIN, GSSAPI, OIDC");
}
}

Expand All @@ -216,6 +288,35 @@ public String getService() {
return getString(SERVICE_OPT);
}

public URI getOIDCIssuer() {
try {
return new URI(getString(OIDC_ISSUER_OPT));
} catch (URISyntaxException e) {
// This should never happen because we validate the URI in the constructor.
return null;
Comment on lines +291 to +296
Copy link
Preview

Copilot AI Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Returning null from getOIDCIssuer() can lead to NPEs downstream. Consider throwing an unchecked exception or using Optional<URI> to clearly signal failure.

Suggested change
public URI getOIDCIssuer() {
try {
return new URI(getString(OIDC_ISSUER_OPT));
} catch (URISyntaxException e) {
// This should never happen because we validate the URI in the constructor.
return null;
public Optional<URI> getOIDCIssuer() {
try {
return Optional.of(new URI(getString(OIDC_ISSUER_OPT)));
} catch (URISyntaxException e) {
// This should never happen because we validate the URI in the constructor.
return Optional.empty();

Copilot uses AI. Check for mistakes.

}
}

public String getOIDCClientId() {
return getString(OIDC_CLIENT_ID_OPT);
}

public String getOIDCClientSecret() {
return getPassword(OIDC_CLIENT_SECRET_OPT).value();
}

public boolean getOIDCUseTLS() {
return getBoolean(OIDC_USE_TLS_OPT);
}

public Path getOIDCTruststorePath() {
return getFilePath(getString(OIDC_TRUSTSTORE_PATH_OPT));
}

public Password getOIDCTruststorePassword() {
return getPassword(OIDC_TRUSTSTORE_PASSWORD_OPT);
}

@Override
public String toString() {
return configToString(
Expand All @@ -226,12 +327,16 @@ public String toString() {
PASSWORD_OPT,
KEYTAB_OPT,
PRINCIPAL_OPT,
SERVICE_OPT);
SERVICE_OPT,
OIDC_ISSUER_OPT,
OIDC_CLIENT_ID_OPT,
OIDC_CLIENT_SECRET_OPT);
}

public enum Provider {
None,
PLAIN,
GSSAPI
GSSAPI,
OIDC
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.SSL_TRUSTSTORE_PATH;

import com.codahale.metrics.MetricRegistry;
import com.datastax.db.driver.api.plugin.auth.OIDCClientCredentialsAuthProvider;
import com.datastax.db.driver.api.plugin.config.OIDCDriverOption;
import com.datastax.dse.driver.api.core.config.DseDriverOption;
import com.datastax.dse.driver.internal.core.auth.DseGssApiAuthProvider;
import com.datastax.oss.common.sink.AbstractSinkTask;
Expand Down Expand Up @@ -620,11 +622,32 @@ private static void processAuthenticatorConfig(
.withStringMap(
AUTH_PROVIDER_SASL_PROPERTIES, ImmutableMap.of("javax.security.sasl.qop", "auth"))
.withStringMap(DseDriverOption.AUTH_PROVIDER_LOGIN_CONFIGURATION, loginConfig);
} else if (authConfig.getProvider() == AuthenticatorConfig.Provider.OIDC) {
configLoaderBuilder
.withClass(AUTH_PROVIDER_CLASS, OIDCClientCredentialsAuthProvider.class)
.withString(
OIDCDriverOption.AUTH_PROVIDER_OIDC_ISSUER, authConfig.getOIDCIssuer().toString())
.withString(OIDCDriverOption.AUTH_PROVIDER_OIDC_CLIENT_ID, authConfig.getOIDCClientId())
.withString(
OIDCDriverOption.AUTH_PROVIDER_OIDC_CLIENT_SECRET, authConfig.getOIDCClientSecret())
.withBoolean(OIDCDriverOption.AUTH_PROVIDER_OIDC_USE_TLS, authConfig.getOIDCUseTLS());

if (authConfig.getOIDCTruststorePath() != null) {
configLoaderBuilder.withString(
OIDCDriverOption.AUTH_PROVIDER_OIDC_TRUSTSTORE_PATH,
authConfig.getOIDCTruststorePath().toString());

if (authConfig.getOIDCTruststorePassword() != null) {
configLoaderBuilder.withString(
OIDCDriverOption.AUTH_PROVIDER_OIDC_TRUSTSTORE_PASSWORD,
authConfig.getOIDCTruststorePassword().value());
}
}
}
}

/**
* Prepare insert or update (depending on whether or not the table is a COUNTER table), and delete
* Prepare insert or update (depending on whether the table is a COUNTER table), and delete
* statements asynchronously.
*
* @param session the session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void should_error_invalid_provider() {
.isInstanceOf(org.apache.kafka.common.config.ConfigException.class)
.hasMessage(
String.format(
"Invalid value foo for configuration %s: String must be one of: None, PLAIN, GSSAPI",
"Invalid value foo for configuration %s: String must be one of: None, PLAIN, GSSAPI, OIDC",
PROVIDER_OPT));
}

Expand Down
45 changes: 19 additions & 26 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<java.release.version>8</java.release.version>
<java.version>11</java.version>
<java.release.version>11</java.release.version>
<kafka.connect.version>2.4.0</kafka.connect.version>
<caffeine.version>2.6.2</caffeine.version>
<oss.driver.version>4.16.0</oss.driver.version>
<datastax.advanced-auth-client-plugin.version>1.0.1-SNAPSHOT</datastax.advanced-auth-client-plugin.version>
<dsbulk.version>1.10.0</dsbulk.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<guava.version>25.1-jre</guava.version>
Expand Down Expand Up @@ -155,6 +156,11 @@
<artifactId>java-driver-shaded-guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.db</groupId>
<artifactId>db-advanced-auth-client-plugin</artifactId>
<version>${datastax.advanced-auth-client-plugin.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
Expand Down Expand Up @@ -286,26 +292,6 @@ limitations under the License.]]></inlineHeader>
<artifactId>license-maven-plugin</artifactId>
<version>1.14</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<version>1.16</version>
<executions>
<execution>
<id>check-jdk8</id>
<goals>
<goal>check</goal>
</goals>
<configuration>
<signature>
<groupId>org.codehaus.mojo.signature</groupId>
<artifactId>java18</artifactId>
<version>1.0</version>
</signature>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down Expand Up @@ -430,10 +416,6 @@ limitations under the License.]]></inlineHeader>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
Expand Down Expand Up @@ -646,6 +628,17 @@ limitations under the License.]]></inlineHeader>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>datastax-snapshots-local</id>
<name>DataStax Local Snapshots</name>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<distributionManagement>
<!-- releases go to Maven central -->
Expand Down
9 changes: 1 addition & 8 deletions text/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<artifactId>dsbulk-codecs</artifactId>
<groupId>com.datastax.oss</groupId>
<version>1.10.0</version>
<relativePath />
<relativePath/>
</parent>
<groupId>com.datastax.oss</groupId>
<version>1.0.16-SNAPSHOT</version>
Expand Down Expand Up @@ -120,13 +120,6 @@
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>