Skip to content

Commit d9965b5

Browse files
committed
[Iceberg] Refine the partition specs that really need to be checked
1 parent c7bf4f5 commit d9965b5

File tree

6 files changed

+234
-14
lines changed

6 files changed

+234
-14
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.apache.iceberg.DeleteFiles;
7070
import org.apache.iceberg.FileFormat;
7171
import org.apache.iceberg.FileMetadata;
72-
import org.apache.iceberg.ManifestFile;
7372
import org.apache.iceberg.PartitionField;
7473
import org.apache.iceberg.PartitionSpec;
7574
import org.apache.iceberg.PartitionSpecParser;
@@ -125,6 +124,7 @@
125124
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
126125
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
127126
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
127+
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
128128
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
129129
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
130130
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
@@ -874,11 +874,7 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
874874

875875
// Get partition specs that really need to be checked
876876
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
877-
Set<Integer> partitionSpecIds = handle.getIcebergTableName().getSnapshotId().map(
878-
snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
879-
.map(ManifestFile::partitionSpecId)
880-
.collect(toImmutableSet()))
881-
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
877+
Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, handle.getIcebergTableName().getSnapshotId());
882878

883879
Set<Integer> enforcedColumnIds = getEnforcedColumns(icebergTable, partitionSpecIds, domainPredicate, session)
884880
.stream()

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,15 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
300300
return columns.build();
301301
}
302302

303+
public static Set<Integer> getPartitionSpecsIncludingValidData(Table icebergTable, Optional<Long> snapshotId)
304+
{
305+
return snapshotId.map(snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
306+
.filter(manifestFile -> manifestFile.hasAddedFiles() || manifestFile.hasExistingFiles())
307+
.map(ManifestFile::partitionSpecId)
308+
.collect(toImmutableSet()))
309+
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
310+
}
311+
303312
public static List<Column> toHiveColumns(List<NestedField> columns)
304313
{
305314
return columns.stream()

presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import com.google.common.collect.ImmutableList;
5252
import com.google.common.collect.ImmutableSet;
5353
import com.google.common.collect.Maps;
54-
import org.apache.iceberg.ManifestFile;
5554
import org.apache.iceberg.PartitionField;
5655
import org.apache.iceberg.PartitionSpec;
5756
import org.apache.iceberg.Table;
@@ -71,11 +70,11 @@
7170
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
7271
import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue;
7372
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
73+
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
7474
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
7575
import static com.google.common.base.Preconditions.checkArgument;
7676
import static com.google.common.base.Verify.verify;
7777
import static com.google.common.collect.ImmutableMap.toImmutableMap;
78-
import static com.google.common.collect.ImmutableSet.toImmutableSet;
7978
import static java.util.Objects.requireNonNull;
8079
import static java.util.stream.Collectors.toList;
8180

@@ -177,12 +176,7 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
177176
RowExpression subfieldPredicate = rowExpressionService.getDomainTranslator().toPredicate(subfieldTupleDomain);
178177

179178
// Get partition specs that really need to be checked
180-
Set<Integer> partitionSpecIds = tableHandle.getIcebergTableName().getSnapshotId().map(
181-
snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
182-
.map(ManifestFile::partitionSpecId)
183-
.collect(toImmutableSet()))
184-
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
185-
179+
Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, tableHandle.getIcebergTableName().getSnapshotId());
186180
Set<IcebergColumnHandle> enforcedColumns = getEnforcedColumns(icebergTable,
187181
partitionSpecIds,
188182
entireColumnDomain,

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1588,6 +1588,58 @@ public void testDeleteOnPartitionedV1Table()
15881588
dropTable(session, tableName);
15891589
}
15901590

