diff --git a/Jenkinsfile b/Jenkinsfile
index e7cb6c7..302e86f 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -204,9 +204,9 @@ pipeline {
}
environment {
- OS_VERSION = 'ubuntu/bionic64/java-driver'
+ OS_VERSION = 'ubuntu/focal64/java-driver'
JABBA_SHELL = '/usr/lib/jabba/jabba.sh'
- JABBA_VERSION = '1.8'
+ JABBA_VERSION = 'openjdk@1.11'
CCM_ENVIRONMENT_SHELL = '/usr/local/bin/ccm_environment.sh'
// always run long tests when generating the distribution tarball
RUN_LONG_TESTS = "${params.RUN_LONG_TESTS || params.GENERATE_DISTRO}"
diff --git a/common/pom.xml b/common/pom.xml
index dc347df..cc113f6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -69,6 +69,10 @@
+
+ com.datastax.db
+ db-advanced-auth-client-plugin
+
org.apache.kafka
kafka-clients
diff --git a/common/src/main/java/com/datastax/oss/common/sink/config/AuthenticatorConfig.java b/common/src/main/java/com/datastax/oss/common/sink/config/AuthenticatorConfig.java
index 3514630..8db45d7 100644
--- a/common/src/main/java/com/datastax/oss/common/sink/config/AuthenticatorConfig.java
+++ b/common/src/main/java/com/datastax/oss/common/sink/config/AuthenticatorConfig.java
@@ -24,11 +24,15 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +43,13 @@ public class AuthenticatorConfig extends AbstractConfig {
public static final String PASSWORD_OPT = "auth.password";
public static final String KEYTAB_OPT = "auth.gssapi.keyTab";
public static final String PRINCIPAL_OPT = "auth.gssapi.principal";
- public static final String SERVICE_OPT = "auth.gssapi.service";
+ public static final String SERVICE_OPT = "auth.gssapi.cservice";
+ public static final String OIDC_ISSUER_OPT = "auth.oidc.issuer";
+ public static final String OIDC_CLIENT_ID_OPT = "auth.oidc.client_id";
+ public static final String OIDC_CLIENT_SECRET_OPT = "auth.oidc.client_secret";
+ public static final String OIDC_USE_TLS_OPT = "auth.oidc.use_tls";
+ public static final String OIDC_TRUSTSTORE_PATH_OPT = "auth.oidc.truststore_path";
+ public static final String OIDC_TRUSTSTORE_PASSWORD_OPT = "auth.oidc.truststore_password";
private static final Logger log = LoggerFactory.getLogger(AuthenticatorConfig.class);
public static final ConfigDef CONFIG_DEF =
@@ -48,7 +58,7 @@ public class AuthenticatorConfig extends AbstractConfig {
PROVIDER_OPT,
ConfigDef.Type.STRING,
"None",
- ConfigDef.ValidString.in("None", "PLAIN", "GSSAPI"),
+ ConfigDef.ValidString.in("None", "PLAIN", "GSSAPI", "OIDC"),
ConfigDef.Importance.HIGH,
"Authentication provider")
.define(
@@ -80,7 +90,43 @@ public class AuthenticatorConfig extends AbstractConfig {
ConfigDef.Type.STRING,
"dse",
ConfigDef.Importance.HIGH,
- "SASL service name to use for GSSAPI provider authentication");
+ "SASL service name to use for GSSAPI provider authentication")
+ .define(
+ OIDC_ISSUER_OPT,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.HIGH,
+ "OIDC Authentication server url")
+ .define(
+ OIDC_CLIENT_ID_OPT,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.HIGH,
+ "OIDC Client Id to use for authentication")
+ .define(
+ OIDC_CLIENT_SECRET_OPT,
+ ConfigDef.Type.PASSWORD,
+ "",
+ ConfigDef.Importance.HIGH,
+ "OIDC Client Secret to use for authentication")
+ .define(
+ OIDC_USE_TLS_OPT,
+ ConfigDef.Type.BOOLEAN,
+ true,
+ ConfigDef.Importance.HIGH,
+ "Set to false to disable TLS encrypted connections. Defaults to true.")
+ .define(
+ OIDC_TRUSTSTORE_PATH_OPT,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.HIGH,
+ "Path to the truststore file containing the OIDC issuer's certificate.")
+ .define(
+ OIDC_TRUSTSTORE_PASSWORD_OPT,
+ ConfigDef.Type.PASSWORD,
+ null,
+ ConfigDef.Importance.HIGH,
+ "Truststore file password containing the OIDC issuer's certificate.");
@Nullable private final Path keyTabPath;
@@ -101,9 +147,28 @@ public AuthenticatorConfig(Map authSettings) {
if (getService().isEmpty()) {
throw new ConfigException(SERVICE_OPT, "", "is required");
}
-
assertAccessibleFile(keyTabPath, KEYTAB_OPT);
}
+
+ if (provider == Provider.OIDC) {
+ if (getString(OIDC_ISSUER_OPT).isEmpty()) {
+ throw new ConfigException(OIDC_ISSUER_OPT, "", "is required");
+ }
+
+ try {
+ new URI(getString(OIDC_ISSUER_OPT));
+ } catch (URISyntaxException e) {
+ throw new ConfigException(
+ OIDC_ISSUER_OPT, getString(OIDC_ISSUER_OPT), "is not a valid URI: " + e.getMessage());
+ }
+
+ if (getString(OIDC_CLIENT_ID_OPT).isEmpty()) {
+ throw new ConfigException(OIDC_CLIENT_ID_OPT, "", "is required");
+ }
+ if (getPassword(OIDC_CLIENT_SECRET_OPT).value().isEmpty()) {
+ throw new ConfigException(OIDC_CLIENT_SECRET_OPT, "", "is required");
+ }
+ }
}
/**
@@ -139,6 +204,13 @@ private static Map sanitizeAuthSettings(Map auth
mutated.put(PRINCIPAL_OPT, getPrincipalFromKeyTab(keyTabPath.toString()));
}
}
+
+ if ("OIDC".equals(provider)) {
+ if (!mutated.containsKey(OIDC_USE_TLS_OPT) || mutated.get(OIDC_USE_TLS_OPT).isEmpty()) {
+ mutated.put(OIDC_USE_TLS_OPT, "true");
+ }
+ }
+
return ImmutableMap.builder().putAll(mutated).build();
}
@@ -191,7 +263,7 @@ public Provider getProvider() {
return Provider.valueOf(providerString);
} catch (IllegalArgumentException e) {
throw new ConfigException(
- PROVIDER_OPT, providerString, "valid values are None, PLAIN, GSSAPI");
+ PROVIDER_OPT, providerString, "valid values are None, PLAIN, GSSAPI, OIDC");
}
}
@@ -216,6 +288,35 @@ public String getService() {
return getString(SERVICE_OPT);
}
+ public URI getOIDCIssuer() {
+ try {
+ return new URI(getString(OIDC_ISSUER_OPT));
+ } catch (URISyntaxException e) {
+ // This should never happen because we validate the URI in the constructor.
+ return null;
+ }
+ }
+
+ public String getOIDCClientId() {
+ return getString(OIDC_CLIENT_ID_OPT);
+ }
+
+ public String getOIDCClientSecret() {
+ return getPassword(OIDC_CLIENT_SECRET_OPT).value();
+ }
+
+ public boolean getOIDCUseTLS() {
+ return getBoolean(OIDC_USE_TLS_OPT);
+ }
+
+ public Path getOIDCTruststorePath() {
+ return getFilePath(getString(OIDC_TRUSTSTORE_PATH_OPT));
+ }
+
+ public Password getOIDCTruststorePassword() {
+ return getPassword(OIDC_TRUSTSTORE_PASSWORD_OPT);
+ }
+
@Override
public String toString() {
return configToString(
@@ -226,12 +327,16 @@ public String toString() {
PASSWORD_OPT,
KEYTAB_OPT,
PRINCIPAL_OPT,
- SERVICE_OPT);
+ SERVICE_OPT,
+ OIDC_ISSUER_OPT,
+ OIDC_CLIENT_ID_OPT,
+ OIDC_CLIENT_SECRET_OPT);
}
public enum Provider {
None,
PLAIN,
- GSSAPI
+ GSSAPI,
+ OIDC
}
}
diff --git a/common/src/main/java/com/datastax/oss/common/sink/state/LifeCycleManager.java b/common/src/main/java/com/datastax/oss/common/sink/state/LifeCycleManager.java
index 876e78a..df576c7 100644
--- a/common/src/main/java/com/datastax/oss/common/sink/state/LifeCycleManager.java
+++ b/common/src/main/java/com/datastax/oss/common/sink/state/LifeCycleManager.java
@@ -31,6 +31,8 @@
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.SSL_TRUSTSTORE_PATH;
import com.codahale.metrics.MetricRegistry;
+import com.datastax.db.driver.api.plugin.auth.OIDCClientCredentialsAuthProvider;
+import com.datastax.db.driver.api.plugin.config.OIDCDriverOption;
import com.datastax.dse.driver.api.core.config.DseDriverOption;
import com.datastax.dse.driver.internal.core.auth.DseGssApiAuthProvider;
import com.datastax.oss.common.sink.AbstractSinkTask;
@@ -620,11 +622,32 @@ private static void processAuthenticatorConfig(
.withStringMap(
AUTH_PROVIDER_SASL_PROPERTIES, ImmutableMap.of("javax.security.sasl.qop", "auth"))
.withStringMap(DseDriverOption.AUTH_PROVIDER_LOGIN_CONFIGURATION, loginConfig);
+ } else if (authConfig.getProvider() == AuthenticatorConfig.Provider.OIDC) {
+ configLoaderBuilder
+ .withClass(AUTH_PROVIDER_CLASS, OIDCClientCredentialsAuthProvider.class)
+ .withString(
+ OIDCDriverOption.AUTH_PROVIDER_OIDC_ISSUER, authConfig.getOIDCIssuer().toString())
+ .withString(OIDCDriverOption.AUTH_PROVIDER_OIDC_CLIENT_ID, authConfig.getOIDCClientId())
+ .withString(
+ OIDCDriverOption.AUTH_PROVIDER_OIDC_CLIENT_SECRET, authConfig.getOIDCClientSecret())
+ .withBoolean(OIDCDriverOption.AUTH_PROVIDER_OIDC_USE_TLS, authConfig.getOIDCUseTLS());
+
+ if (authConfig.getOIDCTruststorePath() != null) {
+ configLoaderBuilder.withString(
+ OIDCDriverOption.AUTH_PROVIDER_OIDC_TRUSTSTORE_PATH,
+ authConfig.getOIDCTruststorePath().toString());
+
+ if (authConfig.getOIDCTruststorePassword() != null) {
+ configLoaderBuilder.withString(
+ OIDCDriverOption.AUTH_PROVIDER_OIDC_TRUSTSTORE_PASSWORD,
+ authConfig.getOIDCTruststorePassword().value());
+ }
+ }
}
}
/**
- * Prepare insert or update (depending on whether or not the table is a COUNTER table), and delete
+ * Prepare insert or update (depending on whether the table is a COUNTER table), and delete
* statements asynchronously.
*
* @param session the session
diff --git a/common/src/test/java/com/datastax/oss/common/sink/config/AuthenticatorConfigTest.java b/common/src/test/java/com/datastax/oss/common/sink/config/AuthenticatorConfigTest.java
index c491e93..b28e4d2 100644
--- a/common/src/test/java/com/datastax/oss/common/sink/config/AuthenticatorConfigTest.java
+++ b/common/src/test/java/com/datastax/oss/common/sink/config/AuthenticatorConfigTest.java
@@ -154,7 +154,7 @@ void should_error_invalid_provider() {
.isInstanceOf(org.apache.kafka.common.config.ConfigException.class)
.hasMessage(
String.format(
- "Invalid value foo for configuration %s: String must be one of: None, PLAIN, GSSAPI",
+ "Invalid value foo for configuration %s: String must be one of: None, PLAIN, GSSAPI, OIDC",
PROVIDER_OPT));
}
diff --git a/pom.xml b/pom.xml
index 199d45a..672fdac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,11 +37,12 @@
UTF-8
UTF-8
- 1.8
- 8
+ 11
+ 11
2.4.0
2.6.2
4.16.0
+ 1.0.1-SNAPSHOT
1.10.0
1.0.3
25.1-jre
@@ -155,6 +156,11 @@
java-driver-shaded-guava
${guava.version}
+
+ com.datastax.db
+ db-advanced-auth-client-plugin
+ ${datastax.advanced-auth-client-plugin.version}
+
org.reactivestreams
reactive-streams
@@ -286,26 +292,6 @@ limitations under the License.]]>
license-maven-plugin
1.14
-
- org.codehaus.mojo
- animal-sniffer-maven-plugin
- 1.16
-
-
- check-jdk8
-
- check
-
-
-
- org.codehaus.mojo.signature
- java18
- 1.0
-
-
-
-
-
org.apache.maven.plugins
maven-shade-plugin
@@ -430,10 +416,6 @@ limitations under the License.]]>
com.mycila
license-maven-plugin
-
- org.codehaus.mojo
- animal-sniffer-maven-plugin
-
maven-compiler-plugin
@@ -646,6 +628,17 @@ limitations under the License.]]>
false
+
+ datastax-snapshots-local
+ DataStax Local Snapshots
+ https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local/
+
+ false
+
+
+ true
+
+
diff --git a/text/pom.xml b/text/pom.xml
index 09462ad..6beccc2 100644
--- a/text/pom.xml
+++ b/text/pom.xml
@@ -22,7 +22,7 @@
dsbulk-codecs
com.datastax.oss
1.10.0
-
+
com.datastax.oss
1.0.16-SNAPSHOT
@@ -120,13 +120,6 @@
8
-
- org.codehaus.mojo
- animal-sniffer-maven-plugin
-
- true
-
-