Skip to content

Conversation

@aasthabharill
Copy link
Member

@aasthabharill aasthabharill commented Dec 29, 2025

Documentation Change: GoogleCloudPlatform/spanner-migration-tool#1255

Implementation Details

A. The Recovery Function (SevereRetryRecoveryFn)

Acts as a gatekeeper for all errors destined for the Severe bucket.
Input: FailsafeElement tagged as PERMANENT.

Logic:

  1. Freshness Check: If _metadata_severe_retries is missing, initialize it to 0.
  • Why: By stamping it with 0 now, we ensure that if a user manually retries this file later, the pipeline knows it's a "candidate for severe retry" rather than a brand new error.
  1. Retry Check: If _metadata_severe_retries < maxRetries:
  • Increment _metadata_severe_retries.
  • Output to RETRYABLE tag ("Rescued").
  • Note: The increment happens before routing, so the item enters the retry queue with count 1.
  1. Exhaustion: If counter >= maxRetries:
  • Reset _metadata_severe_retries to 0.
  • Why: Just like the freshness check, resetting to 0 allows the user to restart the manual retry cycle if they choose to do so again.
  • Output to PERMANENT tag (Graveyard).

B. Pipeline Graph Changes

Specific steps were added to intercept and process permanent errors from multiple sources.

1. FlattenProcessingFailures (Flatten)

  • What: Merges two error streams:
    • spannerWriteResults.permanentErrors(): Failures from Spanner itself.
    • transformedRecords.get(PERMANENT_ERROR_TAG): Failures from the transformer.
  • Why: Creates a single logical stream of "all failures" to apply the severe retry logic uniformly.

2. EvaluateProcessingFailureRetry (ParDo)

  • What: Applies the SevereRetryRecoveryFn to the processingFailures stream.
  • Why: This is the "Freshness Check". It ensures that every permanent error is checked for metadata. Fresh errors get tagged with 0 and sent to Severe, preventing them from bypassing the safety net.

3. EvaluateDlqRetry (ParDo)

  • What: Applies the SevereRetryRecoveryFn to the reconsumedElements stream (errors coming from the DLQ).
  • Why: Intercepts errors that were already in the DLQ and are being retried.

4. Flatten Retryable Errors (Flatten)

  • What: Merges standard retryable errors with "Rescued" severe errors.
  • Why: "Rescued" errors are merged into the main retry pipeline to be written to the
    retry/ GCS bucket.

Potential Regressions & Risks

  1. Metrics discrepancy: Metrics and counters for "Rescued" errors are not fully accounted for yet.
  2. Log noise: Every time a severe error is "rescued", it might log "A severe error occurred..." before being successfully rescued (i.e. its actually a retriable error at the time being), which could be confusing.
  3. Metadata Overhead: The JSON payloads now carry an extra integer field _metadata_severe_retries.
  4. Pipeline Graph Change: The DAG structure has changed (extra Flatten and ParDo steps).

Important Callout: The _metadata_retry_count keeps incrementing on each retry. So if a retriable error has gone through: retry -> severe -> retry -> severe, it's _metadata_retry_count = 11 if maxRetryCount was 5

Design Decision: Template vs. Common Library

It was explicitly chosen NOT to implement this in the shared DeadLetterQueueManager.java

  • Reason: DeadLetterQueueManager is a shared component used by multiple Dataflow templates (BigQuery, JDBC, MongoDB, etc.).
  • Risk: Modifying the common retry logic to support "Severe Retries" could introduce unexpected behavior or regressions in other stable templates that do not require this feature.
  • Isolation: By implementing this logic entirely within DataStreamToSpanner.java, we strictly gate this behavior to this specific template/use-case, ensuring zero impact on the broader ecosystem.

Testing

Scenario 1: Fresh Severe Error (Direct to Severe)

Condition: A unique constraint violation (ALREADY_EXISTS) occurs immediately.
Behavior:

  • Logic detects missing _metadata_severe_retries.
  • Sets _metadata_severe_retries = 0.
  • Routes DIRECTLY to Severe Bucket.

DLQ Entry:

