Skip to content

Commit 8e80331

Browse files
authored
Spark: Remove dependency on hadoop's filesystem class from remove orphan files (apache#12254)
1 parent a1a9ba1 commit 8e80331

File tree

4 files changed

+240
-51
lines changed

4 files changed

+240
-51
lines changed

docs/docs/spark-procedures.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ Used to remove files which are not referenced in any metadata files of an Iceber
317317
| `equal_schemes` | | map<string, string> | Mapping of file system schemes to be considered equal. Key is a comma-separated list of schemes and value is a scheme (defaults to `map('s3a,s3n','s3')`). |
318318
| `equal_authorities` | | map<string, string> | Mapping of file system authorities to be considered equal. Key is a comma-separated list of authorities and value is an authority. |
319319
| `prefix_mismatch_mode` | | string | Action behavior when location prefixes (schemes/authorities) mismatch: <ul><li>ERROR - throw an exception. (default) </li><li>IGNORE - no action.</li><li>DELETE - delete files.</li></ul> |
320+
| `prefix_listing` | | boolean | When true, use prefix-based file listing via the `SupportsPrefixOperations` interface. The table's FileIO implementation must support `SupportsPrefixOperations` when this flag is enabled (defaults to `false`). |
320321

321322
#### Output
322323

@@ -370,6 +371,11 @@ CALL catalog_name.system.remove_orphan_files(table => 'db.sample', equal_schemes
370371
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', equal_authorities => map('ns1', 'ns2'));
371372
```
372373

374+
List all the files that are candidates for removal, using prefix-based listing to discover them.
375+
```sql
376+
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', prefix_listing => true);
377+
```
378+
373379
### `rewrite_data_files`
374380

375381
Iceberg tracks each data file in a table. More data files leads to more metadata stored in manifest files, and small data files causes an unnecessary amount of metadata and less efficient queries from file open costs.

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 84 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.iceberg.hadoop.HiddenPathFilter;
5151
import org.apache.iceberg.io.BulkDeletionFailureException;
5252
import org.apache.iceberg.io.SupportsBulkOperations;
53+
import org.apache.iceberg.io.SupportsPrefixOperations;
5354
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
5455
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
5556
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -121,6 +122,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
121122
private Dataset<Row> compareToFileList;
122123
private Consumer<String> deleteFunc = null;
123124
private ExecutorService deleteExecutorService = null;
125+
private boolean usePrefixListing = false;
124126

125127
DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
126128
super(spark);
@@ -206,6 +208,11 @@ public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
206208
return this;
207209
}
208210

211+
public DeleteOrphanFilesSparkAction usePrefixListing(boolean newUsePrefixListing) {
212+
this.usePrefixListing = newUsePrefixListing;
213+
return this;
214+
}
215+
209216
private Dataset<String> filteredCompareToFileList() {
210217
Dataset<Row> files = compareToFileList;
211218
if (location != null) {
@@ -303,39 +310,90 @@ private Dataset<String> listedFileDS() {
303310
List<String> subDirs = Lists.newArrayList();
304311
List<String> matchingFiles = Lists.newArrayList();
305312

306-
Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
307313
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
308314

309-
// list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
310-
// less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
311-
listDirRecursively(
312-
location,
313-
predicate,
314-
hadoopConf.value(),
315-
MAX_DRIVER_LISTING_DEPTH,
316-
MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
317-
subDirs,
318-
pathFilter,
319-
matchingFiles);
320-
321-
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
322-
323-
if (subDirs.isEmpty()) {
315+
if (usePrefixListing) {
316+
Preconditions.checkArgument(
317+
table.io() instanceof SupportsPrefixOperations,
318+
"Cannot use prefix listing with FileIO {} which does not support prefix operations.",
319+
table.io());
320+
321+
Predicate<org.apache.iceberg.io.FileInfo> predicate =
322+
fileInfo -> fileInfo.createdAtMillis() < olderThanTimestamp;
323+
listDirRecursivelyWithFileIO(
324+
(SupportsPrefixOperations) table.io(), location, predicate, pathFilter, matchingFiles);
325+
326+
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
324327
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
328+
} else {
329+
Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
330+
// list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
331+
// less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
332+
listDirRecursivelyWithHadoop(
333+
location,
334+
predicate,
335+
hadoopConf.value(),
336+
MAX_DRIVER_LISTING_DEPTH,
337+
MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
338+
subDirs,
339+
pathFilter,
340+
matchingFiles);
341+
342+
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
343+
344+
if (subDirs.isEmpty()) {
345+
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
346+
}
347+
348+
int parallelism = Math.min(subDirs.size(), listingParallelism);
349+
JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
350+
351+
Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
352+
ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
353+
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);
354+
355+
JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
356+
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
357+
}
358+
}
359+
360+
private static void listDirRecursivelyWithFileIO(
361+
SupportsPrefixOperations io,
362+
String dir,
363+
Predicate<org.apache.iceberg.io.FileInfo> predicate,
364+
PathFilter pathFilter,
365+
List<String> matchingFiles) {
366+
String listPath = dir;
367+
if (!dir.endsWith("/")) {
368+
listPath = dir + "/";
369+
}
370+
371+
Iterable<org.apache.iceberg.io.FileInfo> files = io.listPrefix(listPath);
372+
for (org.apache.iceberg.io.FileInfo file : files) {
373+
Path path = new Path(file.location());
374+
if (!isHiddenPath(dir, path, pathFilter) && predicate.test(file)) {
375+
matchingFiles.add(file.location());
376+
}
325377
}
378+
}
326379

327-
int parallelism = Math.min(subDirs.size(), listingParallelism);
328-
JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
380+
private static boolean isHiddenPath(String baseDir, Path path, PathFilter pathFilter) {
381+
boolean isHiddenPath = false;
382+
Path currentPath = path;
383+
while (currentPath.getParent().toString().contains(baseDir)) {
384+
if (!pathFilter.accept(currentPath)) {
385+
isHiddenPath = true;
386+
break;
387+
}
329388

330-
Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
331-
ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
332-
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);
389+
currentPath = currentPath.getParent();
390+
}
333391

334-
JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
335-
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
392+
return isHiddenPath;
336393
}
337394

338-
private static void listDirRecursively(
395+
@VisibleForTesting
396+
static void listDirRecursivelyWithHadoop(
339397
String dir,
340398
Predicate<FileStatus> predicate,
341399
Configuration conf,
@@ -372,7 +430,7 @@ private static void listDirRecursively(
372430
}
373431

374432
for (String subDir : subDirs) {
375-
listDirRecursively(
433+
listDirRecursivelyWithHadoop(
376434
subDir,
377435
predicate,
378436
conf,
@@ -458,7 +516,7 @@ public Iterator<String> call(Iterator<String> dirs) throws Exception {
458516
Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
459517

460518
while (dirs.hasNext()) {
461-
listDirRecursively(
519+
listDirRecursivelyWithHadoop(
462520
dirs.next(),
463521
predicate,
464522
hadoopConf.value().value(),

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
6363
ProcedureParameter.optional("equal_schemes", STRING_MAP),
6464
ProcedureParameter.optional("equal_authorities", STRING_MAP),
6565
ProcedureParameter.optional("prefix_mismatch_mode", DataTypes.StringType),
66+
// List files with prefix operations. Default is false.
67+
ProcedureParameter.optional("prefix_listing", DataTypes.BooleanType)
6668
};
6769

6870
private static final StructType OUTPUT_TYPE =
@@ -136,6 +138,8 @@ public InternalRow[] call(InternalRow args) {
136138
PrefixMismatchMode prefixMismatchMode =
137139
args.isNullAt(8) ? null : PrefixMismatchMode.fromString(args.getString(8));
138140

141+
boolean prefixListing = args.isNullAt(9) ? false : args.getBoolean(9);
142+
139143
return withIcebergTable(
140144
tableIdent,
141145
table -> {
@@ -182,6 +186,8 @@ public InternalRow[] call(InternalRow args) {
182186
action.prefixMismatchMode(prefixMismatchMode);
183187
}
184188

189+
action.usePrefixListing(prefixListing);
190+
185191
DeleteOrphanFiles.Result result = action.execute();
186192

187193
return toOutputRows(result);

0 commit comments

Comments
 (0)