Skip to content

Commit 155590a

Browse files
committed
Allow to configure SecureBundle ZIP in textual form
- allow users to encode the ZIP file using standard base64 linux tool - This is the new setting cloud.secureConnectBundle=base64:XXXXXXXXX With this change it is easier to connect to Astra and to secure Cassandra clusters without the need to deploy the ZIP file. Deploying the ZIP file is very awkward on K8S and in Pulsar IO framework
1 parent 73f6af1 commit 155590a

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

common/src/main/java/com/datastax/oss/common/sink/config/CassandraSinkConfig.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
3131
import edu.umd.cs.findbugs.annotations.NonNull;
3232
import edu.umd.cs.findbugs.annotations.Nullable;
33+
import java.io.IOException;
34+
import java.nio.file.Files;
35+
import java.nio.file.Path;
36+
import java.nio.file.attribute.PosixFilePermissions;
37+
import java.util.Base64;
3338
import java.util.Collections;
3439
import java.util.HashMap;
3540
import java.util.List;
@@ -247,6 +252,7 @@ public CassandraSinkConfig(Map<String, String> settings) {
247252
globalConfig = new AbstractConfig(GLOBAL_CONFIG_DEF, globalSettings, false);
248253

249254
populateDriverSettingsWithConnectorSettings(globalSettings);
255+
decodeBase64EncodedSecureBundle(javaDriverSettings);
250256
boolean cloud = isCloud();
251257

252258
if (!cloud) {
@@ -332,7 +338,6 @@ private void populateDriverSettingsWithConnectorSettings(Map<String, String> con
332338
deprecatedMetricsHighestLatency(connectorSettings);
333339
deprecatedCompression(connectorSettings);
334340
deprecatedSecureBundle(connectorSettings);
335-
336341
if (getJmx()) {
337342
metricsSettings();
338343
}
@@ -347,6 +352,31 @@ private void deprecatedSecureBundle(Map<String, String> connectorSettings) {
347352
Function.identity());
348353
}
349354

355+
static void decodeBase64EncodedSecureBundle(Map<String, String> javaDriverSettings) {
356+
// if the path is base64:xxxx we decode the payload and create
357+
// a temporary file
358+
// we are setting permissions such that only current user can access the file
359+
// the file will be deleted at JVM exit
360+
String encoded = javaDriverSettings.get(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
361+
if (encoded != null && encoded.startsWith("base64:")) {
362+
try {
363+
encoded = encoded.replace("\n", "").replace("\r", "").trim();
364+
encoded = encoded.substring("base64:".length());
365+
byte[] decoded = Base64.getDecoder().decode(encoded);
366+
Path file = Files.createTempFile("cassandra.sink.securebundle", ".zip");
367+
Files.setPosixFilePermissions(file, PosixFilePermissions.fromString("rw-------"));
368+
file.toFile().deleteOnExit();
369+
Files.write(file, decoded);
370+
String path = file.toAbsolutePath().toString();
371+
log.info("Decoded bundle to temporary file {}", path);
372+
javaDriverSettings.put(SECURE_CONNECT_BUNDLE_DRIVER_SETTING, path);
373+
} catch (IOException ex) {
374+
throw new RuntimeException(
375+
"Cannot decode base64 secure bundle and create temporary file: " + ex, ex);
376+
}
377+
}
378+
}
379+
350380
private void deprecatedCompression(Map<String, String> connectorSettings) {
351381
handleDeprecatedSetting(
352382
connectorSettings,

common/src/test/java/com/datastax/oss/common/sink/config/CassandraSinkConfigTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_INTERVAL;
4747
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METRICS_SESSION_ENABLED;
4848
import static com.datastax.oss.dsbulk.tests.assertions.TestAssertions.assertThat;
49+
import static org.assertj.core.api.Assertions.assertThat;
4950
import static org.assertj.core.api.Assertions.assertThatCode;
5051
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5152

@@ -58,11 +59,19 @@
5859
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
5960
import com.datastax.oss.dsbulk.tests.logging.LogInterceptingExtension;
6061
import com.datastax.oss.dsbulk.tests.logging.LogInterceptor;
62+
import java.io.ByteArrayOutputStream;
63+
import java.io.File;
64+
import java.nio.file.Files;
65+
import java.nio.file.attribute.PosixFilePermission;
66+
import java.util.Base64;
6167
import java.util.Collections;
6268
import java.util.HashMap;
6369
import java.util.List;
6470
import java.util.Map;
71+
import java.util.Set;
6572
import java.util.stream.Stream;
73+
import java.util.zip.ZipEntry;
74+
import java.util.zip.ZipOutputStream;
6675
import org.junit.jupiter.api.Test;
6776
import org.junit.jupiter.api.extension.ExtendWith;
6877
import org.junit.jupiter.params.ParameterizedTest;
@@ -670,6 +679,46 @@ void should_set_default_driver_setting(String driverSettingName, String expected
670679
.isEqualTo(expectedDefault);
671680
}
672681

682+
@Test
683+
void should_unpack_base64_zip_file_legacy_name() throws Exception {
684+
should_unpack_base64_zip_file(SECURE_CONNECT_BUNDLE_OPT);
685+
}
686+
687+
@Test
688+
void should_unpack_base64_zip_file() throws Exception {
689+
should_unpack_base64_zip_file(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
690+
}
691+
692+
void should_unpack_base64_zip_file(String entryName) throws Exception {
693+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
694+
// it is not necessary that we really have a zip file here
695+
// but this gives the flavour of the type of content we expect
696+
// to be encoded in the secureBundleZip
697+
try (ZipOutputStream zip = new ZipOutputStream(buffer)) {
698+
zip.putNextEntry(new ZipEntry("file.bin"));
699+
zip.write(1234);
700+
zip.closeEntry();
701+
}
702+
byte[] zipFileContents = buffer.toByteArray();
703+
String encoded = "base64:" + Base64.getEncoder().encodeToString(zipFileContents);
704+
Map<String, String> inputSettings = new HashMap<>();
705+
inputSettings.put(entryName, encoded);
706+
CassandraSinkConfig cassandraSinkConfig = new CassandraSinkConfig(inputSettings);
707+
assertThat(cassandraSinkConfig.isCloud()).isEqualTo(true);
708+
Map<String, String> javaDriverSettings = cassandraSinkConfig.getJavaDriverSettings();
709+
String path = javaDriverSettings.get(SECURE_CONNECT_BUNDLE_DRIVER_SETTING);
710+
File file = new File(path);
711+
assertThat(file.isFile()).isEqualTo(true);
712+
Set<PosixFilePermission> posixFilePermissions = Files.getPosixFilePermissions(file.toPath());
713+
assertThat(posixFilePermissions)
714+
.contains(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
715+
assertThat(posixFilePermissions.size())
716+
.as("bad permissions " + posixFilePermissions)
717+
.isEqualTo(2);
718+
byte[] content = Files.readAllBytes(file.toPath());
719+
assertThat(content).isEqualTo(zipFileContents);
720+
}
721+
673722
@Test
674723
void should_transform_list_setting_to_indexed_typesafe_setting() {
675724
// given

0 commit comments

Comments
 (0)