Skip to content

Commit c7bf4f5

Browse files
committed
Fix metadata delete/truncate on Iceberg table with delete files
1 parent f365a2c commit c7bf4f5

File tree

2 files changed

+129
-36
lines changed

2 files changed

+129
-36
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.apache.iceberg.DeleteFiles;
7070
import org.apache.iceberg.FileFormat;
7171
import org.apache.iceberg.FileMetadata;
72-
import org.apache.iceberg.FileScanTask;
7372
import org.apache.iceberg.ManifestFile;
7473
import org.apache.iceberg.PartitionField;
7574
import org.apache.iceberg.PartitionSpec;
@@ -79,16 +78,12 @@
7978
import org.apache.iceberg.Schema;
8079
import org.apache.iceberg.SchemaParser;
8180
import org.apache.iceberg.Table;
82-
import org.apache.iceberg.TableScan;
8381
import org.apache.iceberg.Transaction;
84-
import org.apache.iceberg.expressions.Expression;
85-
import org.apache.iceberg.io.CloseableIterable;
8682
import org.apache.iceberg.types.Type;
8783
import org.apache.iceberg.types.TypeUtil;
8884
import org.apache.iceberg.types.Types;
8985
import org.apache.iceberg.util.CharSequenceSet;
9086

91-
import java.io.IOException;
9287
import java.util.ArrayList;
9388
import java.util.Collection;
9489
import java.util.Collections;
@@ -101,7 +96,6 @@
10196
import java.util.concurrent.ConcurrentHashMap;
10297
import java.util.concurrent.ConcurrentMap;
10398
import java.util.concurrent.atomic.AtomicInteger;
104-
import java.util.concurrent.atomic.AtomicLong;
10599
import java.util.stream.Collectors;
106100

107101
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
@@ -148,7 +142,6 @@
148142
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
149143
import static com.facebook.presto.iceberg.changelog.ChangelogUtil.getRowTypeFromColumnMeta;
150144
import static com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer.getEnforcedColumns;
151-
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
152145
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
153146
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
154147
import static com.google.common.base.Verify.verify;
@@ -159,6 +152,9 @@
159152
import static java.util.Collections.singletonList;
160153
import static java.util.Objects.requireNonNull;
161154
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
155+
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
156+
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
157+
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
162158

163159
public abstract class IcebergAbstractMetadata
164160
implements ConnectorMetadata
@@ -769,12 +765,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
769765
{
770766
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
771767
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
772-
try (CloseableIterable<FileScanTask> files = icebergTable.newScan().planFiles()) {
773-
removeScanFiles(icebergTable, files);
774-
}
775-
catch (IOException e) {
776-
throw new PrestoException(GENERIC_INTERNAL_ERROR, "failed to scan files for delete", e);
777-
}
768+
removeScanFiles(icebergTable, TupleDomain.all());
778769
}
779770

780771
@Override
@@ -916,39 +907,29 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
916907
throw new TableNotFoundException(handle.getSchemaTableName());
917908
}
918909

919-
TableScan scan = icebergTable.newScan();
920910
TupleDomain<IcebergColumnHandle> domainPredicate = layoutHandle.getValidPredicate();
921-
922-
if (!domainPredicate.isAll()) {
923-
Expression filterExpression = toIcebergExpression(domainPredicate);
924-
scan = scan.filter(filterExpression);
925-
}
926-
927-
try (CloseableIterable<FileScanTask> files = scan.planFiles()) {
928-
return OptionalLong.of(removeScanFiles(icebergTable, files));
929-
}
930-
catch (IOException e) {
931-
throw new PrestoException(GENERIC_INTERNAL_ERROR, "failed to scan files for delete", e);
932-
}
911+
return removeScanFiles(icebergTable, domainPredicate);
933912
}
934913

935914
/**
936-
* Deletes all the files within a particular scan
915+
* Deletes all the files for a specific predicate
937916
*
938917
* @return the number of rows deleted from all files
939918
*/
940-
private long removeScanFiles(Table icebergTable, Iterable<FileScanTask> scan)
919+
private OptionalLong removeScanFiles(Table icebergTable, TupleDomain<IcebergColumnHandle> predicate)
941920
{
942921
transaction = icebergTable.newTransaction();
943-
DeleteFiles deletes = transaction.newDelete();
944-
AtomicLong rowsDeleted = new AtomicLong(0L);
945-
scan.forEach(t -> {
946-
deletes.deleteFile(t.file());
947-
rowsDeleted.addAndGet(t.estimatedRowsCount());
948-
});
949-
deletes.commit();
922+
DeleteFiles deleteFiles = transaction.newDelete()
923+
.deleteFromRowFilter(toIcebergExpression(predicate));
924+
deleteFiles.commit();
950925
transaction.commitTransaction();
951-
return rowsDeleted.get();
926+
927+
Map<String, String> summary = icebergTable.currentSnapshot().summary();
928+
long deletedRecords = Long.parseLong(summary.getOrDefault(DELETED_RECORDS_PROP, "0"));
929+
long removedPositionDeletes = Long.parseLong(summary.getOrDefault(REMOVED_POS_DELETES_PROP, "0"));
930+
long removedEqualityDeletes = Long.parseLong(summary.getOrDefault(REMOVED_EQ_DELETES_PROP, "0"));
931+
// Removed rows count is inaccurate when existing equality delete files
932+
return OptionalLong.of(deletedRecords - removedPositionDeletes - removedEqualityDeletes);
952933
}
953934