{"message":{"ddrkey":14,"userId":400,"nucUserId":4000,"nucPersId":40000,"time":1679145600,"_metadata_stream":"projects/443856856628/locations/us-central1/streams/aastha-severe-retry3-shard1-mysql-stream","_metadata_timestamp":1767185793,"_metadata_read_timestamp":1767185794,"_metadata_dataflow_timestamp":1767185801,"_metadata_read_method":"mysql-cdc-binlog","_metadata_source_type":"mysql","_metadata_deleted":false,"_metadata_table":"ut_bidtokens","_metadata_change_type":"INSERT","_metadata_primary_keys":["ddrkey","userId"],"_metadata_uuid":"5ca45833-ebcb-4359-ba74-727168c353fa","_metadata_schema":"final_test00","_metadata_log_file":"mysql-bin.001884","_metadata_log_position":"16541","_metadata_source":{"table":"ut_bidtokens","database":"final_test00","primary_keys":["ddrkey","userId"],"log_file":"mysql-bin.001884","log_position":16541,"change_type":"INSERT","is_deleted":false},"_metadata_severe_retries":0},"error_message":"ALREADY_EXISTS: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Unique index violation on index ut_bidtokens_userId at index key [400,4]. It conflicts with row [4,400] in table ut_bidtokens."}

Scenario 2: Manual Retry Loop (Severe -> Retry -> Severe)

Condition: User manually moves the file from Scenario 1 into the retry/bucket. The error is NOT resolved.
Behavior:

  • Pipeline consumes file (_metadata_severe_retries=0).
  • Validation: 0 < maxDlqRetries (e.g. 10).
  • Action: Increments _metadata_severe_retries to 1. Routes to Retry Bucket ("Rescued").
  • The above keeps repeating till maxDlqRetries is met and then the error is shifted back to severe bucket with _metadata_severe_retries reset to 0.
  • Note that _metadata_retry_count does get incremented with each retry (so becomes 11 in this case when its written back)

