
Commit fcf0475

Support for AstraDB Dialect (#2501)
* Support for AstraDB Dialect * Adding UT coverage
1 parent 97a597b commit fcf0475

34 files changed, +2006 −340 lines

v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner.md

Lines changed: 121 additions & 0 deletions
@@ -339,6 +339,127 @@ In case your job fails due to many exceptions like the above, here are a few ste
#### Throughput on Spanner rises and falls in sharp bursts
It's possible that the default configuration could lead to Spanner throughput rising and falling in sharp bursts. If this is observed, you can disable Spanner batch writes by setting `batchSizeForSpannerMutations` to 0.

## AstraDB to Spanner Bulk Migration

### Prerequisites

For a bulk data migration from AstraDB to Spanner, you will need the following prerequisites:

#### Prerequisite-1: Network Connectivity

1. Choose a VPC in the project where you would like to run the Dataflow job (the default is the VPC named `default` in the project).
2. Ensure that the VPC has network connectivity to your AstraDB instance.

#### Prerequisite-2: AstraDB credentials and related details

You will need the following Astra DB details:

1. AstraDB token.
   1. The AstraDB token can be generated from the database page.
   2. Please ensure that the token remains valid for the duration of the migration. Depending on the size of the database, the migration can take a few hours.
2. AstraDB Database ID.
3. AstraDB Region - leave it empty for the default region.
4. AstraDB Keyspace - the keyspace you want to migrate to Spanner.

Note that the template will automatically download the security bundle from the database.

#### Prerequisite-3: Active Astra DB database

Please ensure that the AstraDB instance is active (not hibernated) throughout the migration.
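As a quick sanity check for this prerequisite, the database status can be queried through the DataStax Astra DevOps API. This is a sketch, not part of the template; the `astra_db_status` helper name is hypothetical, and the two arguments are the token and database ID from Prerequisite-2:

```shell
# Sketch: verify the AstraDB instance is ACTIVE (not hibernated) before starting.
# Queries the Astra DevOps API and extracts the "status" field from the JSON reply.
astra_db_status() {
  curl -s -H "Authorization: Bearer $1" \
    "https://api.astra.datastax.com/v2/databases/$2" |
    sed -n 's/.*"status" *: *"\([A-Z]*\)".*/\1/p'
}
# astra_db_status "$ASTRA_DB_APPLICATION_TOKEN" "$ASTRA_DB_ID"   # expect ACTIVE
```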
#### Prerequisite-4: Spanner

You will need to provision a Spanner database to migrate the data into. The database needs tables with a schema that maps to the schema on the source. Only the tables present on both Spanner and Cassandra are migrated.

#### Prerequisite-5: GCS

You will need a GCS bucket to stage your build and driver configuration file, and to provide an output directory for DLQs.
### Run Migration

**Using the staged template**:

Follow the steps [above](#staging-the-template) to build the template and stage it in GCS.
This step prints the path of the staged template, which is passed as `TEMPLATE_SPEC_GCSPATH` below.

To start a job with the staged template at any time using `gcloud`, you need valid resources for the required parameters.

Given that, the following command line can be used:

```shell
### Basic Job Parameters
export PROJECT=<your-project>
export BUCKET_NAME=<bucket-name>
export REGION=<GCP-Region-where-the-dataflow-machines-will-be-provisioned-like-us-central1>
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/Sourcedb_to_Spanner_Flex"
### The number of workers controls the fanout of the Dataflow job reading from Cassandra.
### While you might need to fine-tune this for best performance, a number close to the number of nodes on the Cassandra cluster is a good place to start.
export MAX_WORKERS="<MAX_NUMBER_OF_DATAFLOW_WORKERS_TO_READ_FROM_CASSANDRA>"
export NUM_WORKERS="<INITIAL_NUMBER_OF_DATAFLOW_WORKERS_TO_READ_FROM_CASSANDRA>"
### The type of machine. `e2-standard-32` might be a good starting point for most use cases.
export MACHINE_TYPE="<WORKER_MACHINE_TYPE>"

### Required
export INSTANCE_ID=<spanner instanceId>
export DATABASE_ID=<spanner databaseId>
export PROJECT_ID=<spanner projectId>
## Either the token directly (starting with `AstraCS`), or a URL to GCP Secret Manager.
ASTRA_DB_APPLICATION_TOKEN="AstraCS:<Your-Astra-DB-Token>"
## Astra DB database ID.
ASTRA_DB_ID="<Your-Astra-DB-ID>"
ASTRA_DB_KEYSPACE="<Your-Astra-DB-Key-Space>"
## Astra DB region. Leave empty for the default region.
ASTRA_DB_REGION="<Your-Astra-DB-Region>"
#### Stores the DLQ.
export OUTPUT_DIRECTORY=<outputDirectory>

### Optional
#### Use a session file in case you would like the Cassandra and Spanner tables to have different names.
export SESSION_FILE_PATH=""
export DISABLED_ALGORITHMS=<disabledAlgorithms>
export EXTRA_FILES_TO_STAGE=<extraFilesToStage>
export DEFAULT_LOG_LEVEL=INFO
#### Set insert-only mode to true in case you run the bulk migration in parallel with dual writes.
#### This mode stops the bulk template from overwriting rows that already exist in Spanner.
#### If you are not replicating live changes to Spanner in parallel, you can set this mode to false.
#### Setting this mode to false causes the bulk template to overwrite existing rows in Spanner.
#### false is the default if unset.
export INSERT_ONLY_MODE_FOR_SPANNER_MUTATIONS="true"
#### Region for Dataflow workers (required only if you want to configure the network and subnetwork).
export WORKER_REGION="${REGION}"
#### Network where you would like to run Dataflow. Defaults to `default`. This VPC must have access to the Cassandra nodes you would like to migrate from.
export NETWORK="<VPC_NAME>"
#### Subnet where you would like to run Dataflow. Defaults to `default`. This subnet must have access to the Cassandra nodes you would like to migrate from.
export SUBNETWORK="regions/${WORKER_REGION}/subnetworks/<SUBNET_NAME>"
#### Number of partitions for parallel reads.
##### By default, Apache Beam's CassandraIO sets NUM_PARTITIONS equal to the number
##### of nodes on the Cassandra cluster. This default does not give good performance
##### for larger workloads, as it limits parallelization.
##### The specifics depend on many factors, like the number of Cassandra nodes and the
##### distribution of the table's partitions across the nodes.
##### In general, a partition of average size 150 MB gives good throughput and is a good place to start fine-tuning.
NUM_PARTITIONS="<NUM_PARTITIONS>"
#### Disable Spanner batch writes.
BATCH_SIZE_FOR_SPANNER_MUTATIONS=1

gcloud dataflow flex-template run "sourcedb-to-spanner-flex-job" \
  --project "$PROJECT" \
  --region "$REGION" \
  --network "$NETWORK" \
  --max-workers "$MAX_WORKERS" \
  --num-workers "$NUM_WORKERS" \
  --worker-machine-type "$MACHINE_TYPE" \
  --subnetwork "$SUBNETWORK" \
  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
  --additional-experiments="[\"disable_runner_v2\"]" \
  --parameters "sourceDbDialect=ASTRA_DB" \
  --parameters "insertOnlyModeForSpannerMutations=$INSERT_ONLY_MODE_FOR_SPANNER_MUTATIONS" \
  --parameters "astraDBToken=${ASTRA_DB_APPLICATION_TOKEN}" \
  --parameters "astraDBRegion=${ASTRA_DB_REGION}" \
  --parameters "astraDBDatabaseId=${ASTRA_DB_ID}" \
  --parameters "astraDBKeySpace=${ASTRA_DB_KEYSPACE}" \
  --parameters "instanceId=$INSTANCE_ID" \
  --parameters "databaseId=$DATABASE_ID" \
  --parameters "projectId=$PROJECT_ID" \
  --parameters "sessionFilePath=$SESSION_FILE_PATH" \
  --parameters "outputDirectory=$OUTPUT_DIRECTORY" \
  --parameters "disabledAlgorithms=$DISABLED_ALGORITHMS" \
  --parameters "extraFilesToStage=$EXTRA_FILES_TO_STAGE" \
  --parameters "defaultLogLevel=$DEFAULT_LOG_LEVEL" \
  --parameters "numPartitions=${NUM_PARTITIONS}" \
  --parameters "batchSizeForSpannerMutations=${BATCH_SIZE_FOR_SPANNER_MUTATIONS}"
```
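The `NUM_PARTITIONS` guidance above (roughly 150 MB per partition) can be turned into a quick back-of-envelope estimate. This is a hypothetical helper, not part of the template; `TABLE_SIZE_GB` is an assumed rough size of your largest table:

```shell
# Estimate numPartitions by targeting ~150 MB per partition (ceiling division).
TABLE_SIZE_GB=300        # assumption: approximate size of the largest table
PARTITION_MB=150
NUM_PARTITIONS=$(( (TABLE_SIZE_GB * 1024 + PARTITION_MB - 1) / PARTITION_MB ))
echo "$NUM_PARTITIONS"   # → 2048 for a 300 GB table
```

Treat the result as a starting point for fine-tuning, not a final value.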
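Once the job is launched, progress can be followed with the standard `gcloud dataflow` commands (these are general CLI commands, not part of this commit; `<JOB_ID>` is a placeholder):

```shell
# List active Dataflow jobs in the region to find the job ID.
gcloud dataflow jobs list --project "$PROJECT" --region "$REGION" --status active
# Inspect the state of a specific job (replace <JOB_ID> with the listed ID).
gcloud dataflow jobs show <JOB_ID> --project "$PROJECT" --region "$REGION"
```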

## Terraform

v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md

Lines changed: 3 additions & 3 deletions
```diff
@@ -28,7 +28,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
 
 ### Optional parameters
 
-* **sourceDbDialect**: Possible values are `CASSANDRA`, `MYSQL` and `POSTGRESQL`. Defaults to: MYSQL.
+* **sourceDbDialect**: Possible values are `ASTRA_DB`, `CASSANDRA`, `MYSQL` and `POSTGRESQL`. Defaults to: MYSQL.
 * **jdbcDriverJars**: The comma-separated list of driver JAR files. For example, `gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar`. Defaults to empty.
 * **jdbcDriverClassName**: The JDBC driver class name. For example, `com.mysql.jdbc.Driver`. Defaults to: com.mysql.jdbc.Driver.
 * **username**: The username to be used for the JDBC connection. Defaults to empty.
@@ -46,8 +46,8 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
 * **insertOnlyModeForSpannerMutations**: By default the pipeline uses upserts to write rows to Spanner, which means existing rows get overwritten. If insert-only mode is enabled, inserts are used instead of upserts and existing rows won't be overwritten.
 * **batchSizeForSpannerMutations**: Batch size in bytes for Spanner mutations. If set to less than 0, the default of Apache Beam's SpannerIO is used, which is 1 MB. Set this to 0 or 1 to disable batching mutations.
 * **spannerPriority**: The request priority for Cloud Spanner calls. The value must be one of: [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `MEDIUM`.
-* **tableOverrides**: These are the table name overrides from source to spanner. They are written in thefollowing format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]This example shows mapping Singers table to Vocalists and Albums table to Records. For example, `[{Singers, Vocalists}, {Albums, Records}]`. Defaults to empty.
-* **columnOverrides**: These are the column name overrides from source to spanner. They are written in thefollowing format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides.The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively. For example, `[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]`. Defaults to empty.
+* **tableOverrides**: These are the table name overrides from source to spanner. They are written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]This example shows mapping Singers table to Vocalists and Albums table to Records. For example, `[{Singers, Vocalists}, {Albums, Records}]`. Defaults to empty.
+* **columnOverrides**: These are the column name overrides from source to spanner. They are written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides.The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively. For example, `[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]`. Defaults to empty.
 * **schemaOverridesFilePath**: A file which specifies the table and the column name overrides from source to spanner. Defaults to empty.
 * **uniformizationStageCountHint**: Hint for the number of uniformization stages. Currently applicable only for JDBC-based sources like MySQL or PostgreSQL. Leave 0 or default to disable uniformization. Set to -1 for a log(numPartition) number of stages. If your source primary key space is uniformly distributed (for example, an auto-incrementing key with sparse holes), it's best to leave it disabled. If your key space is not uniform, you might encounter a laggard VM in your Dataflow run. In such a case, you can set it to -1 to enable uniformization. Manually setting it to values other than 0 or -1 helps you fine-tune the tradeoff between the overhead added by uniformization stages and the performance improvement due to better distribution of work.
 * **disabledAlgorithms**: Comma separated algorithms to disable. If this value is set to `none`, no algorithm is disabled. Use this parameter with caution, because the algorithms disabled by default might have vulnerabilities or performance issues. For example, `SSLv3, RC4`.
```
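As an illustration of the two override parameters described above, here is a hypothetical launch fragment (the job name and GCS paths are placeholders; the parameter values follow the documented format):

```shell
# Hypothetical fragment: rename tables and columns between source and Spanner.
gcloud dataflow flex-template run "sourcedb-to-spanner-overrides-example" \
  --region "<REGION>" \
  --template-file-gcs-location "gs://<BUCKET>/templates/flex/Sourcedb_to_Spanner_Flex" \
  --parameters "tableOverrides=[{Singers, Vocalists}, {Albums, Records}]" \
  --parameters "columnOverrides=[{Singers.SingerName, Singers.TalentName}]"
```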

v2/sourcedb-to-spanner/pom.xml

Lines changed: 40 additions & 16 deletions
```diff
@@ -27,6 +27,10 @@
   </parent>
 
   <artifactId>sourcedb-to-spanner</artifactId>
+  <properties>
+    <cassandra-java-driver-core.version>4.18.1</cassandra-java-driver-core.version>
+  </properties>
 
   <dependencies>
     <dependency>
@@ -74,18 +78,11 @@
       <artifactId>postgresql</artifactId>
       <version>${postgresql.version}</version>
     </dependency>
-
-    <!-- https://mvnrepository.com/artifact/com.datastax.oss/java-driver-core -->
+    <!-- https://mvnrepository.com/artifact/org.apache.cassandra/java-driver-core -->
     <dependency>
-      <groupId>com.datastax.oss</groupId>
+      <groupId>org.apache.cassandra</groupId>
       <artifactId>java-driver-core</artifactId>
-      <version>4.17.0</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-      </exclusions>
+      <version>${cassandra-java-driver-core.version}</version>
     </dependency>
 
     <!-- Needed for Beam CassandraIO -->
@@ -119,6 +116,12 @@
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-it-cassandra</artifactId>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.datastax.oss</groupId>
+          <artifactId>java-driver-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- https://mvnrepository.com/artifact/org.apache.derby/derby -->
@@ -166,12 +169,12 @@
       <version>1.0-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>com.github.nosan</groupId>
-      <artifactId>embedded-cassandra</artifactId>
-      <version>5.0.0</version>
-      <scope>test</scope>
-    </dependency>
+    <dependency>
+      <groupId>com.github.nosan</groupId>
+      <artifactId>embedded-cassandra</artifactId>
+      <version>5.0.0</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-cassandra</artifactId>
@@ -210,6 +213,17 @@
       <version>1.19.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.datastax.astra</groupId>
+      <artifactId>beam-sdks-java-io-astra</artifactId>
+      <version>4.18.1</version>
+    </dependency>
+    <!-- Downloading Secure Bundle -->
+    <dependency>
+      <groupId>com.datastax.astra</groupId>
+      <artifactId>astra-sdk-devops</artifactId>
+      <version>0.6.3</version>
+    </dependency>
 
     <!-- test dependencies for localCassandraIO end -->
 
@@ -226,4 +240,14 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.cassandra</groupId>
+        <artifactId>java-driver-core</artifactId>
+        <version>${cassandra-java-driver-core.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
 </project>
```

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java

Lines changed: 47 additions & 2 deletions
```diff
@@ -22,13 +22,15 @@
 /** Interface used by the SourcedbToSpanner pipeline to accept user input. */
 public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
   String CASSANDRA_SOURCE_DIALECT = "CASSANDRA";
+  String ASTRA_DB_SOURCE_DIALECT = "ASTRA_DB";
   String MYSQL_SOURCE_DIALECT = "MYSQL";
   String PG_SOURCE_DIALECT = "POSTGRESQL";
 
   @TemplateParameter.Enum(
       order = 1,
       optional = true,
       enumOptions = {
+        @TemplateParameter.TemplateEnumOption(ASTRA_DB_SOURCE_DIALECT),
         @TemplateParameter.TemplateEnumOption(CASSANDRA_SOURCE_DIALECT),
         @TemplateParameter.TemplateEnumOption(MYSQL_SOURCE_DIALECT),
         @TemplateParameter.TemplateEnumOption(PG_SOURCE_DIALECT)
@@ -66,14 +68,16 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
 
   @TemplateParameter.Text(
       order = 4,
-      regexes = {"(^jdbc:mysql://.*|^jdbc:postgresql://.*|^gs://.*)"},
+      optional = true,
+      regexes = {"(^jdbc:mysql://.*|^jdbc:postgresql://.*|^gs://.*|^$)"},
       groupName = "Source",
       description =
           "URL to connect to the source database host. It can be either of "
               + "1. The JDBC connection URL - which must contain the host, port and source db name and can optionally contain properties like autoReconnect, maxReconnects etc. Format: `jdbc:{mysql|postgresql}://{host}:{port}/{dbName}?{parameters}`"
               + "2. The shard config path",
       helpText =
-          "The JDBC connection URL string. For example, `jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8` or the shard config")
+          "The JDBC connection URL string. For example, `jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8` or the shard config. This parameter is required except for the ASTRA_DB source.")
+  @Default.String("")
   String getSourceConfigURL();
 
   void setSourceConfigURL(String url);
@@ -355,4 +359,45 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
   Long getUniformizationStageCountHint();
 
   void setUniformizationStageCountHint(Long value);
+
+  @TemplateParameter.Text(
+      order = 28,
+      optional = true,
+      description = "Astra DB token",
+      helpText =
+          "AstraDB token, ignored for non-AstraDB dialects. This token is used by the template to automatically download the secure bundle.")
+  @Default.String("")
+  String getAstraDBToken();
+
+  void setAstraDBToken(String value);
+
+  @TemplateParameter.Text(
+      order = 29,
+      optional = true,
+      description = "Astra DB databaseID",
+      helpText = "AstraDB databaseID, ignored for non-AstraDB dialects")
+  @Default.String("")
+  String getAstraDBDatabaseId();
+
+  void setAstraDBDatabaseId(String value);
+
+  @TemplateParameter.Text(
+      order = 30,
+      optional = true,
+      description = "Astra DB keySpace",
+      helpText = "AstraDB keySpace, ignored for non-AstraDB dialects")
+  @Default.String("")
+  String getAstraDBKeySpace();
+
+  void setAstraDBKeySpace(String value);
+
+  @TemplateParameter.Text(
+      order = 31,
+      optional = true,
+      description = "Astra DB Region",
+      helpText = "AstraDB region, ignored for non-AstraDB dialects")
+  @Default.String("")
+  String getAstraDBRegion();
+
+  void setAstraDBRegion(String value);
 }
```

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/auth/dbauth/GuardedStringValueProvider.java

Lines changed: 13 additions & 0 deletions
```diff
@@ -42,6 +42,19 @@ public static GuardedStringValueProvider create(String value) {
     return new GuardedStringValueProvider(new GuardedString(value.toCharArray()));
   }
 
+  @Override
+  public boolean equals(Object other) {
+    boolean result;
+    if ((other == null) || (getClass() != other.getClass())) {
+      result = false;
+    } else {
+      GuardedStringValueProvider otherGuardedString = (GuardedStringValueProvider) other;
+      result = this.get().equals(otherGuardedString.get());
+    }
+    return result;
+  }
+
   /**
    * Implementation {@link ValueProvider#get()}.
    *
```
v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/exception/AstraDBNotFoundException.java

Lines changed: 25 additions & 0 deletions

```java
/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.source.reader.io.cassandra.exception;

import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException;

public class AstraDBNotFoundException extends SchemaDiscoveryException {

  public AstraDBNotFoundException(String msg) {
    super(new Throwable(msg));
  }
}
```
