Skip to content

Commit 1938428

Browse files
committed
Extract shared code into FileSystems public API
1 parent b4fe85e commit 1938428

File tree

3 files changed

+38
-49
lines changed

3 files changed

+38
-49
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@
2626
import java.nio.channels.ReadableByteChannel;
2727
import java.nio.channels.SeekableByteChannel;
2828
import java.util.ArrayList;
29-
import java.util.HashSet;
3029
import java.util.List;
3130
import java.util.ListIterator;
3231
import java.util.NoSuchElementException;
3332
import java.util.concurrent.atomic.AtomicReference;
34-
import org.apache.beam.sdk.io.FileSystem.LineageLevel;
33+
import java.util.stream.Collectors;
3534
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
3635
import org.apache.beam.sdk.io.fs.MatchResult;
3736
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
@@ -318,35 +317,10 @@ public final List<? extends FileBasedSource<T>> split(
318317
}
319318
}
320319

321-
/**
322-
* Report source Lineage. Due to the size limit of Beam metrics, report full file name or only dir
323-
* depend on the number of files.
324-
*
325-
* <p>- Number of files<=100, report full file paths;
326-
*
327-
* <p>- Number of directory<=100, report directory names (one level up);
328-
*
329-
* <p>- Otherwise, report top level only.
330-
*/
331320
private static void reportSourceLineage(List<Metadata> expandedFiles) {
332-
if (expandedFiles.size() <= 100) {
333-
for (Metadata metadata : expandedFiles) {
334-
FileSystems.reportSourceLineage(metadata.resourceId());
335-
}
336-
} else {
337-
HashSet<ResourceId> uniqueDirs = new HashSet<>();
338-
for (Metadata metadata : expandedFiles) {
339-
ResourceId dir = metadata.resourceId().getCurrentDirectory();
340-
uniqueDirs.add(dir);
341-
if (uniqueDirs.size() > 100) {
342-
FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
343-
return;
344-
}
345-
}
346-
for (ResourceId uniqueDir : uniqueDirs) {
347-
FileSystems.reportSourceLineage(uniqueDir);
348-
}
349-
}
321+
List<ResourceId> resourceIds =
322+
expandedFiles.stream().map(Metadata::resourceId).collect(Collectors.toList());
323+
FileSystems.reportSourceLineage(resourceIds);
350324
}
351325

352326
/**

sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.ArrayList;
3030
import java.util.Collection;
3131
import java.util.Collections;
32+
import java.util.HashSet;
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.Map.Entry;
@@ -398,6 +399,37 @@ public ResourceId apply(@Nonnull Metadata input) {
398399
.delete(resourceIdsToDelete);
399400
}
400401

402+
/**
403+
* Report source {@link Lineage} metrics for multiple resource ids. Due to the size limit of Beam
404+
* metrics, report full file paths or only directories, depending on the number of files.
405+
*
406+
* <p>- At most 100 files: report each full file path;
407+
*
408+
* <p>- Otherwise, at most 100 distinct directories: report each directory (one level up);
409+
*
410+
* <p>- Otherwise, report only the first directory that pushes the count past 100, at top level.
411+
*/
412+
public static void reportSourceLineage(List<ResourceId> resourceIds) {
413+
if (resourceIds.size() <= 100) {
414+
for (ResourceId resourceId : resourceIds) {
415+
FileSystems.reportSourceLineage(resourceId);
416+
}
417+
} else {
418+
HashSet<ResourceId> uniqueDirs = new HashSet<>();
419+
for (ResourceId resourceId : resourceIds) {
420+
ResourceId dir = resourceId.getCurrentDirectory();
421+
uniqueDirs.add(dir);
422+
if (uniqueDirs.size() > 100) {
423+
FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
424+
return;
425+
}
426+
}
427+
for (ResourceId uniqueDir : uniqueDirs) {
428+
FileSystems.reportSourceLineage(uniqueDir);
429+
}
430+
}
431+
}
432+
401433
/** Report source {@link Lineage} metrics for resource id. */
402434
public static void reportSourceLineage(ResourceId resourceId) {
403435
reportSourceLineage(resourceId, LineageLevel.FILE);

sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.beam.sdk.coders.CoderRegistry;
5353
import org.apache.beam.sdk.coders.KvCoder;
5454
import org.apache.beam.sdk.io.BoundedSource;
55-
import org.apache.beam.sdk.io.FileSystem;
5655
import org.apache.beam.sdk.io.FileSystems;
5756
import org.apache.beam.sdk.io.fs.ResourceId;
5857
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
@@ -753,6 +752,7 @@ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, Pipeline
753752
.collect(Collectors.toList());
754753
}
755754

755+
/** Report source lineage for file-based sources only. */
756756
private void reportSourceLineage(final List<SerializableSplit> inputSplits) {
757757
List<ResourceId> fileResources = new ArrayList<>();
758758

@@ -766,24 +766,7 @@ private void reportSourceLineage(final List<SerializableSplit> inputSplits) {
766766
}
767767
}
768768

769-
if (fileResources.size() <= 100) {
770-
for (ResourceId resource : fileResources) {
771-
FileSystems.reportSourceLineage(resource);
772-
}
773-
} else {
774-
HashSet<ResourceId> uniqueDirs = new HashSet<>();
775-
for (ResourceId resource : fileResources) {
776-
ResourceId dir = resource.getCurrentDirectory();
777-
uniqueDirs.add(dir);
778-
if (uniqueDirs.size() > 100) {
779-
FileSystems.reportSourceLineage(dir, FileSystem.LineageLevel.TOP_LEVEL);
780-
return;
781-
}
782-
}
783-
for (ResourceId dir : uniqueDirs) {
784-
FileSystems.reportSourceLineage(dir);
785-
}
786-
}
769+
FileSystems.reportSourceLineage(fileResources);
787770
}
788771

789772
@Override

0 commit comments

Comments
 (0)