DLQ Entry (when it's written back to severe):

{"message":{"ddrkey":15,"userId":500,"nucUserId":5000,"nucPersId":50000,"time":1679232000,"_metadata_stream":"projects/443856856628/locations/us-central1/streams/aastha-severe-retry3-shard1-mysql-stream","_metadata_timestamp":1767185793,"_metadata_read_timestamp":1767185794,"_metadata_dataflow_timestamp":1767185801,"_metadata_read_method":"mysql-cdc-binlog","_metadata_source_type":"mysql","_metadata_deleted":false,"_metadata_table":"ut_bidtokens","_metadata_change_type":"INSERT","_metadata_primary_keys":["ddrkey","userId"],"_metadata_uuid":"4466cc0f-2384-4118-b21e-fcb36d542b71","_metadata_schema":"final_test00","_metadata_log_file":"mysql-bin.001884","_metadata_log_position":"16541","_metadata_source":{"table":"ut_bidtokens","database":"final_test00","primary_keys":["ddrkey","userId"],"log_file":"mysql-bin.001884","log_position":16541,"change_type":"INSERT","is_deleted":false},"_metadata_severe_retries":0,"_metadata_error":"ALREADY_EXISTS: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Unique index violation on index ut_bidtokens_userId at index key [500,5]. It conflicts with row [5,500] in table ut_bidtokens.","_metadata_retry_count":11},"error_message":"ALREADY_EXISTS: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Unique index violation on index ut_bidtokens_userId at index key [500,5]. It conflicts with row [5,500] in table ut_bidtokens."}

Scenario 3: "Fresh" severe error is moved to retry bucket after resolving issue (Severe -> Retry)

Condition: User manually moves the file from Scenario 1 into the retry/bucket after the error is resolved.
Behavior:

  • Pipeline consumes file (_metadata_severe_retries=0).
  • Validation: 0 < maxDlqRetries (e.g. 10).
  • Action: Increments _metadata_severe_retries to 1. Routes to Retry Bucket ("Rescued").
  • Since error was resolved, query is written to Spanner and error file is deleted.

Scenario 4: Retriable Error that got exhausted and moved to severe (Retry -> Severe)

Condition: INTERLEAVE IN PARENT violation by orphan child - Error that is first written to retry bucket and is retried dlqMaxRetryCount (10 in this case) times but keeps failing
Behavior:

  • Pipeline consumes file and retries maxDlqRetries times as _metadata_retry_count < maxDlqRetries
  • File is moved to severe bucket when _metadata_severe_retries = 11
  • _metadata_severe_retries is set to 0 before file is written in severe bucket

DLQ Entry (when it's written to severe):

{"message":{"ddrkey":555,"userId":789,"itemId":1001,"added_col_char":"test","_metadata_stream":"projects/443856856628/locations/us-central1/streams/aastha-severe-retry3-shard1-mysql-stream","_metadata_timestamp":1767183215,"_metadata_read_timestamp":1767183219,"_metadata_dataflow_timestamp":1767183232,"_metadata_read_method":"mysql-cdc-binlog","_metadata_source_type":"mysql","_metadata_deleted":false,"_metadata_table":"ut_academy_tradeable_player","_metadata_change_type":"INSERT","_metadata_primary_keys":["ddrkey","userId","itemId"],"_metadata_uuid":"ff97ce77-c834-4c86-acab-dd2d19418c85","_metadata_schema":"final_test00","_metadata_log_file":"mysql-bin.001884","_metadata_log_position":"15032","_metadata_source":{"table":"ut_academy_tradeable_player","database":"final_test00","primary_keys":["ddrkey","userId","itemId"],"log_file":"mysql-bin.001884","log_position":15032,"change_type":"INSERT","is_deleted":false},"_metadata_error":"NOT_FOUND: io.grpc.StatusRuntimeException: NOT_FOUND: Parent row for row [555,789,1001] in table ut_academy_tradeable_player is missing. Row cannot be written.","_metadata_retry_count":11,"_metadata_severe_retries":0},"error_message":"NOT_FOUND: io.grpc.StatusRuntimeException: NOT_FOUND: Parent row for row [555,789,1001] in table ut_academy_tradeable_player is missing. Row cannot be written."}

Scenario 5: Cyclic Severe Exhaustion (Retry -> Severe -> Retry -> Severe)

Condition: User keeps moving the above file back to Retry, but error persists (5 times).
Behavior:

  • Error is retried on the basis of _metadata_severe_retries value now, which keeps retrying till its <= dlqMaxRetryCount or success.
  • Everytime its written back to severe bucket _metadata_severe_retries is reset but _metadata_severe_retries keeps increasing

@gemini-code-assist
Copy link

Summary of Changes

Hello @aasthabharill, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a temporary fix to refine the Dead Letter Queue (DLQ) reconsumption mechanism within the DataStreamToSpanner and SpannerToSourceDb templates. The core change involves dynamically adjusting the set of DLQ paths to ignore based on the pipeline's execution mode ('regular' or 'retryDlq'). This ensures that the Pub/Sub consumer correctly filters and processes messages, routing them appropriately for reconsumption without unintended interference from other error directories.

Highlights

  • Dynamic DLQ Path Filtering: The logic for determining which Dead Letter Queue (DLQ) paths to ignore during reconsumption from Pub/Sub has been made dynamic based on the pipeline's run mode.
  • Run Mode Specific Filtering: Depending on whether the pipeline is executing in 'regular' or 'retryDlq' mode, the system now dynamically adds either '/severe/' or '/retry/' to the list of file paths to ignore when processing messages from the DLQ.
  • Improved PubSubNotifiedDlqIO Usage: The PubSubNotifiedDlqIO constructor now receives a List<String> for filePathsToIgnore that is constructed with conditional logic, ensuring the correct set of paths is ignored for the specific run mode.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@codecov
Copy link

codecov bot commented Dec 29, 2025

Codecov Report

❌ Patch coverage is 0% with 78 lines in your changes missing coverage. Please review.
✅ Project coverage is 54.61%. Comparing base (9e360c9) to head (72a0810).
⚠️ Report is 17 commits behind head on main.

Files with missing lines Patch % Lines
...d/teleport/v2/templates/SevereRetryRecoveryFn.java 0.00% 43 Missing ⚠️
...oud/teleport/v2/templates/DataStreamToSpanner.java 0.00% 35 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3163      +/-   ##
============================================
+ Coverage     50.79%   54.61%   +3.81%     
+ Complexity     5116     2234    -2882     
============================================
  Files           976      491     -485     
  Lines         59994    28440   -31554     
  Branches       6555     2982    -3573     
============================================
- Hits          30476    15533   -14943     
+ Misses        27380    11965   -15415     
+ Partials       2138      942    -1196     
Components Coverage Δ
spanner-templates 71.82% <0.00%> (+0.82%) ⬆️
spanner-import-export ∅ <ø> (∅)
spanner-live-forward-migration 79.00% <0.00%> (-1.04%) ⬇️
spanner-live-reverse-replication 77.72% <ø> (+0.03%) ⬆️
spanner-bulk-migration 88.47% <ø> (-0.02%) ⬇️
Files with missing lines Coverage Δ
...oud/teleport/v2/templates/DataStreamToSpanner.java 17.20% <0.00%> (-1.92%) ⬇️
...d/teleport/v2/templates/SevereRetryRecoveryFn.java 0.00% <0.00%> (ø)

... and 505 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@pull-request-size pull-request-size bot added size/L and removed size/M labels Dec 31, 2025
@aasthabharill aasthabharill changed the title PubSub consumer should be used in retryDlq mode if dlqGcsPubSubSubscription is passed Consuming severe errors in regular mode using rerouting logic Jan 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant