diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java
index 8e015bd9a8..5aebdf8edb 100644
--- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java
+++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java
@@ -727,10 +727,23 @@ public static PipelineResult run(Options options) {
           dlqManager.getReconsumerDataTransform(
               pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
     }
+    PCollectionTuple recoveredTuple =
+        applySevereRetryRecovery(
+            reconsumedElements.get(DeadLetterQueueManager.PERMANENT_ERRORS),
+            "EvaluateDlqRetry",
+            options.getDlqMaxRetryCount());
+
     PCollection<FailsafeElement<String, String>> dlqJsonRecords =
-        reconsumedElements
-            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
-            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+        PCollectionList.of(
+                reconsumedElements
+                    .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .and(
+                recoveredTuple
+                    .get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(Flatten.pCollections());
+
     if (isRegularMode) {
       LOG.info("Regular Datastream flow");
       PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
@@ -859,8 +872,29 @@ public static PipelineResult run(Options options) {
     // We will write only the original payload from the failsafe event to the DLQ. We are doing
     // that in
     // StringDeadLetterQueueSanitizer.
-    spannerWriteResults
-        .retryableErrors()
+
+    // 1. Gather all "fresh" permanent errors from Spanner/Transformer
+    PCollection<FailsafeElement<String, String>> processingFailures =
+        PCollectionList.of(spannerWriteResults.permanentErrors())
+            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
+            .apply("FlattenProcessingFailures", Flatten.pCollections());
+
+    // 2. Apply "Severe Retry" logic to check for severe errors that need to be
+    // retried
+    PCollectionTuple recoveredLateTuple =
+        applySevereRetryRecovery(
+            processingFailures, "EvaluateProcessingFailureRetry", options.getDlqMaxRetryCount());
+
+    // 3. Merge "Rescued" errors with standard retryable errors
+    PCollection<FailsafeElement<String, String>> retryableErrors =
+        PCollectionList.of(spannerWriteResults.retryableErrors())
+            .and(
+                recoveredLateTuple
+                    .get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply("Flatten Retryable Errors", Flatten.pCollections());
+
+    retryableErrors
         .apply(
             "DLQ: Write retryable Failures to GCS",
             MapElements.via(new StringDeadLetterQueueSanitizer()))
@@ -872,16 +906,22 @@ public static PipelineResult run(Options options) {
                 .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                 .setIncludePaneInfo(true)
                 .build());
+
     PCollection<FailsafeElement<String, String>> dlqErrorRecords =
-        reconsumedElements
-            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
+        recoveredTuple
+            .get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)
             .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-    // TODO: Write errors from transformer and spanner writer into separate folders
+
+    // 4. Merge truly severe errors (fresh & DLQ-exhausted) for final severe bucket
+    // write
     PCollection<FailsafeElement<String, String>> permanentErrors =
         PCollectionList.of(dlqErrorRecords)
-            .and(spannerWriteResults.permanentErrors())
-            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
+            .and(
+                recoveredLateTuple
+                    .get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)
+                    .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
             .apply(Flatten.pCollections());
+
     // increment the metrics
     permanentErrors
         .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
@@ -900,6 +940,18 @@ public static PipelineResult run(Options options) {
     return pipeline.run();
   }
 
+  private static PCollectionTuple applySevereRetryRecovery(
+      PCollection<FailsafeElement<String, String>> input, String stepName, int maxRetries) {
+    return input
+        .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        .apply(
+            stepName,
+            ParDo.of(new SevereRetryRecoveryFn(maxRetries))
+                .withOutputTags(
+                    DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG,
+                    TupleTagList.of(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)));
+  }
+
   static SpannerConfig getShadowTableSpannerConfig(Options options) {
     // Validate shadow table Spanner config - both instance and database must be specified together
     String shadowTableSpannerInstanceId = options.getShadowTableSpannerInstanceId();
diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SevereRetryRecoveryFn.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SevereRetryRecoveryFn.java
new file mode 100644
index 0000000000..cdf13e1740
--- /dev/null
+++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SevereRetryRecoveryFn.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
+import com.google.cloud.teleport.v2.values.FailsafeElement;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DoFn} that checks if a severe (permanent) error should be retried. It intercepts
+ * permanent errors, checks the `_metadata_severe_retries` field, and routes them to either the
+ * RETRYABLE bucket (if retries remain) or the PERMANENT bucket (if exhausted).
+ */
+public class SevereRetryRecoveryFn
+    extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SevereRetryRecoveryFn.class);
+  private static final ObjectMapper mapper = new ObjectMapper();
+  private final int maxRetries;
+
+  public SevereRetryRecoveryFn(int maxRetries) {
+    this.maxRetries = maxRetries;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element FailsafeElement<String, String> element, MultiOutputReceiver output) {
+    // If maxRetries is 0 (infinite), everything should have gone to RETRYABLE anyway.
+    // But if we are here, it means we are in Regular mode (maxRetries > 0)
+    // or handling edge cases.
+    try {
+      if (element.getOriginalPayload() == null) {
+        LOG.warn(
+            "Original payload is null. Cannot check for recovery. Routing to PERMANENT_ERROR_TAG.");
+        output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(element);
+        return;
+      }
+      JsonNode jsonDLQElement = mapper.readTree(element.getOriginalPayload());
+
+      // Check if this is a "Fresh" severe error vs a "Manual Retry"
+      if (!jsonDLQElement.has("_metadata_severe_retries")) {
+        // Initialize metadata for future manual retries
+        ((ObjectNode) jsonDLQElement).put("_metadata_severe_retries", 0);
+
+        FailsafeElement<String, String> permanentElement =
+            FailsafeElement.of(jsonDLQElement.toString(), jsonDLQElement.toString());
+        permanentElement.setErrorMessage(element.getErrorMessage());
+        permanentElement.setStacktrace(element.getStacktrace());
+
+        output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(permanentElement);
+        return;
+      }
+
+      // intended for manual retries
+      int severeRetryCount = jsonDLQElement.get("_metadata_severe_retries").asInt();
+
+      if (severeRetryCount < maxRetries) {
+        LOG.info(
+            "Rescuing Error! Incrementing severe_retry_count to {}. Routing to RETRYABLE_ERROR_TAG.",
+            severeRetryCount + 1);
+
+        // Allow retry, increment severe retry count
+        ((ObjectNode) jsonDLQElement).put("_metadata_severe_retries", severeRetryCount + 1);
+        FailsafeElement<String, String> updatedElement =
+            FailsafeElement.of(jsonDLQElement.toString(), jsonDLQElement.toString());
+
+        updatedElement.setErrorMessage(element.getErrorMessage());
+        updatedElement.setStacktrace(element.getStacktrace());
+
+        output.get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG).output(updatedElement);
+        return;
+      }
+
+      // Both counts maxed out - truly permanent failure
+      LOG.info(
+          "Severe retry count exhausted ({} >= {}). Routing to PERMANENT_ERROR_TAG (Active Severe Bucket).",
+          severeRetryCount,
+          maxRetries);
+
+      // Reset severe retry count to 0 for potential future manual retries
+      ((ObjectNode) jsonDLQElement).put("_metadata_severe_retries", 0);
+
+      FailsafeElement<String, String> permanentElement =
+          FailsafeElement.of(jsonDLQElement.toString(), jsonDLQElement.toString());
+      permanentElement.setErrorMessage(element.getErrorMessage());
+      permanentElement.setStacktrace(element.getStacktrace());
+      output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(permanentElement);
+    } catch (IOException e) {
+      LOG.error("Issue parsing JSON record in RecoveryFn. Unable to continue.", e);
+      output.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG).output(element);
+    }
+  }
+}
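Reviewer note: below is a minimal standalone sketch (not part of this change) of how the `_metadata_severe_retries` counter is expected to evolve as a record cycles between the severe bucket and manual DLQ retries. The class name, the sample payload, and the maxRetries value of 2 are hypothetical; only the field name and the routing rules are taken from SevereRetryRecoveryFn above.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/** Illustrative only: replays SevereRetryRecoveryFn's routing decision outside of Beam. */
public class SevereRetryLifecycleSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    int maxRetries = 2; // hypothetical value of the dlqMaxRetryCount option

    // A "fresh" severe error has no _metadata_severe_retries field yet (payload is a placeholder).
    ObjectNode record = (ObjectNode) mapper.readTree("{\"_metadata_table\":\"t1\"}");

    for (int pass = 1; pass <= 4; pass++) {
      if (!record.has("_metadata_severe_retries")) {
        // Fresh severe error: initialize the counter and park the record in the severe bucket.
        record.put("_metadata_severe_retries", 0);
        System.out.printf("pass %d: fresh -> PERMANENT, count=0%n", pass);
      } else if (record.get("_metadata_severe_retries").asInt() < maxRetries) {
        // Manual retry with budget left: increment and send back through the retryable path.
        int next = record.get("_metadata_severe_retries").asInt() + 1;
        record.put("_metadata_severe_retries", next);
        System.out.printf("pass %d: rescued -> RETRYABLE, count=%d%n", pass, next);
      } else {
        // Budget exhausted: reset the counter so a later manual retry starts clean.
        record.put("_metadata_severe_retries", 0);
        System.out.printf("pass %d: exhausted -> PERMANENT, count reset to 0%n", pass);
      }
    }
  }
}

With maxRetries = 2 the record is rescued twice (count 1, then 2), then lands back in the severe bucket with the counter reset, matching the "Active Severe Bucket" wording in the log message above.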
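And a hypothetical JUnit sketch (also not included in this change) showing one way the new DoFn could be exercised with TestPipeline. The test class name and input values are made up, and the FailsafeElementCoder import path is assumed to match the rest of the template; the tags, coders, and withOutputTags wiring mirror applySevereRetryRecovery above.

package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;

public class SevereRetryRecoveryFnSketchTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void severeErrorWithRetriesLeftIsRoutedToRetryable() {
    // A DLQ record whose severe retry counter has already been initialized to 0.
    String payload = "{\"_metadata_severe_retries\":0}";
    FailsafeElement<String, String> element = FailsafeElement.of(payload, payload);

    PCollectionTuple out =
        pipeline
            .apply(
                Create.of(element)
                    .withCoder(
                        FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
            .apply(
                ParDo.of(new SevereRetryRecoveryFn(/* maxRetries= */ 3))
                    .withOutputTags(
                        DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG,
                        TupleTagList.of(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)));

    // Coders must be set explicitly on both outputs, as in the pipeline code above.
    out.get(DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG)
        .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    // With retries remaining, nothing should land in the severe (permanent) bucket.
    PAssert.that(
            out.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG)
                .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        .empty();

    pipeline.run();
  }
}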