Skip to content

Commit daf32d7

Browse files
committed
Correcting shadow table read
1 parent 46bfa06 commit daf32d7

File tree

5 files changed

+142
-2
lines changed

5 files changed

+142
-2
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ public static PipelineResult run(Options options) {
616616

617617
shadowTableCreator.createShadowTablesInSpanner();
618618
Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig);
619+
Ddl shadowTableDdl = SpannerSchema.getInformationSchemaAsDdl(spannerMetadataConfig);
619620
List<Shard> shards;
620621
String shardingMode;
621622
if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
@@ -764,6 +765,7 @@ public static PipelineResult run(Options options) {
764765
spannerMetadataConfig,
765766
options.getSourceDbTimezoneOffset(),
766767
ddl,
768+
shadowTableDdl,
767769
sourceSchema,
768770
options.getShadowTablePrefix(),
769771
options.getSkipDirectoryName(),

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public class SourceWriterFn extends DoFn<KV<Long, TrimmedShardedDataChangeRecord
101101
private final SpannerConfig spannerConfig;
102102
private transient SpannerDao spannerDao;
103103
private final Ddl ddl;
104+
private final Ddl shadowTableDdl;
104105
private final SourceSchema sourceSchema;
105106
private final String shadowTablePrefix;
106107
private final String skipDirName;
@@ -116,6 +117,7 @@ public SourceWriterFn(
116117
SpannerConfig spannerConfig,
117118
String sourceDbTimezoneOffset,
118119
Ddl ddl,
120+
Ddl shadowTableDdl,
119121
SourceSchema sourceSchema,
120122
String shadowTablePrefix,
121123
String skipDirName,
@@ -128,6 +130,7 @@ public SourceWriterFn(
128130
this.shards = shards;
129131
this.spannerConfig = spannerConfig;
130132
this.ddl = ddl;
133+
this.shadowTableDdl = shadowTableDdl;
131134
this.sourceSchema = sourceSchema;
132135
this.shadowTablePrefix = shadowTablePrefix;
133136
this.skipDirName = skipDirName;
@@ -211,7 +214,7 @@ public void processElement(ProcessContext c) {
211214
boolean isSourceAhead = false;
212215
ShadowTableRecord shadowTableRecord =
213216
spannerDao.readShadowTableRecordWithExclusiveLock(
214-
shadowTableName, primaryKey, ddl, shadowTransaction);
217+
shadowTableName, primaryKey, shadowTableDdl, shadowTransaction);
215218
isSourceAhead =
216219
shadowTableRecord != null
217220
&& ((shadowTableRecord
@@ -235,7 +238,10 @@ public void processElement(ProcessContext c) {
235238
() -> {
236239
ShadowTableRecord newShadowTableRecord =
237240
spannerDao.readShadowTableRecordWithExclusiveLock(
238-
shadowTableName, primaryKey, ddl, shadowTransaction);
241+
shadowTableName,
242+
primaryKey,
243+
shadowTableDdl,
244+
shadowTransaction);
239245
if (!ShadowTableRecord.isEquals(
240246
shadowTableRecord, newShadowTableRecord)) {
241247
throw new TransactionalCheckException(

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterTransform.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class SourceWriterTransform
5050
private final List<Shard> shards;
5151
private final SpannerConfig spannerConfig;
5252
private final Ddl ddl;
53+
private final Ddl shadowTableDdl;
5354
private final SourceSchema sourceSchema;
5455
private final String shadowTablePrefix;
5556
private final String skipDirName;
@@ -63,6 +64,7 @@ public SourceWriterTransform(
6364
SpannerConfig spannerConfig,
6465
String sourceDbTimezoneOffset,
6566
Ddl ddl,
67+
Ddl shadowTableDdl,
6668
SourceSchema sourceSchema,
6769
String shadowTablePrefix,
6870
String skipDirName,
@@ -75,6 +77,7 @@ public SourceWriterTransform(
7577
this.shards = shards;
7678
this.spannerConfig = spannerConfig;
7779
this.ddl = ddl;
80+
this.shadowTableDdl = shadowTableDdl;
7881
this.sourceSchema = sourceSchema;
7982
this.shadowTablePrefix = shadowTablePrefix;
8083
this.skipDirName = skipDirName;
@@ -96,6 +99,7 @@ public SourceWriterTransform.Result expand(
9699
this.spannerConfig,
97100
this.sourceDbTimezoneOffset,
98101
this.ddl,
102+
this.shadowTableDdl,
99103
this.sourceSchema,
100104
this.shadowTablePrefix,
101105
this.skipDirName,

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFnTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public class SourceWriterFnTest {
9696
private Shard testShard;
9797
private Schema testSchema;
9898
private Ddl testDdl;
99+
private Ddl shadowTableDdl;
99100
private SourceSchema testSourceSchema;
100101

101102
private ISchemaMapper schemaMapper;
@@ -177,6 +178,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
177178
testSourceDbTimezoneOffset = "+00:00";
178179
testDdl =
179180
SchemaUtils.buildSpannerDdlFromSessionFile("src/test/resources/sourceWriterUTSession.json");
181+
shadowTableDdl =
182+
SchemaUtils.buildSpannerShadowTableDdlFromSessionFile(
183+
"src/test/resources/sourceWriterUTSession.json");
180184
testSourceSchema =
181185
SchemaUtils.buildSourceSchemaFromSessionFile(
182186
"src/test/resources/sourceWriterUTSession.json");
@@ -199,6 +203,7 @@ public void testSourceIsAhead() throws Exception {
199203
mockSpannerConfig,
200204
testSourceDbTimezoneOffset,
201205
testDdl,
206+
shadowTableDdl,
202207
testSourceSchema,
203208
"shadow_",
204209
"skip",
@@ -229,6 +234,7 @@ public void testSourceIsAheadWithSameCommitTimestamp() throws Exception {
229234
mockSpannerConfig,
230235
testSourceDbTimezoneOffset,
231236
testDdl,
237+
shadowTableDdl,
232238
testSourceSchema,
233239
"shadow_",
234240
"skip",
@@ -258,6 +264,7 @@ public void testSourceIsBehind() throws Exception {
258264
mockSpannerConfig,
259265
testSourceDbTimezoneOffset,
260266
testDdl,
267+
shadowTableDdl,
261268
testSourceSchema,
262269
"shadow_",
263270
"skip",
@@ -292,6 +299,7 @@ public void testCustomTransformationException() throws Exception {
292299
mockSpannerConfig,
293300
testSourceDbTimezoneOffset,
294301
testDdl,
302+
shadowTableDdl,
295303
testSourceSchema,
296304
"shadow_",
297305
"skip",
@@ -333,6 +341,7 @@ public void testCustomTransformationApplied() throws Exception {
333341
mockSpannerConfig,
334342
testSourceDbTimezoneOffset,
335343
testDdl,
344+
shadowTableDdl,
336345
testSourceSchema,
337346
"shadow_",
338347
"skip",
@@ -370,6 +379,7 @@ public void testCustomTransformationFiltered() throws Exception {
370379
mockSpannerConfig,
371380
testSourceDbTimezoneOffset,
372381
testDdl,
382+
shadowTableDdl,
373383
testSourceSchema,
374384
"shadow_",
375385
"skip",
@@ -405,6 +415,7 @@ public void testNoShard() throws Exception {
405415
mockSpannerConfig,
406416
testSourceDbTimezoneOffset,
407417
testDdl,
418+
shadowTableDdl,
408419
testSourceSchema,
409420
"shadow_",
410421
"skip",
@@ -437,6 +448,7 @@ public void testSkipShard() throws Exception {
437448
mockSpannerConfig,
438449
testSourceDbTimezoneOffset,
439450
testDdl,
451+
shadowTableDdl,
440452
testSourceSchema,
441453
"shadow_",
442454
"skip",
@@ -467,6 +479,7 @@ public void testPermanentError() throws Exception {
467479
mockSpannerConfig,
468480
testSourceDbTimezoneOffset,
469481
testDdl,
482+
shadowTableDdl,
470483
testSourceSchema,
471484
"shadow_",
472485
"skip",
@@ -501,6 +514,7 @@ public void testRetryableError() throws Exception {
501514
mockSpannerConfig,
502515
testSourceDbTimezoneOffset,
503516
testDdl,
517+
shadowTableDdl,
504518
testSourceSchema,
505519
"shadow_",
506520
"skip",
@@ -531,6 +545,7 @@ public void testRetryableErrorForForeignKey() throws Exception {
531545
mockSpannerConfig,
532546
testSourceDbTimezoneOffset,
533547
testDdl,
548+
shadowTableDdl,
534549
testSourceSchema,
535550
"shadow_",
536551
"skip",
@@ -563,6 +578,7 @@ public void testRetryableErrorConnectionFailure() throws Exception {
563578
mockSpannerConfig,
564579
testSourceDbTimezoneOffset,
565580
testDdl,
581+
shadowTableDdl,
566582
testSourceSchema,
567583
"shadow_",
568584
"skip",
@@ -595,6 +611,7 @@ public void testPermanentConnectionFailure() throws Exception {
595611
mockSpannerConfig,
596612
testSourceDbTimezoneOffset,
597613
testDdl,
614+
shadowTableDdl,
598615
testSourceSchema,
599616
"shadow_",
600617
"skip",
@@ -627,6 +644,7 @@ public void testPermanentGenericException() throws Exception {
627644
mockSpannerConfig,
628645
testSourceDbTimezoneOffset,
629646
testDdl,
647+
shadowTableDdl,
630648
testSourceSchema,
631649
"shadow_",
632650
"skip",
@@ -658,6 +676,7 @@ public void testDMLEmpty() throws Exception {
658676
mockSpannerConfig,
659677
testSourceDbTimezoneOffset,
660678
testDdlForNullDML(),
679+
testDdlForNullDML(),
661680
testSourceSchema,
662681
"shadow_",
663682
"skip",
@@ -681,6 +700,7 @@ public void testTeardown() throws Exception {
681700
mockSpannerConfig,
682701
testSourceDbTimezoneOffset,
683702
testDdl,
703+
shadowTableDdl,
684704
testSourceSchema,
685705
"shadow_",
686706
"skip",
@@ -703,6 +723,7 @@ public void testTeardownWithNulls() throws Exception {
703723
mockSpannerConfig,
704724
testSourceDbTimezoneOffset,
705725
testDdl,
726+
shadowTableDdl,
706727
testSourceSchema,
707728
"shadow_",
708729
"skip",

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.teleport.v2.templates.utils;
1717

1818
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import com.google.cloud.teleport.v2.spanner.ddl.Column;
1920
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
2021
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2122
import com.google.cloud.teleport.v2.spanner.ddl.Table;
@@ -24,9 +25,11 @@
2425
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
2526
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
2627
import com.google.cloud.teleport.v2.spanner.type.Type;
28+
import com.google.cloud.teleport.v2.templates.constants.Constants;
2729
import com.google.common.collect.ImmutableList;
2830
import com.google.common.collect.ImmutableMap;
2931
import java.io.File;
32+
import java.util.ArrayList;
3033
import java.util.List;
3134
import java.util.Map;
3235

@@ -223,4 +226,108 @@ public static SourceSchema buildSourceSchemaFromSessionFile(String sessionFile)
223226
throw new RuntimeException("Error reading or parsing session file: " + sessionFile, e);
224227
}
225228
}
229+
230+
public static Ddl buildSpannerShadowTableDdlFromSessionFile(String sessionFile) {
231+
try {
232+
ObjectMapper mapper = new ObjectMapper();
233+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
234+
Map<String, Object> spSchema = (Map<String, Object>) session.get(KEY_SP_SCHEMA);
235+
Ddl.Builder ddlBuilder = Ddl.builder();
236+
237+
for (Map.Entry<String, Object> entry : spSchema.entrySet()) {
238+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
239+
String tableName = (String) tableMap.get(KEY_NAME);
240+
String shadowTableName = "shadow_" + tableName;
241+
Table.Builder tableBuilder = ddlBuilder.createTable(shadowTableName);
242+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get(KEY_COL_DEFS);
243+
244+
List<String> pkColNames = new ArrayList<>();
245+
List<Map<String, Object>> pks = (List<Map<String, Object>>) tableMap.get(KEY_PRIMARY_KEYS);
246+
if (pks != null && !pks.isEmpty()) {
247+
for (Map<String, Object> pk : pks) {
248+
String colId = (String) pk.get(KEY_COL_ID);
249+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
250+
String colName = (String) colMap.get(KEY_NAME);
251+
pkColNames.add(colName);
252+
}
253+
}
254+
255+
for (String colId : colDefs.keySet()) {
256+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
257+
String colName = (String) colMap.get(KEY_NAME);
258+
if (!pkColNames.contains(colName)) {
259+
continue;
260+
}
261+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get(KEY_TYPE_SPANNER);
262+
String typeName = (String) typeMap.get(KEY_NAME);
263+
Boolean isArray = (Boolean) typeMap.get(KEY_IS_ARRAY);
264+
265+
switch (typeName) {
266+
case SPANNER_TYPE_STRING:
267+
if (Boolean.TRUE.equals(isArray)) {
268+
tableBuilder.column(colName).array(Type.string()).endColumn();
269+
} else {
270+
tableBuilder.column(colName).string().max().endColumn();
271+
}
272+
break;
273+
case SPANNER_TYPE_INT64:
274+
tableBuilder.column(colName).int64().endColumn();
275+
break;
276+
case SPANNER_TYPE_FLOAT32:
277+
tableBuilder.column(colName).float32().endColumn();
278+
break;
279+
case SPANNER_TYPE_FLOAT64:
280+
tableBuilder.column(colName).float64().endColumn();
281+
break;
282+
case SPANNER_TYPE_BOOL:
283+
tableBuilder.column(colName).bool().endColumn();
284+
break;
285+
case SPANNER_TYPE_BYTES:
286+
tableBuilder.column(colName).bytes().max().endColumn();
287+
break;
288+
case SPANNER_TYPE_TIMESTAMP:
289+
tableBuilder.column(colName).timestamp().endColumn();
290+
break;
291+
case SPANNER_TYPE_DATE:
292+
tableBuilder.column(colName).date().endColumn();
293+
break;
294+
case SPANNER_TYPE_NUMERIC:
295+
tableBuilder.column(colName).numeric().endColumn();
296+
break;
297+
case SPANNER_TYPE_JSON:
298+
tableBuilder.column(colName).json().endColumn();
299+
break;
300+
default:
301+
throw new IllegalArgumentException(
302+
"Unsupported Spanner type in session file: " + typeName);
303+
}
304+
}
305+
306+
Column.Builder processedCommitTimestampColumnBuilder =
307+
tableBuilder.column(Constants.PROCESSED_COMMIT_TS_COLUMN_NAME);
308+
tableBuilder.addColumn(
309+
processedCommitTimestampColumnBuilder
310+
.type(Type.timestamp())
311+
.notNull(false)
312+
.autoBuild());
313+
314+
Column.Builder recordSequenceColumnBuilder =
315+
tableBuilder.column(Constants.RECORD_SEQ_COLUMN_NAME);
316+
tableBuilder.addColumn(
317+
recordSequenceColumnBuilder.type(Type.int64()).notNull(false).autoBuild());
318+
319+
if (!pkColNames.isEmpty()) {
320+
IndexColumn.IndexColumnsBuilder<Table.Builder> pkBuilder = tableBuilder.primaryKey();
321+
for (String pkColName : pkColNames) {
322+
pkBuilder.asc(pkColName);
323+
}
324+
pkBuilder.end();
325+
}
326+
tableBuilder.endTable();
327+
}
328+
return ddlBuilder.build();
329+
} catch (Exception e) {
330+
throw new RuntimeException("Error reading or parsing session file: " + sessionFile, e);
331+
}
332+
}
226333
}

0 commit comments

Comments
 (0)