Commit 649a70e

[SourceDbToSpanner] Correcting Binary datatype handling in SourceDbToSpanner template on Spanner write errors (#3159)
* [SourceDbToSpanner] Correcting Binary datatype handling in SourceDbToSpanner template on Spanner write errors
* Correcting job name
* Checking MySQLAllDataTypesBulkAndLiveIT
* Correcting DeadLetterQueue FileWriterTransform creation
* Correcting BIT column handling for CustomTransformation applied rows
* Correcting MySQLCustomTransformationsNonShardedIT
1 parent 5de840a commit 649a70e
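
For context on the main change: before this commit the dead-letter queue (DLQ) path stringified every Spanner Value, which does not faithfully represent BYTES columns (MySQL BINARY/BIT data), so rows that failed a Spanner write could not be retried correctly. The commit instead hex-encodes binary values (and lists of binary values) in the DLQ record. Below is a minimal illustrative sketch of the idea, assuming only the Spanner client and Apache Commons Codec libraries that the diff below imports; the class name is hypothetical and this is not the template code itself.

import com.google.cloud.ByteArray;
import com.google.cloud.spanner.Value;
import org.apache.commons.codec.binary.Hex;

// Illustrative only: contrasts the old lossy stringification of a BYTES value with the
// hex encoding this commit introduces for DLQ records.
public class BinaryDlqEncodingSketch {
  public static void main(String[] args) {
    Value bitCol = Value.bytes(ByteArray.copyFrom(new byte[] {(byte) 0xAB, 0x00, 0x7F}));

    // Old behaviour (before this commit): the whole Value was stringified, which for BYTES
    // yields a display-oriented string that cannot be replayed as the original payload.
    String lossy = String.valueOf(bitCol);

    // New behaviour: encode the raw bytes as hex so the DLQ JSON round-trips losslessly.
    String hex = Hex.encodeHexString(bitCol.getBytes().toByteArray());

    System.out.println(lossy); // display form of the Value
    System.out.println(hex);   // "ab007f"
  }
}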

11 files changed: +546 −170 lines

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/writer/DeadLetterQueue.java

Lines changed: 41 additions & 13 deletions
@@ -16,6 +16,7 @@
 package com.google.cloud.teleport.v2.writer;
 
 import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Type;
 import com.google.cloud.spanner.Value;
 import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
 import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
@@ -33,6 +34,7 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -46,6 +48,7 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.codec.binary.Hex;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
@@ -62,8 +65,6 @@ public class DeadLetterQueue implements Serializable {
 
   private final Ddl ddl;
 
-  private final PTransform<PCollection<String>, PDone> dlqTransform;
-
   private Map<String, String> srcTableToShardIdColumnMap;
 
   private final SQLDialect sqlDialect;
@@ -87,26 +88,21 @@ public String getDlqDirectory() {
     return dlqDirectory;
   }
 
-  public PTransform<PCollection<String>, PDone> getDlqTransform() {
-    return dlqTransform;
-  }
-
   private DeadLetterQueue(
       String dlqDirectory,
       Ddl ddl,
       Map<String, String> srcTableToShardIdColumnMap,
       SQLDialect sqlDialect,
       ISchemaMapper iSchemaMapper) {
     this.dlqDirectory = dlqDirectory;
-    this.dlqTransform = createDLQTransform(dlqDirectory);
     this.ddl = ddl;
     this.srcTableToShardIdColumnMap = srcTableToShardIdColumnMap;
     this.sqlDialect = sqlDialect;
     this.schemaMapper = iSchemaMapper;
   }
 
   @VisibleForTesting
-  private PTransform<PCollection<String>, PDone> createDLQTransform(String dlqDirectory) {
+  PTransform<PCollection<String>, PDone> createDLQTransform(String dlqDirectory) {
     if (dlqDirectory == null) {
       throw new RuntimeException("Unable to start pipeline as DLQ is not configured");
     }
@@ -163,7 +159,7 @@ public void processElement(
         .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         .apply("SanitizeTransformWriteDLQ", MapElements.via(new StringDeadLetterQueueSanitizer()))
         .setCoder(StringUtf8Coder.of())
-        .apply("FilteredRowsDLQ", dlqTransform);
+        .apply("FilteredRowsDLQ", createDLQTransform(dlqDirectory));
     LOG.info("added filtering dlq stage after transformer");
   }
 
@@ -186,7 +182,7 @@ public void processElement(
         .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         .apply("SanitizeTransformWriteDLQ", MapElements.via(new StringDeadLetterQueueSanitizer()))
         .setCoder(StringUtf8Coder.of())
-        .apply("TransformerDLQ", dlqTransform);
+        .apply("TransformerDLQ", createDLQTransform(dlqDirectory));
     LOG.info("added dlq stage after transformer");
   }
 
@@ -259,7 +255,7 @@ public void processElement(
         .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         .apply("SanitizeSpannerWriteDLQ", MapElements.via(new StringDeadLetterQueueSanitizer()))
         .setCoder(StringUtf8Coder.of())
-        .apply("WriterDLQ", dlqTransform);
+        .apply("WriterDLQ", createDLQTransform(dlqDirectory));
     LOG.info("added dlq stage after writer");
   }
 
@@ -273,7 +269,39 @@ protected FailsafeElement<String, String> mutationToDlqElement(Mutation m) {
     Map<String, Value> mutationMap = m.asMap();
     for (Map.Entry<String, Value> entry : mutationMap.entrySet()) {
       Value value = entry.getValue();
-      json.put(entry.getKey(), value == null ? null : String.valueOf(value));
+      Object val = null;
+      if (value != null && !value.isNull()) {
+        switch (value.getType().getCode()) {
+          case BYTES:
+            val = Hex.encodeHexString(value.getBytes().toByteArray());
+            break;
+          case INT64:
+            val = value.getInt64();
+            break;
+          case FLOAT64:
+            val = value.getFloat64();
+            break;
+          case NUMERIC:
+            val = value.getNumeric();
+            break;
+          case BOOL:
+            val = value.getBool();
+            break;
+          case ARRAY:
+            if (value.getType().getArrayElementType().getCode() == Type.Code.BYTES) {
+              val =
+                  value.getBytesArray().stream()
+                      .map(v -> v == null ? null : Hex.encodeHexString(v.toByteArray()))
+                      .collect(Collectors.toList());
+            } else {
+              val = value.toString();
+            }
+            break;
+          default:
+            val = value.toString();
+        }
+      }
+      putValueToJson(json, entry.getKey(), val);
     }
 
     return FailsafeElement.of(json.toString(), json.toString())
@@ -305,7 +333,7 @@ private void initializeJsonNode(JSONObject json, String tableName, long timeStam
   private void putValueToJson(JSONObject json, String key, Object value) {
     if (value == null) {
       json.put(key, (Object) null);
-    } else if (value instanceof Number) {
+    } else if (value instanceof Number || value instanceof java.util.Collection) {
       json.put(key, value);
     } else {
       json.put(key, value.toString());
v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/MySQLAllDataTypesCustomTransformationsBulkAndLiveFT.java
renamed to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLAllDataTypesBulkAndLiveIT.java

Lines changed: 106 additions & 56 deletions
@@ -13,8 +13,10 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-package com.google.cloud.teleport.v2.templates.failureinjectiontesting;
+package com.google.cloud.teleport.v2.templates;
 
+import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.AUTHORS_TABLE;
+import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider.BOOKS_TABLE;
 import static com.google.cloud.teleport.v2.templates.MySQLDataTypesIT.repeatString;
 import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
 import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
@@ -23,7 +25,8 @@
 import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
 import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
 import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
-import com.google.cloud.teleport.v2.templates.SourceDbToSpanner;
+import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.MySQLSrcDataProvider;
+import com.google.cloud.teleport.v2.templates.failureinjectiontesting.SourceDbToSpannerFTBase;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -51,30 +54,35 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * An integration test for {@link SourceDbToSpanner} Flex template which tests all data types
- * migration with custom transformations, bulk failure injection, and live retry.
+ * A failure injection test for Bulk + retry DLQ Live migration i.e., SourceDbToSpanner and
+ * DataStreamToSpanner templates. The bulk migration template does not retry transient failures.
+ * Live migration template is used to retry the failures which happened during the Bulk migration.
+ * This test injects Spanner errors to simulate transient errors during Bulk migration and checks
+ * the template behaviour. This test tests all data types migration with custom transformations,
+ * bulk failure injection, and live retry. The Bulk failures are injected both at Custom
+ * transformation phase and Spanner write phase. The test case also includes an interleaved table.
  */
 @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
 @TemplateIntegrationTest(SourceDbToSpanner.class)
 @RunWith(JUnit4.class)
-public class MySQLAllDataTypesCustomTransformationsBulkAndLiveFT extends SourceDbToSpannerFTBase {
+public class MySQLAllDataTypesBulkAndLiveIT extends SourceDbToSpannerFTBase {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MySQLAllDataTypesCustomTransformationsBulkAndLiveFT.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MySQLAllDataTypesBulkAndLiveIT.class);
 
   private static final String MYSQL_DDL_RESOURCE =
-      "MySQLAllDataTypesCustomTransformationsBulkAndLiveFT/mysql-schema.sql";
+      "MySQLAllDataTypesBulkAndLiveIT/mysql-schema.sql";
   private static final String SPANNER_DDL_RESOURCE =
-      "MySQLAllDataTypesCustomTransformationsBulkAndLiveFT/spanner-schema.sql";
-  private static final String TABLE_NAME = "AllDataTypes";
+      "MySQLAllDataTypesBulkAndLiveIT/spanner-schema.sql";
+
+  private static final String TABLE_CT = "AllDataTypes_CT"; // Custom transformation failures
+  private static final String TABLE_SWF = "AllDataTypes_SWF"; // Spanner write failures
 
   private static PipelineLauncher.LaunchInfo bulkJobInfo;
-  private static PipelineLauncher.LaunchInfo retryLiveJobInfo;
+  private static PipelineLauncher.LaunchInfo liveJobInfo;
 
   public static CloudMySQLResourceManager mySQLResourceManager;
   public static SpannerResourceManager spannerResourceManager;
   private static GcsResourceManager gcsResourceManager;
-  private static String bulkErrorFolderFullPath;
 
   @Before
   public void setUp() throws Exception {
@@ -93,100 +101,142 @@ public void setUp() throws Exception {
     // create MySQL Resources
     mySQLResourceManager = CloudMySQLResourceManager.builder(testName).build();
 
+    // Load DDL for AllDataTypes tables and Authors/Books
     loadSQLFileResource(mySQLResourceManager, MYSQL_DDL_RESOURCE);
 
-    bulkErrorFolderFullPath = getGcsPath("output", gcsResourceManager);
+    // Insert data for Authors and Books
+    MySQLSrcDataProvider.writeRowsInSourceDB(1, 200, mySQLResourceManager);
+  }
+
+  @After
+  public void cleanUp() {
+    ResourceManagerUtils.cleanResources(
+        spannerResourceManager, mySQLResourceManager, gcsResourceManager);
+  }
+
+  @Test
+  public void testAllScenarios() throws IOException {
+    // --------------------------------------------------------------------------------------------
+    // Phase 1: Bulk Migration
+    // --------------------------------------------------------------------------------------------
+
+    // Launch Bulk Job for All Scenarios
+    // We use CustomTransformationAllTypesWithException which is selective:
+    // - AllDataTypes_CT table: Fails (Simulated) - Custom transformation class throws exception
+    // - AllDataTypes_SWF table: Passes transformation, Fails at Spanner Write (Schema mismatch) -
+    //   length of bit_col is too small
+    // - Authors/Books table: Passes transformation, Fails at Spanner Write (Schema mismatch for
+    //   Authors) - length of name column in authors table is too small. It is such that 9 rows
+    //   `author_1` to `author_9` gets inserted and the rest 191 rows fail. Similarly, 9 books rows
+    //   corresponding to authors would get inserted and rest 191 would fail.
 
-    // Define Custom Transformation with Exception (Bad)
     CustomTransformation customTransformationBad =
         CustomTransformation.builder(
                 "customTransformation.jar", "com.custom.CustomTransformationAllTypesWithException")
             .build();
 
-    // launch bulk migration
+    Map<String, String> params = new HashMap<>();
+    // No 'tables' parameter needed, migrate everything
+    params.put("outputDirectory", getGcsPath("output", gcsResourceManager));
+
     bulkJobInfo =
         launchBulkDataflowJob(
-            getClass().getSimpleName(),
-            null,
+            getClass().getSimpleName() + "-Bulk",
             null,
+            params,
             spannerResourceManager,
             gcsResourceManager,
             mySQLResourceManager,
             customTransformationBad);
-  }
-
-  @After
-  public void cleanUp() {
-    ResourceManagerUtils.cleanResources(
-        spannerResourceManager, mySQLResourceManager, gcsResourceManager);
-  }
 
-  @Test
-  public void testAllDataTypesCustomTransformationsBulkAndLive() throws IOException {
-    // Wait for Bulk migration job to be in running state
+    // Wait for bulk job
     assertThatPipeline(bulkJobInfo).isRunning();
 
     PipelineOperator.Result result =
         pipelineOperator().waitUntilDone(createConfig(bulkJobInfo, Duration.ofMinutes(30)));
     assertThatResult(result).isLaunchFinished();
 
-    // Verify DLQ has 3 events
-    ConditionCheck conditionCheck =
-        // Check that there is at least 3 errors in DLQ
+    // Verify DLQ Events
+    // Total events expected:
+    // - CT: 4 events (3 rows + 1 null row)
+    // - SWF: 3 events (3 rows, bit_col too small)
+    // - Authors: 191 events (name column too small)
+    // - Books: 191 events (parent Authors failed)
+    // Total: 4 + 3 + 191 + 191 = 389
+    assertTrue(
         DlqEventsCountCheck.builder(gcsResourceManager, "output/dlq/severe/")
-            .setMinEvents(4)
-            .build();
-    assertTrue(conditionCheck.get());
+            .setMinEvents(389)
+            .build()
+            .get());
+
+    // --------------------------------------------------------------------------------------------
+    // Phase 2: Live Migration (Retry)
+    // --------------------------------------------------------------------------------------------
 
-    // Define Custom Transformation without Exception (Good)
+    // Fix schemas before retry
+    spannerResourceManager.executeDdlStatement(
+        "ALTER TABLE `" + TABLE_SWF + "` ALTER COLUMN `bit_col` BYTES(MAX)");
+    spannerResourceManager.executeDdlStatement(
+        "ALTER TABLE `Authors` ALTER COLUMN `name` STRING(200)");
+
+    // Retry with Good Custom Transformation class
     CustomTransformation customTransformationGood =
         CustomTransformation.builder(
                 "customTransformation.jar", "com.custom.CustomTransformationAllTypes")
            .build();
 
-    // launch forward migration template in retryDLQ mode
-    retryLiveJobInfo =
+    liveJobInfo =
        launchFwdDataflowJobInRetryDlqMode(
            spannerResourceManager,
-            bulkErrorFolderFullPath,
-            bulkErrorFolderFullPath + "/dlq",
+            getGcsPath("output", gcsResourceManager),
+            getGcsPath("output/dlq", gcsResourceManager),
            gcsResourceManager,
            customTransformationGood);
 
-    // Wait for Spanner to have all 3 rows
-    conditionCheck =
+    // Wait and Verify All Tables
+    ConditionCheck conditionCheck =
        ChainedConditionCheck.builder(
                List.of(
-                    SpannerRowsCheck.builder(spannerResourceManager, TABLE_NAME)
+                    SpannerRowsCheck.builder(spannerResourceManager, TABLE_CT)
                        .setMinRows(4)
                        .setMaxRows(4)
+                        .build(),
+                    SpannerRowsCheck.builder(spannerResourceManager, TABLE_SWF)
+                        .setMinRows(3)
+                        .setMaxRows(3)
+                        .build(),
+                    SpannerRowsCheck.builder(spannerResourceManager, AUTHORS_TABLE)
+                        .setMinRows(200)
+                        .setMaxRows(200)
+                        .build(),
+                    SpannerRowsCheck.builder(spannerResourceManager, BOOKS_TABLE)
+                        .setMinRows(200)
+                        .setMaxRows(200)
                        .build()))
            .build();
 
-    result =
-        pipelineOperator()
-            .waitForConditionAndCancel(
-                createConfig(retryLiveJobInfo, Duration.ofMinutes(15)), conditionCheck);
-    assertThatResult(result).meetsConditions();
+    assertThatResult(
+            pipelineOperator()
+                .waitForConditionAndCancel(
+                    createConfig(liveJobInfo, Duration.ofMinutes(15)), conditionCheck))
+        .meetsConditions();
 
-    // Verify Non null Data Content
+    // Verify CT Data
     List<Map<String, Object>> expectedDataNonNull = getExpectedData();
-
-    List<com.google.cloud.spanner.Struct> allRecords =
-        spannerResourceManager.runQuery("SELECT * FROM " + TABLE_NAME);
-
-    SpannerAsserts.assertThatStructs(allRecords)
+    List<com.google.cloud.spanner.Struct> allRecordsCT =
+        spannerResourceManager.runQuery("SELECT * FROM " + TABLE_CT);
+    SpannerAsserts.assertThatStructs(allRecordsCT)
        .hasRecordsUnorderedCaseInsensitiveColumns(expectedDataNonNull);
+    verifyNullRow(allRecordsCT);
 
-    // Manual assertion for the null row
-    verifyNullRow(allRecords);
+    // Verify SWF Data
+    SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("SELECT * FROM " + TABLE_SWF))
+        .hasRecordsUnorderedCaseInsensitiveColumns(expectedDataNonNull);
   }
 
   private void verifyNullRow(List<com.google.cloud.spanner.Struct> structs) {
-    // Iterate over all structs and verify the struct with id=4
     for (com.google.cloud.spanner.Struct struct : structs) {
       if (struct.getLong("id") == 4) {
-        // Verify all columns except id are null
        for (com.google.cloud.spanner.Type.StructField field : struct.getType().getStructFields()) {
          if (field.getName().equalsIgnoreCase("id")) {
            continue;