1591+
@Test(dataProvider = "version_and_mode")
1592+
public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode)
1593+
{
1594+
String tableName = "test_empty_partition_spec_table";
1595+
try {
1596+
// Create a table with on partition
1597+
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
1598+
1599+
// Do not insert data, and evaluate the partition spec by adding a partition column `c`
1600+
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
1601+
1602+
// Insert data under the new partition spec
1603+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
1604+
1605+
// We can do metadata delete on partition column `c`, because the initial partition spec contains no data
1606+
assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
1607+
assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
1608+
}
1609+
finally {
1610+
dropTable(getSession(), tableName);
1611+
}
1612+
}
1613+
1614+
@Test(dataProvider = "version_and_mode")
1615+
public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode)
1616+
{
1617+
String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
1618+
String tableName = "test_data_deleted_partition_spec_table";
1619+
try {
1620+
// Create a table with partition column `a`, and insert some data under this partition spec
1621+
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
1622+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
1623+
1624+
// Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
1625+
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
1626+
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
1627+
1628+
// Do not support metadata delete with filter on column `c`, because we have data with old partition spec
1629+
assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);
1630+
1631+
// Do metadata delete on column `a`, because all partition specs contains partition column `a`
1632+
assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);
1633+
1634+
// Then we can do metadata delete on column `c`, because the old partition spec contains no data now
1635+
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
1636+
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
1637+
}
1638+
finally {
1639+
dropTable(getSession(), tableName);
1640+
}
1641+
}
1642+
15911643
@DataProvider(name = "version_and_mode")
15921644
public Object[][] versionAndMode()
15931645
{

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,6 +1406,82 @@ public void testMetadataDeleteOnPartitionedTableWithDeleteFiles()
14061406
}
14071407
}
14081408

1409+
@Test
1410+
public void testMetadataDeleteOnV2MorTableWithEmptyUnsupportedSpecs()
1411+
{
1412+
String tableName = "test_empty_partition_spec_table";
1413+
try {
1414+
// Create a table with on partition
1415+
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')");
1416+
1417+
// Do not insert data, and evaluate the partition spec by adding a partition column `c`
1418+
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
1419+
1420+
// Insert data under the new partition spec
1421+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
1422+
1423+
Table icebergTable = loadTable(tableName);
1424+
assertHasDataFiles(icebergTable.currentSnapshot(), 4);
1425+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1426+
1427+
// Do metadata delete on partition column `c`, because the initial partition spec contains no data
1428+
assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
1429+
assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
1430+
1431+
icebergTable = loadTable(tableName);
1432+
assertHasDataFiles(icebergTable.currentSnapshot(), 2);
1433+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1434+
}
1435+
finally {
1436+
assertUpdate("DROP TABLE IF EXISTS " + tableName);
1437+
}
1438+
}
1439+
1440+
@Test
1441+
public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDeleted()
1442+
{
1443+
String tableName = "test_data_deleted_partition_spec_table";
1444+
try {
1445+
// Create a table with partition column `a`, and insert some data under this partition spec
1446+
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read', partitioning = ARRAY['a'])");
1447+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
1448+
1449+
Table icebergTable = loadTable(tableName);
1450+
assertHasDataFiles(icebergTable.currentSnapshot(), 2);
1451+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1452+
1453+
// Evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
1454+
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
1455+
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
1456+
1457+
icebergTable = loadTable(tableName);
1458+
assertHasDataFiles(icebergTable.currentSnapshot(), 5);
1459+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1460+
1461+
// Execute row level delete with filter on column `c`, because we have data with old partition spec
1462+
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
1463+
icebergTable = loadTable(tableName);
1464+
assertHasDataFiles(icebergTable.currentSnapshot(), 5);
1465+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
1466+
1467+
// Do metadata delete on column `a`, because all partition specs contains partition column `a`
1468+
assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);
1469+
icebergTable = loadTable(tableName);
1470+
assertHasDataFiles(icebergTable.currentSnapshot(), 3);
1471+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
1472+
1473+
// Then do metadata delete on column `c`, because the old partition spec contains no data now
1474+
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 0);
1475+
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
1476+
icebergTable = loadTable(tableName);
1477+
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
1478+
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
1479+
}
1480+
finally {
1481+
assertUpdate("DROP TABLE IF EXISTS " + tableName);
1482+
}
1483+
}
1484+
14091485
private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
14101486
{
14111487
// check delete file list

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import com.facebook.presto.common.Subfield;
1919
import com.facebook.presto.common.predicate.Domain;
2020
import com.facebook.presto.common.predicate.TupleDomain;
21+
import com.facebook.presto.common.predicate.ValueSet;
2122
import com.facebook.presto.common.type.TimeZoneKey;
2223
import com.facebook.presto.cost.StatsProvider;
2324
import com.facebook.presto.metadata.FunctionAndTypeManager;
2425
import com.facebook.presto.metadata.Metadata;
2526
import com.facebook.presto.spi.ColumnHandle;
27+
import com.facebook.presto.spi.ConnectorId;
2628
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
2729
import com.facebook.presto.spi.plan.AggregationNode;
2830
import com.facebook.presto.spi.plan.FilterNode;
@@ -88,6 +90,7 @@
8890
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
8991
import static com.facebook.presto.iceberg.IcebergSessionProperties.PARQUET_DEREFERENCE_PUSHDOWN_ENABLED;
9092
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
93+
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
9194
import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield;
9295
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND;
9396
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.OR;
@@ -635,6 +638,96 @@ public void testFiltersWithPushdownDisable()
635638
assertUpdate("DROP TABLE test_filters_with_pushdown_disable");
636639
}
637640

