Skip to content

Commit 4e8efe0

Browse files
committed
More base64 encoded files
Allow to configure Auth/GSSAPI and TLS files encoded in base64
1 parent 82452a6 commit 4e8efe0

File tree

2 files changed

+95
-28
lines changed

2 files changed

+95
-28
lines changed

common/src/main/java/com/datastax/oss/common/sink/config/CassandraSinkConfig.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
package com.datastax.oss.common.sink.config;
1717

18+
import static com.datastax.oss.common.sink.config.AuthenticatorConfig.KEYTAB_OPT;
19+
import static com.datastax.oss.common.sink.config.SslConfig.KEYSTORE_PATH_OPT;
20+
import static com.datastax.oss.common.sink.config.SslConfig.OPENSSL_KEY_CERT_CHAIN_OPT;
21+
import static com.datastax.oss.common.sink.config.SslConfig.OPENSSL_PRIVATE_KEY_OPT;
22+
import static com.datastax.oss.common.sink.config.SslConfig.TRUSTSTORE_PATH_OPT;
1823
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTACT_POINTS;
1924
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES;
2025
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METRICS_NODE_ENABLED;
@@ -252,7 +257,18 @@ public CassandraSinkConfig(Map<String, String> settings) {
252257
globalConfig = new AbstractConfig(GLOBAL_CONFIG_DEF, globalSettings, false);
253258

254259
populateDriverSettingsWithConnectorSettings(globalSettings);
255-
decodeBase64EncodedSecureBundle(javaDriverSettings);
260+
261+
// for Pulsar Sink we want to make it easy to deploy these files
262+
// as simple base64 encoded strings, because there is no
263+
// automatic mechanism to distribute files to the workers that
264+
// execute the Sink
265+
decodeBase64EncodedFile(SECURE_CONNECT_BUNDLE_DRIVER_SETTING, javaDriverSettings);
266+
decodeBase64EncodedFile(KEYSTORE_PATH_OPT, sslSettings);
267+
decodeBase64EncodedFile(TRUSTSTORE_PATH_OPT, sslSettings);
268+
decodeBase64EncodedFile(OPENSSL_PRIVATE_KEY_OPT, sslSettings);
269+
decodeBase64EncodedFile(OPENSSL_KEY_CERT_CHAIN_OPT, sslSettings);
270+
decodeBase64EncodedFile(KEYTAB_OPT, authSettings);
271+
256272
boolean cloud = isCloud();
257273

258274
if (!cloud) {
@@ -352,27 +368,27 @@ private void deprecatedSecureBundle(Map<String, String> connectorSettings) {
352368
Function.identity());
353369
}
354370

355-
static void decodeBase64EncodedSecureBundle(Map<String, String> javaDriverSettings) {
371+
static void decodeBase64EncodedFile(String key, Map<String, String> javaDriverSettings) {
356372
// if the path is base64:xxxx we decode the payload and create
357373
// a temporary file
358374
// we are setting permissions such that only current user can access the file
359375
// the file will be deleted at JVM exit
360-
String encoded = javaDriverSettings.get(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
376+
String encoded = javaDriverSettings.get(key);
361377
if (encoded != null && encoded.startsWith("base64:")) {
362378
try {
363379
encoded = encoded.replace("\n", "").replace("\r", "").trim();
364380
encoded = encoded.substring("base64:".length());
365381
byte[] decoded = Base64.getDecoder().decode(encoded);
366-
Path file = Files.createTempFile("cassandra.sink.securebundle", ".zip");
382+
Path file = Files.createTempFile("cassandra.sink.", ".tmp");
367383
Files.setPosixFilePermissions(file, PosixFilePermissions.fromString("rw-------"));
368384
file.toFile().deleteOnExit();
369385
Files.write(file, decoded);
370386
String path = file.toAbsolutePath().toString();
371-
log.info("Decoded bundle to temporary file {}", path);
372-
javaDriverSettings.put(SECURE_CONNECT_BUNDLE_DRIVER_SETTING, path);
387+
log.info("Decoded {} to temporary file {}", key, path);
388+
javaDriverSettings.put(key, path);
373389
} catch (IOException ex) {
374390
throw new RuntimeException(
375-
"Cannot decode base64 secure bundle and create temporary file: " + ex, ex);
391+
"Cannot decode base64 " + key + " and create temporary file: " + ex, ex);
376392
}
377393
}
378394
}

common/src/test/java/com/datastax/oss/common/sink/config/CassandraSinkConfigTest.java

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.oss.common.sink.config;
1717

18+
import static com.datastax.oss.common.sink.config.AuthenticatorConfig.KEYTAB_OPT;
1819
import static com.datastax.oss.common.sink.config.CassandraSinkConfig.COMPRESSION_DEFAULT;
1920
import static com.datastax.oss.common.sink.config.CassandraSinkConfig.COMPRESSION_DRIVER_SETTING;
2021
import static com.datastax.oss.common.sink.config.CassandraSinkConfig.COMPRESSION_OPT;
@@ -39,13 +40,18 @@
3940
import static com.datastax.oss.common.sink.config.CassandraSinkConfig.SECURE_CONNECT_BUNDLE_OPT;
4041
import static com.datastax.oss.common.sink.config.CassandraSinkConfig.SSL_OPT_PREFIX;
4142
import static com.datastax.oss.common.sink.config.CassandraSinkConfig.withDriverPrefix;
43+
import static com.datastax.oss.common.sink.config.SslConfig.KEYSTORE_PATH_OPT;
44+
import static com.datastax.oss.common.sink.config.SslConfig.OPENSSL_KEY_CERT_CHAIN_OPT;
45+
import static com.datastax.oss.common.sink.config.SslConfig.OPENSSL_PRIVATE_KEY_OPT;
4246
import static com.datastax.oss.common.sink.config.SslConfig.PROVIDER_OPT;
47+
import static com.datastax.oss.common.sink.config.SslConfig.TRUSTSTORE_PATH_OPT;
4348
import static com.datastax.oss.common.sink.config.TableConfig.MAPPING_OPT;
4449
import static com.datastax.oss.common.sink.config.TableConfig.getTableSettingPath;
4550
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTACT_POINTS;
4651
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_INTERVAL;
4752
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METRICS_SESSION_ENABLED;
4853
import static com.datastax.oss.dsbulk.tests.assertions.TestAssertions.assertThat;
54+
import static java.nio.charset.StandardCharsets.UTF_8;
4955
import static org.assertj.core.api.Assertions.assertThat;
5056
import static org.assertj.core.api.Assertions.assertThatCode;
5157
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -59,7 +65,6 @@
5965
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
6066
import com.datastax.oss.dsbulk.tests.logging.LogInterceptingExtension;
6167
import com.datastax.oss.dsbulk.tests.logging.LogInterceptor;
62-
import java.io.ByteArrayOutputStream;
6368
import java.io.File;
6469
import java.nio.file.Files;
6570
import java.nio.file.attribute.PosixFilePermission;
@@ -69,9 +74,8 @@
6974
import java.util.List;
7075
import java.util.Map;
7176
import java.util.Set;
77+
import java.util.function.Function;
7278
import java.util.stream.Stream;
73-
import java.util.zip.ZipEntry;
74-
import java.util.zip.ZipOutputStream;
7579
import org.junit.jupiter.api.Test;
7680
import org.junit.jupiter.api.extension.ExtendWith;
7781
import org.junit.jupiter.params.ParameterizedTest;
@@ -680,33 +684,80 @@ void should_set_default_driver_setting(String driverSettingName, String expected
680684
}
681685

682686
@Test
683-
void should_unpack_base64_zip_file_legacy_name() throws Exception {
684-
should_unpack_base64_zip_file(SECURE_CONNECT_BUNDLE_OPT);
687+
void should_unpack_base64_secureBundle_legacy_name() throws Exception {
688+
should_unpack_base64_file(
689+
SECURE_CONNECT_BUNDLE_OPT,
690+
(CassandraSinkConfig config) -> {
691+
assertThat(config.isCloud()).isEqualTo(true);
692+
return config.getJavaDriverSettings().get(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
693+
});
685694
}
686695

687696
@Test
688-
void should_unpack_base64_zip_file() throws Exception {
689-
should_unpack_base64_zip_file(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
697+
void should_unpack_base64_secureBundle() throws Exception {
698+
should_unpack_base64_file(
699+
SECURE_CONNECT_BUNDLE_DRIVER_SETTING,
700+
(CassandraSinkConfig config) -> {
701+
assertThat(config.isCloud()).isEqualTo(true);
702+
return config.getJavaDriverSettings().get(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
703+
});
690704
}
691705

692-
void should_unpack_base64_zip_file(String entryName) throws Exception {
693-
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
694-
// it is not necessary that we really have a zip file here
695-
// but this gives the flavour of the type of content we expect
696-
// to be encoded in the secureBundleZip
697-
try (ZipOutputStream zip = new ZipOutputStream(buffer)) {
698-
zip.putNextEntry(new ZipEntry("file.bin"));
699-
zip.write(1234);
700-
zip.closeEntry();
701-
}
702-
byte[] zipFileContents = buffer.toByteArray();
706+
@Test
707+
void should_unpack_base64_ssl_keystore() throws Exception {
708+
should_unpack_base64_file(
709+
KEYSTORE_PATH_OPT,
710+
(CassandraSinkConfig config) -> {
711+
return config.getSslConfig().getKeystorePath().toString();
712+
});
713+
}
714+
715+
@Test
716+
void should_unpack_base64_ssl_trustore() throws Exception {
717+
should_unpack_base64_file(
718+
TRUSTSTORE_PATH_OPT,
719+
(CassandraSinkConfig config) -> {
720+
return config.getSslConfig().getTruststorePath().toString();
721+
});
722+
}
723+
724+
@Test
725+
void should_unpack_base64_ssl_openssl_private_key() throws Exception {
726+
should_unpack_base64_file(
727+
OPENSSL_PRIVATE_KEY_OPT,
728+
(CassandraSinkConfig config) -> {
729+
return config.getSslConfig().getOpenSslPrivateKey().toString();
730+
});
731+
}
732+
733+
@Test
734+
void should_unpack_base64_ssl_openssl_cert_chain() throws Exception {
735+
should_unpack_base64_file(
736+
OPENSSL_KEY_CERT_CHAIN_OPT,
737+
(CassandraSinkConfig config) -> {
738+
return config.getSslConfig().getOpenSslKeyCertChain().toString();
739+
});
740+
}
741+
742+
@Test
743+
void should_unpack_base64_auth_keytab() throws Exception {
744+
should_unpack_base64_file(
745+
KEYTAB_OPT,
746+
(CassandraSinkConfig config) -> {
747+
return config.getAuthenticatorConfig().getKeyTabPath().toString();
748+
});
749+
}
750+
751+
void should_unpack_base64_file(
752+
String entryName, Function<CassandraSinkConfig, String> parameterAccessor) throws Exception {
753+
754+
byte[] zipFileContents = "foo".getBytes(UTF_8);
703755
String encoded = "base64:" + Base64.getEncoder().encodeToString(zipFileContents);
704756
Map<String, String> inputSettings = new HashMap<>();
705757
inputSettings.put(entryName, encoded);
706758
CassandraSinkConfig cassandraSinkConfig = new CassandraSinkConfig(inputSettings);
707-
assertThat(cassandraSinkConfig.isCloud()).isEqualTo(true);
708-
Map<String, String> javaDriverSettings = cassandraSinkConfig.getJavaDriverSettings();
709-
String path = javaDriverSettings.get(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
759+
760+
String path = parameterAccessor.apply(cassandraSinkConfig);
710761
File file = new File(path);
711762
assertThat(file.isFile()).isEqualTo(true);
712763
Set<PosixFilePermission> posixFilePermissions = Files.getPosixFilePermissions(file.toPath());

0 commit comments

Comments
 (0)