Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MetadataCoder;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -92,28 +93,39 @@ public PCollection<String> expand(PBegin in) {
.apply("TriggerConsumeDLQ", GenerateSequence.from(0).withRate(1, this.recheckPeriod))
.apply("AsFilePattern", MapElements.into(TypeDescriptors.strings()).via(seq -> filePattern))
.apply("MatchFiles", FileIO.matchAll())
.apply("ConsumeMatches", moveAndConsumeMatches());
.apply("MoveAndConsumeMatches", moveAndConsumeMatches());
}

/** Build a {@link PTransform} that consumes matched DLQ files. */
/**
* Build a {@link PTransform} that consumes matched DLQ files.
*
* <p>This transform is not idempotent. It should only be used when the source is guaranteed to
* provide each file only once.
*/
static PTransform<PCollection<Metadata>, PCollection<String>> moveAndConsumeMatches() {
return new PTransform<PCollection<Metadata>, PCollection<String>>() {
@Override
public PCollection<String> expand(PCollection<Metadata> input) {
// TODO(pabloem, dhercher): Use a Beam standard transform once possible
// TODO(pabloem, dhercher): Add a _metadata attribute to track whether a row comes from DLQ.
TupleTag<String> fileContents = new TupleTag<String>();
TupleTag<Metadata> fileMetadata = new TupleTag<Metadata>();
TupleTag<ResourceId> fileMetadata = new TupleTag<ResourceId>();

PCollection<ResourceId> movedFiles =
input
.apply("MoveFiles", ParDo.of(new MoveFiles()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change introduces the possibility of data loss which is worse. Specifically, we could run into the following scenario:

  1. Bundle x contains file-1, file-2, file-3
  2. file-1 is successfully renamed to tmp-file-1, but the bundle fails on renaming file-2

file-1 is now orphaned as tmp-file-1. I guess the way it is written, we would actually get recurrent errors which is a little better, but still not ideal.

Instead, I'd propose the following tweak:

  1. We get a timestamp FOO associated with each generate sequence firing. We already get this for free, we'd just need to propogate it through
  2. Instead of renaming the file tmp-<file name>, we rename it tmp-FOO-<file name>
  3. Instead of only looking for <file name> when doing the rename operation, we look for <file name> and rename it OR if <file name> is not present, we look for tmp-FOO-<file name> and use that instead. If tmp-FOO-<file name> and <file name> are both not present, we log and move on, assuming that something else has claimed the file.

So the algorithm would be:

TriggerConsumeDLQ, AsFilePattern, MatchFiles, <new extract_ts function> -> (ts, file) tuples
in MoveFiles:
   renamed_file_name = "tmp-${ts}-${original_file_name}"
   if exists(renamed_file_name):
      return
   if !exists(original_file_name):
      log.warning('skipping, handled by different pass')
   mv(original_file_name, renamed_file_name)

Then the rest of the logic would stay the same.

.setCoder(ResourceIdCoder.of())
.apply("ReshuffleMovedFiles", Reshuffle.viaRandomKey());

PCollectionTuple results =
input.apply(
movedFiles.apply(
"ConsumeFiles",
ParDo.of(new MoveAndConsumeFn(fileContents, fileMetadata))
.withOutputTags(fileContents, TupleTagList.of(fileMetadata)));

results
.get(fileMetadata)
.setCoder(MetadataCoder.of())
.setCoder(ResourceIdCoder.of())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is inevitable, but we are breaking update compatibility. Maybe worth calling out in the PR title so that it gets into the resource notes.

.apply("ReshuffleFiles", Reshuffle.viaRandomKey())
.apply(ParDo.of(new RemoveFiles()));
.apply("RemoveFiles", ParDo.of(new RemoveFiles()));

return results
.get(fileContents)
Expand All @@ -123,14 +135,32 @@ public PCollection<String> expand(PCollection<Metadata> input) {
};
}

private static class RemoveFiles extends DoFn<Metadata, Void> {
private static class MoveFiles extends DoFn<Metadata, ResourceId> {
@ProcessElement
public void process(@Element Metadata dlqFile, OutputReceiver<ResourceId> output)
throws IOException {
ResourceId tmpFile =
dlqFile
.resourceId()
.getCurrentDirectory()
.resolve(
"tmp-" + dlqFile.resourceId().getFilename(), StandardResolveOptions.RESOLVE_FILE);
LOG.info("Moving DLQ File {} to {}", dlqFile.resourceId().toString(), tmpFile.toString());
FileSystems.rename(
Collections.singletonList(dlqFile.resourceId()), Collections.singletonList(tmpFile));
output.output(tmpFile);
}
}

private static class RemoveFiles extends DoFn<ResourceId, Void> {
private final List<ResourceId> filesToRemove = new ArrayList<>();
private final Counter failedDeletions =
Metrics.counter(MoveAndConsumeFn.class, "failedDeletions");

@ProcessElement
public void process(@Element Metadata dlqFile, MultiOutputReceiver outputs) throws IOException {
this.filesToRemove.add(dlqFile.resourceId());
public void process(@Element ResourceId dlqFile, MultiOutputReceiver outputs)
throws IOException {
this.filesToRemove.add(dlqFile);
}

@FinishBundle
Expand All @@ -148,16 +178,15 @@ public void cleanupFiles() {
}
}

// TODO(pabloem): Switch over to use FileIO after BEAM-10246
private static class MoveAndConsumeFn extends DoFn<Metadata, String> {
private static class MoveAndConsumeFn extends DoFn<ResourceId, String> {

private final Counter reconsumedElements =
Metrics.counter(MoveAndConsumeFn.class, "elementsReconsumedFromDeadLetterQueue");

private final TupleTag<Metadata> filesTag;
private final TupleTag<ResourceId> filesTag;
private final TupleTag<String> contentTag;

MoveAndConsumeFn(TupleTag<String> contentTag, TupleTag<Metadata> filesTag) {
MoveAndConsumeFn(TupleTag<String> contentTag, TupleTag<ResourceId> filesTag) {
this.filesTag = filesTag;
this.contentTag = contentTag;
}
Expand All @@ -175,18 +204,19 @@ long getRetryCountForRecord(ObjectNode resultNode) {
}

@ProcessElement
public void process(@Element Metadata dlqFile, MultiOutputReceiver outputs) throws IOException {
LOG.info("Found DLQ File: {}", dlqFile.resourceId().toString());
if (dlqFile.resourceId().toString().contains("/tmp/.temp")) {
public void process(@Element ResourceId dlqFile, MultiOutputReceiver outputs)
throws IOException {
LOG.info("Found DLQ File: {}", dlqFile.toString());
if (dlqFile.toString().contains("/tmp/.temp")) {
return;
}

BufferedReader jsonReader;
try {
jsonReader = readFile(dlqFile.resourceId());
jsonReader = readFile(dlqFile);
} catch (FileNotFoundException e) {
// If the file does exist, it will be retried on the next trigger.
LOG.warn("DLQ File Not Found: {}", dlqFile.resourceId().toString());
LOG.warn("DLQ File Not Found: {}", dlqFile.toString());
return;
}

Expand Down
Loading