Skip to content

Commit 14bd9f0

Browse files
authored
OFD to only soft delete when data_manifest exists (#418)
## Summary <!--- HINT: Replace #nnn with corresponding Issue number, if you are fixing an existing issue --> Uploader could not discover partitions in backup folder without data_manifest.json. This led the orphan files to stay in backup folder for long time. This PR made OFD to purge these data when data_manifest.json does not exist. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [x] Bug Fixes - [ ] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [ ] Tests For all the boxes checked, please include additional details of the changes made in this pull request. ## Testing Done <!--- Check any relevant boxes with "x" --> - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [x] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request. # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [ ] Large PR broken into smaller PRs, and PR plan linked in the description. For all the boxes checked, include additional details of the changes made in this pull request.
1 parent 93886d8 commit 14bd9f0

File tree

2 files changed

+55
-51
lines changed

2 files changed

+55
-51
lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ public DeleteOrphanFiles.Result deleteOrphanFiles(
118118
} else if (file.contains(backupDirRoot.toString())) {
119119
// files present in .backup dir should not be considered orphan
120120
log.info("Skipped deleting backup file {}", file);
121-
} else if (file.contains(dataDirRoot.toString()) && backupEnabled) {
121+
} else if (file.contains(dataDirRoot.toString())
122+
&& backupEnabled
123+
&& isExistBackupDataManifests(table, file, backupDir)) {
122124
// move data files to backup dir if backup is enabled
123125
Path backupFilePath = getTrashPath(table, file, backupDir);
124126
log.info("Moving orphan file {} to {}", file, backupFilePath);
@@ -141,6 +143,17 @@ public DeleteOrphanFiles.Result deleteOrphanFiles(
141143
return operation.execute();
142144
}
143145

146+
private boolean isExistBackupDataManifests(Table table, String file, String backupDir) {
147+
try {
148+
Path backupFilePath = getTrashPath(table, file, backupDir);
149+
Path pattern = new Path(backupFilePath.getParent(), "data_manifest*");
150+
FileStatus[] matches = fs().globStatus(pattern);
151+
return matches != null && matches.length > 0;
152+
} catch (IOException e) {
153+
return false;
154+
}
155+
}
156+
144157
/**
145158
* Run deleteOrphanDirectory operation on the given table directory path with time filter, moves
146159
* files to the given trash subdirectory if the table is created older than the provided

apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java

Lines changed: 41 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -319,24 +319,14 @@ public void testOrphanFilesDeletionJavaAPI() throws Exception {
319319
populateTable(ops, tableName, numInserts);
320320
Table table = ops.getTable(tableName);
321321
log.info("Loaded table {}, location {}", table.name(), table.location());
322-
List<Row> snapshots =
323-
ops.spark().sql(String.format("SELECT * from %s.history", tableName)).collectAsList();
324-
Assertions.assertEquals(numInserts, snapshots.size());
325-
log.info("Found {} snapshots", snapshots.size());
326-
for (Row metadataFileRow : snapshots) {
327-
log.info(metadataFileRow.toString());
328-
}
329322
Path orphanFilePath = new Path(table.location(), testOrphanFileName);
323+
Path dataManifestPath = new Path(table.location(), ".backup/data/data_manifest_123.json");
330324
FileSystem fs = ops.fs();
331325
fs.createNewFile(orphanFilePath);
332-
log.info("Created orphan file {}", testOrphanFileName);
326+
fs.createNewFile(dataManifestPath);
333327
DeleteOrphanFiles.Result result =
334328
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, BACKUP_DIR);
335329
List<String> orphanFiles = Lists.newArrayList(result.orphanFileLocations().iterator());
336-
log.info("Detected {} orphan files", orphanFiles.size());
337-
for (String of : orphanFiles) {
338-
log.info("File {}", of);
339-
}
340330
Assertions.assertTrue(
341331
fs.exists(new Path(table.location(), new Path(BACKUP_DIR, testOrphanFileName))));
342332
Assertions.assertEquals(1, orphanFiles.size());
@@ -357,25 +347,19 @@ public void testOrphanFilesDeletionIgnoresFilesInBackupDir() throws Exception {
357347
Table table = ops.getTable(tableName);
358348
log.info("Loaded table {}, location {}", table.name(), table.location());
359349
Path orphanFilePath = new Path(table.location(), testOrphanFileName);
350+
Path dataManifestPath = new Path(table.location(), ".backup/data/data_manifest_123.json");
360351
FileSystem fs = ops.fs();
361352
fs.createNewFile(orphanFilePath);
362-
log.info("Created orphan file {}", testOrphanFileName);
353+
fs.createNewFile(dataManifestPath);
354+
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, BACKUP_DIR);
355+
Path backupFilePath = new Path(table.location(), new Path(BACKUP_DIR, testOrphanFileName));
356+
Assertions.assertTrue(fs.exists(backupFilePath));
357+
// run delete operation again and verify that files in .backup are not listed as Orphan
363358
DeleteOrphanFiles.Result result =
364359
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, BACKUP_DIR);
365360
List<String> orphanFiles = Lists.newArrayList(result.orphanFileLocations().iterator());
366-
log.info("Detected {} orphan files", orphanFiles.size());
367-
for (String of : orphanFiles) {
368-
log.info("File {}", of);
369-
}
370-
Path trashFilePath = new Path(table.location(), new Path(BACKUP_DIR, testOrphanFileName));
371-
Assertions.assertTrue(fs.exists(trashFilePath));
372-
// run delete operation again and verify that files in .trash are not listed as Orphan
373-
DeleteOrphanFiles.Result result2 =
374-
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, BACKUP_DIR);
375-
List<String> orphanFiles2 = Lists.newArrayList(result2.orphanFileLocations().iterator());
376-
log.info("Detected {} orphan files", orphanFiles2.size());
377-
Assertions.assertEquals(0, orphanFiles2.size());
378-
Assertions.assertTrue(fs.exists(trashFilePath));
361+
Assertions.assertEquals(0, orphanFiles.size());
362+
Assertions.assertTrue(fs.exists(backupFilePath));
379363
}
380364
}
381365

@@ -389,24 +373,14 @@ public void testOrphanFilesDeletionDeleteNonDataFiles() throws Exception {
389373
populateTable(ops, tableName, numInserts);
390374
Table table = ops.getTable(tableName);
391375
log.info("Loaded table {}, location {}", table.name(), table.location());
392-
List<Row> snapshots =
393-
ops.spark().sql(String.format("SELECT * from %s.history", tableName)).collectAsList();
394-
Assertions.assertEquals(numInserts, snapshots.size());
395-
log.info("Found {} snapshots", snapshots.size());
396-
for (Row metadataFileRow : snapshots) {
397-
log.info(metadataFileRow.toString());
398-
}
399376
Path orphanFilePath = new Path(table.location(), testOrphanFileName);
377+
Path dataManifestPath = new Path(table.location(), ".backup/data/data_manifest_123.json");
400378
FileSystem fs = ops.fs();
401379
fs.createNewFile(orphanFilePath);
402-
log.info("Created orphan file {}", testOrphanFileName);
380+
fs.createNewFile(dataManifestPath);
403381
DeleteOrphanFiles.Result result =
404382
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, BACKUP_DIR);
405383
List<String> orphanFiles = Lists.newArrayList(result.orphanFileLocations().iterator());
406-
log.info("Detected {} orphan files", orphanFiles.size());
407-
for (String of : orphanFiles) {
408-
log.info("File {}", of);
409-
}
410384
Assertions.assertFalse(
411385
fs.exists(new Path(table.location(), new Path(BACKUP_DIR, testOrphanFileName))));
412386
Assertions.assertEquals(1, orphanFiles.size());
@@ -426,24 +400,39 @@ public void testOrphanFilesDeletionBackupDisabled() throws Exception {
426400
populateTable(ops, tableName, numInserts);
427401
Table table = ops.getTable(tableName);
428402
log.info("Loaded table {}, location {}", table.name(), table.location());
429-
List<Row> snapshots =
430-
ops.spark().sql(String.format("SELECT * from %s.history", tableName)).collectAsList();
431-
Assertions.assertEquals(numInserts, snapshots.size());
432-
log.info("Found {} snapshots", snapshots.size());
433-
for (Row metadataFileRow : snapshots) {
434-
log.info(metadataFileRow.toString());
435-
}
436403
Path orphanFilePath = new Path(table.location(), testOrphanFileName);
404+
Path dataManifestPath = new Path(table.location(), ".backup/data/data_manifest_123.json");
437405
FileSystem fs = ops.fs();
438406
fs.createNewFile(orphanFilePath);
439-
log.info("Created orphan file {}", testOrphanFileName);
407+
fs.createNewFile(dataManifestPath);
440408
DeleteOrphanFiles.Result result =
441409
ops.deleteOrphanFiles(table, System.currentTimeMillis(), false, BACKUP_DIR);
442410
List<String> orphanFiles = Lists.newArrayList(result.orphanFileLocations().iterator());
443-
log.info("Detected {} orphan files", orphanFiles.size());
444-
for (String of : orphanFiles) {
445-
log.info("File {}", of);
446-
}
411+
Assertions.assertFalse(
412+
fs.exists(new Path(table.location(), new Path(BACKUP_DIR, testOrphanFileName))));
413+
Assertions.assertEquals(1, orphanFiles.size());
414+
Assertions.assertTrue(
415+
orphanFiles.get(0).endsWith(table.location() + "/" + testOrphanFileName));
416+
Assertions.assertFalse(fs.exists(orphanFilePath));
417+
}
418+
}
419+
420+
@Test
421+
public void testOrphanFilesDeletionDeleteDataWhenDataManifestNotExists() throws Exception {
422+
final String tableName = "db.test_ofd_java";
423+
final String testOrphanFileName = "data/test_orphan_file.orc";
424+
final int numInserts = 3;
425+
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
426+
prepareTable(ops, tableName);
427+
populateTable(ops, tableName, numInserts);
428+
Table table = ops.getTable(tableName);
429+
log.info("Loaded table {}, location {}", table.name(), table.location());
430+
Path orphanFilePath = new Path(table.location(), testOrphanFileName);
431+
FileSystem fs = ops.fs();
432+
fs.createNewFile(orphanFilePath);
433+
DeleteOrphanFiles.Result result =
434+
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, BACKUP_DIR);
435+
List<String> orphanFiles = Lists.newArrayList(result.orphanFileLocations().iterator());
447436
Assertions.assertFalse(
448437
fs.exists(new Path(table.location(), new Path(BACKUP_DIR, testOrphanFileName))));
449438
Assertions.assertEquals(1, orphanFiles.size());
@@ -698,9 +687,11 @@ public void testStagedFilesDelete() throws Exception {
698687
log.info("Loaded table {}, location {}", table.name(), table.location());
699688
Path orphanFilePath1 = new Path(table.location(), testOrphanFile1);
700689
Path orphanFilePath2 = new Path(table.location(), testOrphanFile2);
690+
Path dataManifestPath = new Path(table.location(), ".trash/data/data_manifest_123.json");
701691
FileSystem fs = ops.fs();
702692
fs.createNewFile(orphanFilePath1);
703693
fs.createNewFile(orphanFilePath2);
694+
fs.createNewFile(dataManifestPath);
704695
log.info("Created orphan file {}", testOrphanFile1);
705696
log.info("Created orphan file {}", testOrphanFile2);
706697
ops.deleteOrphanFiles(table, System.currentTimeMillis(), true, TRASH_DIR);

0 commit comments

Comments
 (0)