Skip to content

Commit bc90790

Browse files
Custom transformation for Cassandra to Spanner SourceDB (#2201)
* RR custom transformation feature (#102) * sync upstream/main (#98) * Custom transformation fixes * Added Custom Transformation * Added Custom Transformation * Added Fixes * Address null to all columns * Added Null Assert * Added Timeout fixes * Added Spotless fixes * reverse merge the main * Added Custom Fixes * Added Drop Keys --------- * Move null test case into previous tests Move null test case into previous tests * Issue fix * Null fix * Null fix for varint * Removed unwanted code * Added Bytes Fixes * Missing Bytes Error Code Pushed * PR comment fixes (#114) * Regex fixes to validate binary string vs base64 * Added Review Comments (#122) * Added Review Comments * Address TODO * Added POM File * Added Ignore Case in table name check * Reference Mismatch * Added full_name as we are skipping extra column and adding as null * Schema Fixes * Added Missing Alignment issue with CustomTransformation * Removed binaryString encoded Support * removed unwanted log * Keep Log Level As warn --------- Co-authored-by: pawankashyapollion <v-pawan.kumar@ollion.com>
1 parent e0d274c commit bc90790

File tree

11 files changed

+669
-310
lines changed

11 files changed

+669
-310
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@
369369
<exclude>**/CustomTransformationImplFetcher.*</exclude>
370370
<exclude>**/JarFileReader.*</exclude>
371371
<exclude>**/CustomTransformationWithShardFor*IT.*</exclude>
372+
<exclude>**/CustomTransformationWithCassandraForIT.*</exclude>
372373
<exclude>**/models/*</exclude>
373374
<exclude>**/exceptions/*</exclude>
374375
</excludes>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.custom;
17+
18+
import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
19+
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
20+
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
21+
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class CustomTransformationWithCassandraForIT implements ISpannerMigrationTransformer {
28+
29+
private static final Logger LOG = LoggerFactory.getLogger(CustomShardIdFetcher.class);
30+
31+
@Override
32+
public void init(String parameters) {
33+
LOG.info("init called with {}", parameters);
34+
}
35+
36+
@Override
37+
public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
38+
throws InvalidTransformationException {
39+
return new MigrationTransformationResponse(null, false);
40+
}
41+
42+
@Override
43+
public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
44+
throws InvalidTransformationException {
45+
if (request.getTableName().equalsIgnoreCase("customers")) {
46+
Map<String, Object> requestRow = request.getRequestRow();
47+
Map<String, Object> row = new HashMap<>();
48+
row.put("full_name", requestRow.get("first_name") + " " + requestRow.get("last_name"));
49+
MigrationTransformationResponse response = new MigrationTransformationResponse(row, false);
50+
return response;
51+
}
52+
return new MigrationTransformationResponse(null, false);
53+
}
54+
}

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
116116
sourceTable,
117117
dmlGeneratorRequest.getNewValuesJson(),
118118
dmlGeneratorRequest.getKeyValuesJson(),
119-
dmlGeneratorRequest.getSourceDbTimezoneOffset());
119+
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
120+
dmlGeneratorRequest.getCustomTransformationResponse());
120121
if (pkColumnNameValues == null) {
121122
LOG.warn(
122123
"Failed to generate primary key values for table {}. Skipping the record.",
@@ -166,7 +167,8 @@ private static DMLGeneratorResponse generateDMLResponse(
166167
sourceTable,
167168
dmlGeneratorRequest.getNewValuesJson(),
168169
dmlGeneratorRequest.getKeyValuesJson(),
169-
dmlGeneratorRequest.getSourceDbTimezoneOffset());
170+
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
171+
dmlGeneratorRequest.getCustomTransformationResponse());
170172
Map<String, PreparedStatementValueObject<?>> allColumnNamesAndValues =
171173
ImmutableMap.<String, PreparedStatementValueObject<?>>builder()
172174
.putAll(pkColumnNameValues)
@@ -287,6 +289,7 @@ private static DMLGeneratorResponse getDeleteStatementCQL(
287289
* @param newValuesJson the JSON object containing new values for columns.
288290
* @param keyValuesJson the JSON object containing key values for columns.
289291
* @param sourceDbTimezoneOffset the timezone offset of the source database.
292+
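* @param customTransformationResponse column values produced by the custom transformation, keyed by column name; may be null, in which case no column is overridden.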
290293
* @return a map of column names to their corresponding prepared statement value objects.
291294
* <p>This method: 1. Iterates over the non-primary key column definitions in the source table
292295
* schema. 2. Maps each column in the source table schema to its corresponding column in the
@@ -299,24 +302,37 @@ private static Map<String, PreparedStatementValueObject<?>> getColumnValues(
299302
SourceTable sourceTable,
300303
JSONObject newValuesJson,
301304
JSONObject keyValuesJson,
302-
String sourceDbTimezoneOffset) {
305+
String sourceDbTimezoneOffset,
306+
Map<String, Object> customTransformationResponse) {
303307
Map<String, PreparedStatementValueObject<?>> response = new HashMap<>();
304308
Set<String> sourcePKs = sourceTable.getPrimaryKeySet();
309+
Set<String> customTransformColumns = null;
310+
if (customTransformationResponse != null) {
311+
customTransformColumns = customTransformationResponse.keySet();
312+
}
305313
for (Map.Entry<String, SourceColumnDefinition> entry : sourceTable.getColDefs().entrySet()) {
306314
SourceColumnDefinition sourceColDef = entry.getValue();
307315

308316
String colName = sourceColDef.getName();
309317
if (sourcePKs.contains(colName)) {
310318
continue; // we only need non-primary keys
311319
}
312-
320+
PreparedStatementValueObject<?> columnValue;
321+
if (customTransformColumns != null
322+
&& customTransformColumns.contains(sourceColDef.getName())) {
323+
String cassandraType = sourceColDef.getType().getName().toLowerCase();
324+
columnValue =
325+
PreparedStatementValueObject.create(
326+
cassandraType, customTransformationResponse.get(colName));
327+
response.put(sourceColDef.getName(), columnValue);
328+
continue;
329+
}
313330
String colId = entry.getKey();
314331
SpannerColumnDefinition spannerColDef = spannerTable.getColDefs().get(colId);
315332
if (spannerColDef == null) {
316333
continue;
317334
}
318335
String spannerColumnName = spannerColDef.getName();
319-
PreparedStatementValueObject<?> columnValue;
320336
if (keyValuesJson.has(spannerColumnName)) {
321337
columnValue =
322338
getMappedColumnValue(
@@ -344,6 +360,7 @@ private static Map<String, PreparedStatementValueObject<?>> getColumnValues(
344360
* @param newValuesJson the JSON object containing new values for columns.
345361
* @param keyValuesJson the JSON object containing key values for columns.
346362
* @param sourceDbTimezoneOffset the timezone offset of the source database.
363+
* @param customTransformationResponse the values returned by the user-defined transformation, keyed by column name; may be null.
347364
* @return a map of primary key column names to their corresponding prepared statement value
348365
* objects, or null if a required column is missing.
349366
* <p>This method: 1. Iterates over the primary key definitions in the source table schema. 2.
@@ -357,10 +374,14 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
357374
SourceTable sourceTable,
358375
JSONObject newValuesJson,
359376
JSONObject keyValuesJson,
360-
String sourceDbTimezoneOffset) {
377+
String sourceDbTimezoneOffset,
378+
Map<String, Object> customTransformationResponse) {
361379
Map<String, PreparedStatementValueObject<?>> response = new HashMap<>();
362380
ColumnPK[] sourcePKs = sourceTable.getPrimaryKeys();
363-
381+
Set<String> customTransformColumns = null;
382+
if (customTransformationResponse != null) {
383+
customTransformColumns = customTransformationResponse.keySet();
384+
}
364385
for (ColumnPK currentSourcePK : sourcePKs) {
365386
String colId = currentSourcePK.getColId();
366387
SourceColumnDefinition sourceColDef = sourceTable.getColDefs().get(colId);
@@ -373,7 +394,14 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
373394
}
374395
String spannerColumnName = spannerColDef.getName();
375396
PreparedStatementValueObject<?> columnValue;
376-
if (keyValuesJson.has(spannerColumnName)) {
397+
if (customTransformColumns != null
398+
&& customTransformColumns.contains(sourceColDef.getName())) {
399+
String cassandraType = sourceColDef.getType().getName().toLowerCase();
400+
String columnName = spannerColDef.getName();
401+
columnValue =
402+
PreparedStatementValueObject.create(
403+
cassandraType, customTransformationResponse.get(columnName));
404+
} else if (keyValuesJson.has(spannerColumnName)) {
377405
columnValue =
378406
getMappedColumnValue(
379407
spannerColDef, sourceColDef, keyValuesJson, sourceDbTimezoneOffset);

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -172,30 +172,9 @@ private static ByteBuffer parseBlobType(Object colValue) {
172172
return ByteBuffer.wrap((byte[]) colValue);
173173
} else if (colValue instanceof ByteBuffer) {
174174
return (ByteBuffer) colValue;
175+
} else {
176+
return ByteBuffer.wrap(java.util.Base64.getDecoder().decode((String) colValue));
175177
}
176-
return ByteBuffer.wrap(java.util.Base64.getDecoder().decode((String) colValue));
177-
}
178-
179-
/**
180-
* Converts a hexadecimal string into a byte array.
181-
*
182-
* @param binaryEncodedStr the hexadecimal string to be converted. It must have an even number of
183-
* characters, as each pair of characters represents one byte.
184-
* @return a byte array representing the binary data equivalent of the hexadecimal string.
185-
*/
186-
private static byte[] convertBinaryEncodedStringToByteArray(String binaryEncodedStr) {
187-
int length = binaryEncodedStr.length();
188-
int byteCount = (length + 7) / 8;
189-
byte[] byteArray = new byte[byteCount];
190-
191-
for (int i = 0; i < byteCount; i++) {
192-
int startIndex = i * 8;
193-
int endIndex = Math.min(startIndex + 8, length);
194-
String byteString = binaryEncodedStr.substring(startIndex, endIndex);
195-
byteArray[i] = (byte) Integer.parseInt(byteString, 2);
196-
}
197-
198-
return byteArray;
199178
}
200179

201180
/**
@@ -322,20 +301,17 @@ private static Object handleSpannerColumnType(
322301
String spannerType, String columnName, JSONObject valuesJson) {
323302
try {
324303
if (spannerType.contains("string")) {
325-
return valuesJson.optString(columnName);
304+
String value = valuesJson.optString(columnName);
305+
return value.isEmpty() ? null : value;
326306
} else if (spannerType.contains("bytes")) {
327307
if (valuesJson.isNull(columnName)) {
328308
return null;
329309
}
330310
String hexEncodedString = valuesJson.optString(columnName);
331-
return safeHandle(
332-
() -> {
333-
try {
334-
return safeHandle(() -> convertBinaryEncodedStringToByteArray(hexEncodedString));
335-
} catch (IllegalArgumentException e) {
336-
return parseBlobType(hexEncodedString);
337-
}
338-
});
311+
if (hexEncodedString.isEmpty()) {
312+
return null;
313+
}
314+
return safeHandle(() -> parseBlobType(hexEncodedString));
339315
} else {
340316
return valuesJson.isNull(columnName) ? null : valuesJson.opt(columnName);
341317
}

0 commit comments

Comments
 (0)