@@ -727,10 +727,23 @@ public static PipelineResult run(Options options) {
          dlqManager.getReconsumerDataTransform(
              pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    }
+    PCollectionTuple recoveredTuple =
+        applySevereRetryRecovery(
+            reconsumedElements.get(DeadLetterQueueManager.PERMANENT_ERRORS),
+            "EvaluateDlqRetry",
+            options.getDlqMaxRetryCount());
+
    PCollection<FailsafeElement<String, String>> dlqJsonRecords =
-        reconsumedElements
-            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
-            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+        PCollectionList.of(
+                reconsumedElements
+                    .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .and(
+                recoveredTuple
+                    .get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(Flatten.pCollections());
+
    if (isRegularMode) {
      LOG.info("Regular Datastream flow");
      PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
@@ -859,8 +872,29 @@ public static PipelineResult run(Options options) {
    // We will write only the original payload from the failsafe event to the DLQ. We are doing
    // that in
    // StringDeadLetterQueueSanitizer.
-    spannerWriteResults
-        .retryableErrors()
+
+    // 1. Gather all "fresh" permanent errors from Spanner/Transformer
+    PCollection<FailsafeElement<String, String>> processingFailures =
+        PCollectionList.of(spannerWriteResults.permanentErrors())
+            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
+            .apply("FlattenProcessingFailures", Flatten.pCollections());
+
+    // 2. Apply "Severe Retry" logic to check for severe errors that need to be
+    // retried
+    PCollectionTuple recoveredLateTuple =
+        applySevereRetryRecovery(
+            processingFailures, "EvaluateProcessingFailureRetry", options.getDlqMaxRetryCount());
+
+    // 3. Merge "Rescued" errors with standard retryable errors
+    PCollection<FailsafeElement<String, String>> retryableErrors =
+        PCollectionList.of(spannerWriteResults.retryableErrors())
+            .and(
+                recoveredLateTuple
+                    .get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply("Flatten Retryable Errors", Flatten.pCollections());
+
+    retryableErrors
        .apply(
            "DLQ: Write retryable Failures to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
@@ -872,16 +906,22 @@ public static PipelineResult run(Options options) {
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                .setIncludePaneInfo(true)
                .build());
+
    PCollection<FailsafeElement<String, String>> dlqErrorRecords =
-        reconsumedElements
-            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
+        recoveredTuple
+            .get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-    // TODO: Write errors from transformer and spanner writer into separate folders
+
+    // 4. Merge truly severe errors (fresh & DLQ-exhausted) for final severe bucket
+    // write
    PCollection<FailsafeElement<String, String>> permanentErrors =
        PCollectionList.of(dlqErrorRecords)
-            .and(spannerWriteResults.permanentErrors())
-            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
+            .and(
+                recoveredLateTuple
+                    .get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
            .apply(Flatten.pCollections());
+
    // increment the metrics
    permanentErrors
        .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
@@ -900,6 +940,18 @@ public static PipelineResult run(Options options) {
    return pipeline.run();
  }

+  private static PCollectionTuple applySevereRetryRecovery(
+      PCollection<FailsafeElement<String, String>> input, String stepName, int maxRetries) {
+    return input
+        .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        .apply(
+            stepName,
+            ParDo.of(new SevereRetryRecoveryFn(maxRetries))
+                .withOutputTags(
+                    DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG,
+                    TupleTagList.of(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)));
+  }
+
  static SpannerConfig getShadowTableSpannerConfig(Options options) {
    // Validate shadow table Spanner config - both instance and database must be specified together
    String shadowTableSpannerInstanceId = options.getShadowTableSpannerInstanceId();
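The retry bookkeeping above hinges on a `_metadata_severe_retries` counter carried inside the DLQ JSON payload and maintained by the SevereRetryRecoveryFn added in the new file below. The following is a minimal standalone sketch, not part of this change, of how that counter evolves across the three cases the DoFn handles; the field name and the reset-to-zero behaviour come from the diff, while the surrounding class and the sample payload are illustrative assumptions.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/** Illustrative only: mirrors the counter handling described in SevereRetryRecoveryFn below. */
public class SevereRetryCounterSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    int maxRetries = 3; // stands in for options.getDlqMaxRetryCount()

    // Case 1: a fresh severe error carries no counter yet; it is initialized to 0 and the
    // record is routed to the severe (permanent) bucket, ready for a later manual retry.
    ObjectNode record = (ObjectNode) mapper.readTree("{\"_metadata_table\":\"Users\"}");
    if (!record.has("_metadata_severe_retries")) {
      record.put("_metadata_severe_retries", 0);
    }

    // Case 2: when the record is re-consumed from the DLQ with retries remaining, the counter
    // is incremented and the record is routed back to the retryable path.
    int count = record.get("_metadata_severe_retries").asInt();
    if (count < maxRetries) {
      record.put("_metadata_severe_retries", count + 1);
    }

    // Case 3: once the counter reaches maxRetries, it is reset to 0 and the record goes back
    // to the severe bucket, where it stays until another manual retry is triggered.
    record.put("_metadata_severe_retries", maxRetries); // simulate exhaustion
    if (record.get("_metadata_severe_retries").asInt() >= maxRetries) {
      record.put("_metadata_severe_retries", 0);
    }

    System.out.println(record); // {"_metadata_table":"Users","_metadata_severe_retries":0}
  }
}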
@@ -0,0 +1,113 @@
/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DoFn} that checks if a severe (permanent) error should be retried. It intercepts
 * permanent errors, checks the `_metadata_severe_retries` field, and routes them to either the
 * RETRYABLE bucket (if retries remain) or the PERMANENT bucket (if exhausted).
 */
public class SevereRetryRecoveryFn
    extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {

  private static final Logger LOG = LoggerFactory.getLogger(SevereRetryRecoveryFn.class);
  private static final ObjectMapper mapper = new ObjectMapper();
  private final int maxRetries;

  public SevereRetryRecoveryFn(int maxRetries) {
    this.maxRetries = maxRetries;
  }

  @ProcessElement
  public void process(
      @Element FailsafeElement<String, String> element, MultiOutputReceiver output) {
    // If maxRetries is 0 (infinite), everything should have gone to RETRYABLE
    // anyway.
    // But if we are here, it means we are in Regular mode (maxRetries > 0)
    // or handling edge cases.
    try {
      if (element.getOriginalPayload() == null) {
        LOG.warn(
            "Original payload is null. Cannot check for recovery. Routing to PERMANENT_ERROR_TAG.");
        output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(element);
        return;
      }
      JsonNode jsonDLQElement = mapper.readTree(element.getOriginalPayload());

      // Check if this is a "Fresh" severe error vs a "Manual Retry"
      if (!jsonDLQElement.has("_metadata_severe_retries")) {
        // Initialize metadata for future manual retries
        ((ObjectNode) jsonDLQElement).put("_metadata_severe_retries", 0);

        FailsafeElement<String, String> permanentElement =
            FailsafeElement.of(jsonDLQElement.toString(), jsonDLQElement.toString());
        permanentElement.setErrorMessage(element.getErrorMessage());
        permanentElement.setStacktrace(element.getStacktrace());

        output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(permanentElement);
        return;
      }

      // intended for manual retries
      int severeRetryCount = jsonDLQElement.get("_metadata_severe_retries").asInt();

      if (severeRetryCount < maxRetries) {
        LOG.info(
            "Rescuing Error! Incrementing severe_retry_count to {}. Routing to RETRYABLE_ERROR_TAG.",
            severeRetryCount + 1);

        // Allow retry, increment severe retry count
        ((ObjectNode) jsonDLQElement).put("_metadata_severe_retries", severeRetryCount + 1);
        FailsafeElement<String, String> updatedElement =
            FailsafeElement.of(jsonDLQElement.toString(), jsonDLQElement.toString());

        updatedElement.setErrorMessage(element.getErrorMessage());
        updatedElement.setStacktrace(element.getStacktrace());

        output.get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG).output(updatedElement);
        return;
      }

      // Both counts maxed out - truly permanent failure
      LOG.info(
          "Severe retry count exhausted ({} >= {}). Routing to PERMANENT_ERROR_TAG (Active Severe Bucket).",
          severeRetryCount,
          maxRetries);

      // Reset severe retry count to 0 for potential future manual retries
      ((ObjectNode) jsonDLQElement).put("_metadata_severe_retries", 0);

      FailsafeElement<String, String> permanentElement =
          FailsafeElement.of(jsonDLQElement.toString(), jsonDLQElement.toString());
      permanentElement.setErrorMessage(element.getErrorMessage());
      permanentElement.setStacktrace(element.getStacktrace());
      output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(permanentElement);
    } catch (IOException e) {
      LOG.error("Issue parsing JSON record in RecoveryFn. Unable to continue.", e);
      output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(element);
    }
  }
}
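For reference, a minimal sketch, not part of this PR, of how the new DoFn is wired with the existing tuple tags, mirroring the applySevereRetryRecovery helper above. The tags, FailsafeElement, FailsafeElementCoder, and SevereRetryRecoveryFn come from this change; the standalone pipeline, the sample payload, and the FailsafeElementCoder import path are assumptions made for illustration.

import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; // import path assumed
import com.google.cloud.teleport.v2.templates.SevereRetryRecoveryFn;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

/** Illustrative wiring sketch for SevereRetryRecoveryFn; not part of the template. */
public class SevereRetryRoutingSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    FailsafeElementCoder<String, String> coder =
        FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

    // A DLQ record that has already been through one severe retry (illustrative payload).
    String payload = "{\"_metadata_table\":\"Users\",\"_metadata_severe_retries\":1}";

    PCollectionTuple routed =
        pipeline
            .apply("CreateInput", Create.of(FailsafeElement.of(payload, payload)).withCoder(coder))
            .apply(
                "EvaluateSevereRetry",
                ParDo.of(new SevereRetryRecoveryFn(/* maxRetries= */ 3))
                    .withOutputTags(
                        DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG,
                        TupleTagList.of(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)));

    // With 1 < 3 retries used, the element is emitted on RETRYABLE_ERROR_TAG with the counter
    // incremented to 2; once the counter reaches maxRetries it is reset to 0 and the element is
    // emitted on PERMANENT_ERROR_TAG instead.
    routed.get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG).setCoder(coder);
    routed.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).setCoder(coder);

    pipeline.run().waitUntilFinish();
  }
}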