954935
private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVersion tableVersion)

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.iceberg.FileScanTask;
5555
import org.apache.iceberg.PartitionSpec;
5656
import org.apache.iceberg.Schema;
57+
import org.apache.iceberg.Snapshot;
5758
import org.apache.iceberg.Table;
5859
import org.apache.iceberg.TableMetadata;
5960
import org.apache.iceberg.TableOperations;
@@ -115,6 +116,8 @@
115116
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
116117
import static java.lang.String.format;
117118
import static java.util.Objects.requireNonNull;
119+
import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
120+
import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
118121
import static org.testng.Assert.assertNotEquals;
119122
import static org.testng.Assert.assertTrue;
120123

@@ -460,6 +463,35 @@ public void testTruncate()
460463
assertUpdate("DROP TABLE test_truncate");
461464
}
462465

466+
@Test
467+
public void testTruncateTableWithDeleteFiles()
468+
{
469+
String tableName = "test_v2_row_delete_" + randomTableSuffix();
470+
try {
471+
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format_version = '2', delete_mode = 'merge-on-read')");
472+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
473+
474+
// execute row level deletion
475+
assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
476+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
477+
478+
Table icebergTable = loadTable(tableName);
479+
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
480+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
481+
482+
// execute truncate table
483+
assertUpdate("TRUNCATE TABLE " + tableName);
484+
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
485+
486+
icebergTable = loadTable(tableName);
487+
assertHasDataFiles(icebergTable.currentSnapshot(), 0);
488+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
489+
}
490+
finally {
491+
assertUpdate("DROP TABLE IF EXISTS " + tableName);
492+
}
493+
}
494+
463495
@Override
464496
public void testShowColumns()
465497
{
@@ -1308,6 +1340,72 @@ public void testPartShowStatsWithFilters()
13081340
assertQuerySucceeds("DROP TABLE showstatsfilters");
13091341
}
13101342

1343+
@Test
1344+
public void testMetadataDeleteOnUnPartitionedTableWithDeleteFiles()
1345+
{
1346+
String tableName = "test_v2_row_delete_" + randomTableSuffix();
1347+
try {
1348+
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format_version = '2', delete_mode = 'merge-on-read')");
1349+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
1350+
1351+
// execute row level deletion
1352+
assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
1353+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
1354+
1355+
Table icebergTable = loadTable(tableName);
1356+
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
1357+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
1358+
1359+
// execute whole table metadata deletion
1360+
assertUpdate("DELETE FROM " + tableName, 1);
1361+
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
1362+
1363+
icebergTable = loadTable(tableName);
1364+
assertHasDataFiles(icebergTable.currentSnapshot(), 0);
1365+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1366+
}
1367+
finally {
1368+
assertUpdate("DROP TABLE IF EXISTS " + tableName);
1369+
}
1370+
}
1371+
1372+
@Test
1373+
public void testMetadataDeleteOnPartitionedTableWithDeleteFiles()
1374+
{
1375+
String tableName = "test_v2_row_delete_" + randomTableSuffix();
1376+
try {
1377+
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format_version = '2', delete_mode = 'merge-on-read', partitioning = ARRAY['a'])");
1378+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
1379+
1380+
// execute row level deletion
1381+
assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
1382+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
1383+
1384+
Table icebergTable = loadTable(tableName);
1385+
assertHasDataFiles(icebergTable.currentSnapshot(), 3);
1386+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
1387+
1388+
// execute metadata deletion with filter
1389+
assertUpdate("DELETE FROM " + tableName + " WHERE a in (2, 3)", 0);
1390+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
1391+
1392+
icebergTable = loadTable(tableName);
1393+
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
1394+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1395+
1396+
// execute whole table metadata deletion
1397+
assertUpdate("DELETE FROM " + tableName, 1);
1398+
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
1399+
1400+
icebergTable = loadTable(tableName);
1401+
assertHasDataFiles(icebergTable.currentSnapshot(), 0);
1402+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1403+
}
1404+
finally {
1405+
assertUpdate("DROP TABLE IF EXISTS " + tableName);
1406+
}
1407+
}
1408+
13111409
private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
13121410
{
13131411
// check delete file list
@@ -1450,4 +1548,18 @@ private void testWithAllFileFormats(Session session, BiConsumer<Session, FileFor
14501548
test.accept(session, FileFormat.PARQUET);
14511549
test.accept(session, FileFormat.ORC);
14521550
}
1551+
1552+
private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
1553+
{
1554+
Map<String, String> map = snapshot.summary();
1555+
int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP));
1556+
assertEquals(totalDataFiles, dataFilesCount);
1557+
}
1558+
1559+
private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
1560+
{
1561+
Map<String, String> map = snapshot.summary();
1562+
int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP));
1563+
assertEquals(totalDeleteFiles, deleteFilesCount);
1564+
}
14531565
}

0 commit comments

Comments
 (0)