641+
@Test
642+
public void testThoroughlyPushdownForTableWithUnsupportedSpecsIncludingNoData()
643+
{
644+
// The filter pushdown session property is disabled by default
645+
Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession();
646+
assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false);
647+
648+
String tableName = "test_empty_partition_spec_table";
649+
try {
650+
// Create a table with on partition
651+
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1')");
652+
653+
// Do not insert data, and evaluate the partition spec by adding a partition column `c`
654+
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
655+
656+
// Insert data under the new partition spec
657+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
658+
659+
// Only identity partition column predicates, would be enforced totally by tableScan
660+
assertPlan(sessionWithoutFilterPushdown, "SELECT a, b FROM " + tableName + " WHERE c > 2",
661+
output(exchange(
662+
strictTableScan(tableName, identityMap("a", "b")))),
663+
plan -> assertTableLayout(
664+
plan,
665+
tableName,
666+
withColumnDomains(ImmutableMap.of(new Subfield(
667+
"c",
668+
ImmutableList.of()),
669+
Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false))),
670+
TRUE_CONSTANT,
671+
ImmutableSet.of("c")));
672+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)");
673+
}
674+
finally {
675+
assertUpdate("DROP TABLE " + tableName);
676+
}
677+
}
678+
679+
@Test
680+
public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDeleted()
681+
{
682+
// The filter pushdown session property is disabled by default
683+
Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession();
684+
assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false);
685+
686+
String tableName = "test_data_deleted_partition_spec_table";
687+
try {
688+
// Create a table with partition column `a`, and insert some data under this partition spec
689+
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1', partitioning = ARRAY['a'])");
690+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
691+
692+
// Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
693+
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
694+
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
695+
696+
// The predicate was enforced partially by tableScan, filter on `c` could not be thoroughly pushed down, so the filterNode drop it's filter condition `a > 2`
697+
assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4",
698+
output(exchange(project(
699+
filter("c = 4",
700+
strictTableScan(tableName, identityMap("b", "c")))))));
701+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)");
702+
703+
// Do metadata delete on column `a`, because all partition specs contains partition column `a`
704+
assertUpdate("DELETE FROM " + tableName + " WHERE a IN (1, 2)", 2);
705+
706+
// Only identity partition column predicates, would be enforced totally by tableScan
707+
assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4",
708+
output(exchange(
709+
strictTableScan(tableName, identityMap("b")))),
710+
plan -> assertTableLayout(
711+
plan,
712+
tableName,
713+
withColumnDomains(ImmutableMap.of(
714+
new Subfield(
715+
"a",
716+
ImmutableList.of()),
717+
Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false),
718+
new Subfield(
719+
"c",
720+
ImmutableList.of()),
721+
singleValue(INTEGER, 4L))),
722+
TRUE_CONSTANT,
723+
ImmutableSet.of("a", "c")));
724+
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)");
725+
}
726+
finally {
727+
assertUpdate("DROP TABLE " + tableName);
728+
}
729+
}
730+
638731
@DataProvider(name = "timezones")
639732
public Object[][] timezones()
640733
{

0 commit comments

Comments
 (0)