Fix the FileBasedDeadLetterQueueReconsumer dup issue#2476

Closed
liferoad wants to merge 2 commits intoGoogleCloudPlatform:mainfrom
liferoad:fix-dlq-dup


@liferoad liferoad commented Jun 22, 2025

Fixes #2236

This PR changes the process from a "match -> read -> delete" sequence to a "match -> atomic rename -> read -> delete" sequence.

MoveFiles:

A new MoveFiles step is introduced, which renames each matched dead-letter file by adding a tmp- prefix to its name. This atomic "move" operation effectively claims the file, ensuring that subsequent scans won't find the original file and attempt to process it again.

After the files are moved, a Reshuffle transform is applied to guarantee that the rename operation is completed.
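
As a rough illustration of the claim-by-rename idea described above, here is a minimal sketch. It uses `java.nio.file` on a local filesystem as a stand-in for the file system the template actually talks to, and the class name `ClaimByRename` and method `claim` are hypothetical, not names from this PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

public class ClaimByRename {

  /**
   * Tries to claim a file by renaming it to tmp-<name> in the same directory.
   * Returns the new path, or empty if the original file is already gone.
   */
  static Optional<Path> claim(Path file) {
    Path claimed = file.resolveSibling("tmp-" + file.getFileName());
    try {
      // The rename either happens completely or not at all.
      return Optional.of(Files.move(file, claimed, StandardCopyOption.ATOMIC_MOVE));
    } catch (NoSuchFileException e) {
      // Another scan already renamed (claimed) this file.
      return Optional.empty();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("dlq");
    Path file = Files.writeString(dir.resolve("file-1"), "{\"row\": 1}");

    // The first scan claims the file; a second scan finds nothing left to claim.
    System.out.println("first scan claimed: " + claim(file).isPresent());
    System.out.println("second scan claimed: " + claim(file).isPresent());
  }
}
```

Only the scan that wins the rename goes on to read and delete the file, which is what removes the duplicate read in this simplified model.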

codecov bot commented Jun 22, 2025

Codecov Report

Attention: Patch coverage is 88.46154% with 3 lines in your changes missing coverage. Please review.

Project coverage is 49.62%. Comparing base (43be033) to head (d893e98).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...v2/cdc/dlq/FileBasedDeadLetterQueueReconsumer.java | 88.46% | 2 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##               main    #2476    +/-   ##
==========================================
  Coverage     49.62%   49.62%            
- Complexity     4808     5157   +349     
==========================================
  Files           941      941            
  Lines         57661    57675    +14     
  Branches       6233     6233            
==========================================
+ Hits          28614    28623     +9     
- Misses        27007    27010     +3     
- Partials       2040     2042     +2     

| Components | Coverage Δ |
|---|---|
| spanner-templates | 69.94% <ø> (-0.01%) ⬇️ |
| spanner-import-export | 68.61% <ø> (-0.03%) ⬇️ |
| spanner-live-forward-migration | 78.77% <ø> (ø) |
| spanner-live-reverse-replication | 77.36% <ø> (ø) |
| spanner-bulk-migration | 87.89% <ø> (ø) |

| Files with missing lines | Coverage Δ |
|---|---|
| ...v2/cdc/dlq/FileBasedDeadLetterQueueReconsumer.java | 72.54% <88.46%> (+4.36%) ⬆️ |

... and 2 files with indirect coverage changes


@liferoad liferoad marked this pull request as ready for review July 12, 2025 19:54
@liferoad liferoad requested a review from damccorm July 12, 2025 19:54

PCollection<ResourceId> movedFiles =
    input
        .apply("MoveFiles", ParDo.of(new MoveFiles()))

I think this change introduces the possibility of data loss, which is worse. Specifically, we could run into the following scenario:

  1. Bundle x contains file-1, file-2, file-3
  2. file-1 is successfully renamed to tmp-file-1, but the bundle fails on renaming file-2

file-1 is now orphaned as tmp-file-1. I guess the way it is written, we would actually get recurrent errors which is a little better, but still not ideal.

Instead, I'd propose the following tweak:

  1. We get a timestamp FOO associated with each generate sequence firing. We already get this for free, we'd just need to propagate it through
  2. Instead of renaming the file tmp-<file name>, we rename it tmp-FOO-<file name>
  3. Instead of only looking for <file name> when doing the rename operation, we look for <file name> and rename it OR if <file name> is not present, we look for tmp-FOO-<file name> and use that instead. If tmp-FOO-<file name> and <file name> are both not present, we log and move on, assuming that something else has claimed the file.

So the algorithm would be:

TriggerConsumeDLQ, AsFilePattern, MatchFiles, <new extract_ts function> -> (ts, file) tuples
in MoveFiles:
   renamed_file_name = "tmp-${ts}-${original_file_name}"
   if exists(renamed_file_name):
      # already renamed by an earlier attempt of this same firing; use it as-is
      return
   if !exists(original_file_name):
      log.warning('skipping, handled by different pass')
      return
   mv(original_file_name, renamed_file_name)

Then the rest of the logic would stay the same.
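
The tweak proposed above can be sketched in Java roughly as follows. This is only an illustration under stated assumptions: it uses `java.nio.file` on a local filesystem instead of the pipeline's real file system, the class `TimestampedClaim` and method `claim` are hypothetical names, and the firing timestamp is passed in as a plain `long`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

public class TimestampedClaim {

  /**
   * Claims a file for the firing identified by firingTs.
   * Returns tmp-<ts>-<name> if this firing owns the file (newly claimed or
   * claimed by an earlier attempt of the same firing), otherwise empty.
   */
  static Optional<Path> claim(Path original, long firingTs) {
    Path renamed =
        original.resolveSibling("tmp-" + firingTs + "-" + original.getFileName());
    if (Files.exists(renamed)) {
      // A previous attempt of this same firing already renamed the file; reuse it.
      return Optional.of(renamed);
    }
    try {
      return Optional.of(Files.move(original, renamed, StandardCopyOption.ATOMIC_MOVE));
    } catch (NoSuchFileException e) {
      // Neither name exists: assume a different firing has claimed the file.
      System.err.println("skipping " + original + ", handled by a different pass");
      return Optional.empty();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("dlq");
    Path file1 = Files.writeString(dir.resolve("file-1"), "a");
    Path file2 = Files.writeString(dir.resolve("file-2"), "b");
    long ts = 1000L;

    // First attempt: file-1 is renamed, then the bundle fails before file-2.
    claim(file1, ts);

    // Retry of the same firing: file-1 is found under its tmp name, file-2 is renamed.
    System.out.println(claim(file1, ts));
    System.out.println(claim(file2, ts));

    // A later firing sees neither file-1 nor its own tmp name and skips it.
    System.out.println(claim(file1, 2000L));
  }
}
```

One deliberate difference from the pseudocode: instead of checking `exists(original)` before moving, the sketch lets the move fail with `NoSuchFileException`, which avoids a gap between the check and the rename. Because the tmp name includes the firing timestamp, a retried bundle can recognize its own partial work, so the file renamed before the failure is no longer orphaned.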

  results
      .get(fileMetadata)
-     .setCoder(MetadataCoder.of())
+     .setCoder(ResourceIdCoder.of())

I think this is inevitable, but we are breaking update compatibility. Maybe worth calling out in the PR title so that it gets into the release notes.

@liferoad liferoad closed this Oct 15, 2025

Development

Successfully merging this pull request may close these issues.

[Bug]: FileBasedDeadLetterQueueReconsumer could result in duplicates

2 participants