diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 8d6e52c64a52..4d65fbbd5e93 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -26,12 +26,11 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.sdk.io.FileSystem.LineageLevel; +import java.util.stream.Collectors; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; @@ -318,35 +317,10 @@ public final List> split( } } - /** - * Report source Lineage. Due to the size limit of Beam metrics, report full file name or only dir - * depend on the number of files. - * - *

- Number of files<=100, report full file paths; - * - *

- Number of directory<=100, report directory names (one level up); - * - *

- Otherwise, report top level only. - */ private static void reportSourceLineage(List expandedFiles) { - if (expandedFiles.size() <= 100) { - for (Metadata metadata : expandedFiles) { - FileSystems.reportSourceLineage(metadata.resourceId()); - } - } else { - HashSet uniqueDirs = new HashSet<>(); - for (Metadata metadata : expandedFiles) { - ResourceId dir = metadata.resourceId().getCurrentDirectory(); - uniqueDirs.add(dir); - if (uniqueDirs.size() > 100) { - FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL); - return; - } - } - for (ResourceId uniqueDir : uniqueDirs) { - FileSystems.reportSourceLineage(uniqueDir); - } - } + List resourceIds = + expandedFiles.stream().map(Metadata::resourceId).collect(Collectors.toList()); + FileSystems.reportSourceLineage(resourceIds); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 7e2940a2c35b..d9d6de8c91cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -398,6 +399,38 @@ public ResourceId apply(@Nonnull Metadata input) { .delete(resourceIdsToDelete); } + /** + * Report source {@link Lineage} metrics for multiple resource ids. Due to the size limit of Beam + * metrics, report full file name or only dir depend on the number of files. + * + *

- Number of files<=100, report full file paths; + * + *

- Number of directory<=100, report directory names (one level up); + * + *

- Otherwise, report top level only. + */ + public static void reportSourceLineage(List resourceIds) { + final int MAX_LINEAGE_TARGETS = 100; + if (resourceIds.size() <= MAX_LINEAGE_TARGETS) { + for (ResourceId resourceId : resourceIds) { + FileSystems.reportSourceLineage(resourceId); + } + } else { + HashSet uniqueDirs = new HashSet<>(); + for (ResourceId resourceId : resourceIds) { + ResourceId dir = resourceId.getCurrentDirectory(); + uniqueDirs.add(dir); + if (uniqueDirs.size() > MAX_LINEAGE_TARGETS) { + FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL); + return; + } + } + for (ResourceId uniqueDir : uniqueDirs) { + FileSystems.reportSourceLineage(uniqueDir); + } + } + } + /** Report source {@link Lineage} metrics for resource id. */ public static void reportSourceLineage(ResourceId resourceId) { reportSourceLineage(resourceId, LineageLevel.FILE); diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java index 155bf2d4a77f..e5a0f893eadd 100644 --- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java +++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java @@ -52,6 +52,8 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.options.PipelineOptions; @@ -97,6 +99,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; @@ -725,6 +728,7 @@ public List>> split(long desiredBundleSizeBytes, Pipeline return ImmutableList.of(this); } computeSplitsIfNecessary(); + reportSourceLineage(inputSplits); LOG.info( "Generated {} splits. Size of first split is {} ", inputSplits.size(), @@ -744,6 +748,19 @@ public List>> split(long desiredBundleSizeBytes, Pipeline .collect(Collectors.toList()); } + /** Report only file-based sources */ + private void reportSourceLineage(final List inputSplits) { + List fileResources = + inputSplits.stream() + .map(SerializableSplit::getSplit) + .filter(FileSplit.class::isInstance) + .map(FileSplit.class::cast) + .map(fileSplit -> FileSystems.matchNewResource(fileSplit.getPath().toString(), false)) + .collect(Collectors.toList()); + + FileSystems.reportSourceLineage(fileResources); + } + @Override public long getEstimatedSizeBytes(PipelineOptions po) throws Exception { if (inputSplit == null) {