Skip to content

Commit cb18ecc

Browse files
Add metrics/DLQ record for records getting silently dropped with a warning in reverse replication (#3200)
## Summary This PR addresses critical issues in the SpannerToSourceDb pipeline where certain error conditions were being silently dropped or causing inefficient worker retries. A thorough investigation of LOG warnings and exceptions revealed that invalid DML generation and specific JSON parsing failures were not being handled robustly. ## Changes ### DML Generation Integrity - **Problem:** Previously, when the DMLGenerator encountered invalid requests (e.g., missing primary keys or schema mismatches), it returned an empty string ("") and logged a warning. This caused SourceWriterFn to mistakenly treat the record as "processed" or simply skip it, leading to silent data loss without alerting the user or routing to the Dead Letter Queue (DLQ). - **Fix:** Updated MySQLDMLGenerator and CassandraDMLGenerator to throw a new InvalidDMLGenerationException instead of returning an empty string. - **Classification:** This exception is classified as a Severe Error because its caused due to schema issues that will not resolve upon retry. - **Propagation:** The exception is now propagated from DMLGenerator -> InputRecordProcessor -> SourceWriterFn It is then caught in SourceWriterFn, correctly unwrapped and identified by SpannerToSourceDbExceptionClassifier, and finally routed to the DLQ with a PERMANENT_ERROR_TAG to ensure visibility and prevent data loss.
1 parent 8ac9904 commit cb18ecc

File tree

9 files changed

+316
-236
lines changed

9 files changed

+316
-236
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
2323
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
2424
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
25+
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
2526
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
2627
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
2728
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
@@ -76,50 +77,47 @@ public class CassandraDMLGenerator implements IDMLGenerator {
7677
@Override
7778
public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
7879
if (dmlGeneratorRequest == null) {
79-
LOG.warn("DMLGeneratorRequest is null. Cannot process the request.");
80-
return new DMLGeneratorResponse("");
80+
throw new InvalidDMLGenerationException(
81+
"DMLGeneratorRequest is null. Cannot process the request.");
8182
}
8283
ISchemaMapper schemaMapper = dmlGeneratorRequest.getSchemaMapper();
8384
String spannerTableName = dmlGeneratorRequest.getSpannerTableName();
8485
Ddl spannerDdl = dmlGeneratorRequest.getSpannerDdl();
8586
SourceSchema sourceSchema = dmlGeneratorRequest.getSourceSchema();
86-
if (schemaMapper == null || spannerDdl == null || sourceSchema == null) {
87-
LOG.warn(
88-
"Schema Mapper, Ddl and SourceSchema must be not null, respectively found {},{},{}.",
89-
schemaMapper,
90-
spannerDdl,
91-
sourceSchema);
92-
return new DMLGeneratorResponse("");
87+
if (schemaMapper == null) {
88+
throw new InvalidDMLGenerationException("Schema Mapper must be not null");
89+
}
90+
if (spannerDdl == null) {
91+
throw new InvalidDMLGenerationException("Spanner Ddl must be not null.");
92+
}
93+
if (sourceSchema == null) {
94+
throw new InvalidDMLGenerationException("SourceSchema must be not null.");
9395
}
9496
String sourceTableName = "";
9597
try {
9698
sourceTableName = schemaMapper.getSourceTableName("", spannerTableName);
9799
} catch (Exception e) {
98-
LOG.warn(
99-
"Equivalent table for {} was not found in source, check schema mapping provided",
100-
spannerTableName);
101-
return new DMLGeneratorResponse("");
100+
throw new InvalidDMLGenerationException(
101+
"Could not find source table name for spanner table: " + spannerTableName, e);
102102
}
103103

104104
Table spannerTable = spannerDdl.table(spannerTableName);
105105
if (spannerTable == null) {
106-
LOG.warn("Spanner table {} not found. Dropping the record.", spannerTableName);
107-
return new DMLGeneratorResponse("");
106+
throw new InvalidDMLGenerationException(
107+
String.format(
108+
"The spanner table %s was not found in ddl found on spanner.", spannerTableName));
108109
}
109110
SourceTable sourceTable = sourceSchema.table(sourceTableName);
110111
if (sourceTable == null) {
111-
LOG.warn(
112-
"Source table {} not found for Spanner table Name: {}",
113-
sourceTableName,
114-
spannerTableName);
115-
return new DMLGeneratorResponse("");
112+
throw new InvalidDMLGenerationException(
113+
String.format("The source table %s was not found in source schema.", sourceTableName));
116114
}
117115

118116
if (sourceTable.primaryKeyColumns() == null || sourceTable.primaryKeyColumns().size() == 0) {
119-
LOG.warn(
120-
"Cannot reverse replicate table {} without primary key. Skipping the record.",
121-
sourceTableName);
122-
return new DMLGeneratorResponse("");
117+
throw new InvalidDMLGenerationException(
118+
String.format(
119+
"Cannot reverse replicate for source table %s without primary key, skipping the record",
120+
sourceTableName));
123121
}
124122

125123
Map<String, PreparedStatementValueObject<?>> pkColumnNameValues =
@@ -132,10 +130,10 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
132130
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
133131
dmlGeneratorRequest.getCustomTransformationResponse());
134132
if (pkColumnNameValues == null) {
135-
LOG.warn(
136-
"Failed to generate primary key values for table {}. Skipping the record.",
137-
sourceTableName);
138-
return new DMLGeneratorResponse("");
133+
throw new InvalidDMLGenerationException(
134+
String.format(
135+
"Cannot reverse replicate for table %s without primary key, skipping the record",
136+
sourceTableName));
139137
}
140138
java.sql.Timestamp timestamp = dmlGeneratorRequest.getCommitTimestamp().toSqlTimestamp();
141139
String modType = dmlGeneratorRequest.getModType();
@@ -200,8 +198,8 @@ private static DMLGeneratorResponse generateDMLResponse(
200198
} else if ("DELETE".equals(modType)) {
201199
return getDeleteStatementCQL(sourceTable.name(), timestamp, pkColumnNameValues);
202200
} else {
203-
LOG.error("Unsupported modType: {} for table {}", modType, spannerTable.name());
204-
return new DMLGeneratorResponse("");
201+
throw new InvalidDMLGenerationException(
202+
String.format("Unsupported modType: %s for table %s", modType, spannerTable.name()));
205203
}
206204
}
207205

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/MySQLDMLGenerator.java

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
2323
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
2424
import com.google.cloud.teleport.v2.spanner.type.Type;
25+
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
2526
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
2627
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
2728
import com.google.common.annotations.VisibleForTesting;
@@ -43,52 +44,52 @@ public class MySQLDMLGenerator implements IDMLGenerator {
4344

4445
public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
4546
if (dmlGeneratorRequest == null) {
46-
LOG.warn("DMLGeneratorRequest is null. Cannot process the request.");
47-
return new DMLGeneratorResponse("");
47+
throw new InvalidDMLGenerationException(
48+
"DMLGeneratorRequest is null. Cannot process the request.");
4849
}
4950
String spannerTableName = dmlGeneratorRequest.getSpannerTableName();
5051
ISchemaMapper schemaMapper = dmlGeneratorRequest.getSchemaMapper();
5152
Ddl spannerDdl = dmlGeneratorRequest.getSpannerDdl();
5253
SourceSchema sourceSchema = dmlGeneratorRequest.getSourceSchema();
53-
if (schemaMapper == null || spannerDdl == null || sourceSchema == null) {
54-
LOG.warn(
55-
"Schema Mapper, Ddl and SourceSchema must be not null, respectively found {},{},{}.",
56-
schemaMapper,
57-
spannerDdl,
58-
sourceSchema);
59-
return new DMLGeneratorResponse("");
54+
55+
if (schemaMapper == null) {
56+
throw new InvalidDMLGenerationException("Schema Mapper must be not null");
57+
}
58+
if (spannerDdl == null) {
59+
throw new InvalidDMLGenerationException("Spanner Ddl must be not null.");
6060
}
61+
if (sourceSchema == null) {
62+
throw new InvalidDMLGenerationException("SourceSchema must be not null.");
63+
}
64+
6165
Table spannerTable = spannerDdl.table(spannerTableName);
6266
if (spannerTable == null) {
63-
LOG.warn(
64-
"The spanner table {} was not found in ddl found on spanner. Ddl: {}",
65-
spannerTableName,
66-
spannerDdl);
67-
return new DMLGeneratorResponse("");
67+
throw new InvalidDMLGenerationException(
68+
String.format(
69+
"The spanner table %s was not found in ddl found on spanner", spannerTableName));
6870
}
6971

7072
String sourceTableName = "";
7173
try {
7274
sourceTableName = schemaMapper.getSourceTableName("", spannerTableName);
7375
} catch (NoSuchElementException e) {
74-
return new DMLGeneratorResponse("");
76+
throw new InvalidDMLGenerationException(
77+
"Could not find source table name for spanner table: " + spannerTableName, e);
7578
}
7679
com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable sourceTable =
7780
sourceSchema.table(sourceTableName);
7881
if (sourceTable == null) {
79-
LOG.warn(
80-
"Equivalent table {} was not found in source for spanner table {}",
81-
sourceTableName,
82-
spannerTableName);
83-
return new DMLGeneratorResponse("");
82+
throw new InvalidDMLGenerationException(
83+
String.format(
84+
"Equivalent table %s was not found in source for spanner table %s",
85+
sourceTableName, spannerTableName));
8486
}
8587

8688
if (sourceTable.primaryKeyColumns() == null || sourceTable.primaryKeyColumns().size() == 0) {
87-
LOG.warn(
88-
"Cannot reverse replicate for source table {} without primary key, skipping the record. Source Table: {}",
89-
sourceTableName,
90-
sourceTable);
91-
return new DMLGeneratorResponse("");
89+
throw new InvalidDMLGenerationException(
90+
String.format(
91+
"Cannot reverse replicate for source table %s without primary key, skipping the record.",
92+
sourceTableName));
9293
}
9394

9495
Map<String, String> pkcolumnNameValues =
@@ -101,10 +102,10 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
101102
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
102103
dmlGeneratorRequest.getCustomTransformationResponse());
103104
if (pkcolumnNameValues == null) {
104-
LOG.warn(
105-
"Cannot reverse replicate for table {} without primary key, skipping the record",
106-
sourceTableName);
107-
return new DMLGeneratorResponse("");
105+
throw new InvalidDMLGenerationException(
106+
String.format(
107+
"Cannot reverse replicate for table %s without primary key, skipping the record",
108+
sourceTableName));
108109
}
109110

110111
if ("INSERT".equals(dmlGeneratorRequest.getModType())
@@ -115,8 +116,10 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
115116
} else if ("DELETE".equals(dmlGeneratorRequest.getModType())) {
116117
return getDeleteStatement(sourceTable.name(), pkcolumnNameValues);
117118
} else {
118-
LOG.warn("Unsupported modType: " + dmlGeneratorRequest.getModType());
119-
return new DMLGeneratorResponse("");
119+
throw new InvalidDMLGenerationException(
120+
String.format(
121+
"Unsupported modType: %s for table %s",
122+
dmlGeneratorRequest.getModType(), spannerTableName));
120123
}
121124
}
122125

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/InputRecordProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
3030
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.TransactionalCheck;
3131
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
32+
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
3233
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
3334
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
3435
import java.time.Instant;
@@ -110,8 +111,7 @@ public static boolean processRecord(
110111

111112
DMLGeneratorResponse dmlGeneratorResponse = dmlGenerator.getDMLStatement(dmlGeneratorRequest);
112113
if (dmlGeneratorResponse.getDmlStatement().isEmpty()) {
113-
LOG.warn("DML statement is empty for table: " + tableName);
114-
return false;
114+
throw new InvalidDMLGenerationException("DML statement is empty for table: " + tableName);
115115
}
116116
// TODO we need to handle it as proper Interface Level as of now we have handle Prepared
117117
// TODO Statement and Raw Statement Differently
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.exceptions;
17+
18+
/**
19+
* Exception thrown when the DML Generator fails to create a valid DML statement. This effectively
20+
* prevents silent data loss by routing these records to DLQ.
21+
*/
22+
public class InvalidDMLGenerationException extends RuntimeException {
23+
public InvalidDMLGenerationException(String message) {
24+
super(message);
25+
}
26+
27+
public InvalidDMLGenerationException(String message, Throwable cause) {
28+
super(message, cause);
29+
}
30+
}

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/utils/SpannerToSourceDbExceptionClassifier.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
2222
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
2323
import com.google.cloud.teleport.v2.templates.constants.Constants;
24+
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
2425
import java.sql.SQLDataException;
2526
import java.sql.SQLNonTransientConnectionException;
2627
import java.sql.SQLSyntaxErrorException;
@@ -47,6 +48,7 @@ public static TupleTag<String> classify(Exception exception) {
4748
return classifySpannerException(e);
4849
} else if (exception instanceof ChangeEventConvertorException
4950
|| exception instanceof IllegalArgumentException
51+
|| exception instanceof InvalidDMLGenerationException
5052
|| exception instanceof NullPointerException) {
5153
return Constants.PERMANENT_ERROR_TAG;
5254
}
@@ -61,7 +63,8 @@ private static TupleTag<String> classifySpannerException(SpannerException except
6163
Throwable cause = exception.getCause();
6264

6365
if (cause instanceof InvalidTransformationException
64-
|| cause instanceof ChangeEventConvertorException) {
66+
|| cause instanceof ChangeEventConvertorException
67+
|| cause instanceof InvalidDMLGenerationException) {
6568
return Constants.PERMANENT_ERROR_TAG;
6669
} else if (cause instanceof CodecNotFoundException
6770
|| cause instanceof SQLSyntaxErrorException

0 commit comments

Comments
 (0)