
Commit 05de3c8

Support Iceberg procedure rewrite_data_files

1 parent 64a002f commit 05de3c8
File tree

7 files changed: +897 -2 lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 40 additions & 0 deletions
@@ -772,6 +772,46 @@ Examples:
    CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Rewrite Data Files
^^^^^^^^^^^^^^^^^^

In Iceberg, all of a table's data files, possibly written under different partition specs, are tracked in
manifest files. More data files means more metadata stored in the manifests, and many small data files cause
an unnecessary amount of metadata and less efficient queries due to file open costs. In addition, data files
written under different partition specs can prevent metadata-level deletion and thorough predicate pushdown
in the Presto engine.

The procedure `rewrite_data_files` rewrites the data files of the specified table so that they are merged
into fewer but larger files under the newest partition spec. If the table is partitioned, the data file
compaction can act separately on the selected partitions. This can improve read performance by reducing
metadata overhead and runtime file open costs.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         required   type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update

``table_name``        ✔️         string          Name of the table to update

``filter``                       string          Predicate, given as a string, used to filter the files to rewrite.
                                                 Currently only whole partitions can be rewritten, so the filter must
                                                 be on partition columns (defaults to `true`)

``options``                      map             Options to be used for the data files rewrite (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files into larger ones::

    CALL iceberg.system.rewrite_data_files('db', 'sample');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::

    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
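
* A rough way to observe the effect of a rewrite (assuming the connector's ``$files`` metadata table is available)
  is to compare the number of data files in `db.sample` before and after the call; the count should drop once small
  files are combined::

    SELECT count(*) FROM iceberg.db."sample$files";
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
    SELECT count(*) FROM iceberg.db."sample$files";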

SQL Support
-----------

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions
@@ -154,6 +154,7 @@ public void setup(Binder binder)
        procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

        // for orc
        binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);

presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java

Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
        implements Provider<Procedure>
{
    TypeManager typeManager;
    JsonCodec<CommitTaskData> commitTaskCodec;

    @Inject
    public RewriteDataFilesProcedure(
            TypeManager typeManager,
            JsonCodec<CommitTaskData> commitTaskCodec)
    {
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
    }

    @Override
    public Procedure get()
    {
        return new DistributedProcedure(
                "system",
                "rewrite_data_files",
                ImmutableList.of(
                        new Procedure.Argument(SCHEMA, VARCHAR),
                        new Procedure.Argument(TABLE_NAME, VARCHAR),
                        new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
                        new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
                (session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
                ((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
    }

    private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
            IcebergTableHandle tableHandle = layoutHandle.getTable();

            ConnectorSplitSource splitSource;
            if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
                splitSource = new FixedSplitSource(ImmutableList.of());
            }
            else {
                TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
                TableScan tableScan = icebergTable.newScan()
                        .filter(toIcebergExpression(predicate))
                        .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

                Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
                    transactionContext.getScannedDataFiles().add(task.file());
                    if (!task.deletes().isEmpty()) {
                        task.deletes().forEach(deleteFile -> {
                            if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
                                    !icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
                                    !predicate.isAll()) {
                                // Equality files with an unpartitioned spec are applied as global deletes
                                // So they should not be cleaned up unless the whole table is optimized
                                return;
                            }
                            transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
                        });
                    }
                };

                splitSource = new CallDistributedProcedureSplitSource(
                        session,
                        tableScan,
                        TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
                        Optional.of(fileScanTaskConsumer),
                        getMinimumAssignedSplitWeight(session));
            }
            transactionContext.setConnectorSplitSource(splitSource);

            return new IcebergDistributedProcedureHandle(
                    tableHandle.getSchemaName(),
                    tableHandle.getIcebergTableName(),
                    SchemaParser.toJson(icebergTable.schema()),
                    PartitionSpecParser.toJson(icebergTable.spec()),
                    getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
                    icebergTable.location(),
                    getFileFormat(icebergTable),
                    icebergTable.properties());
        }
    }

    private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
    {
        if (fragments.isEmpty() &&
                transactionContext.getScannedDataFiles().isEmpty() &&
                transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
            return;
        }

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
            Table icebergTable = transactionContext.getTransaction().table();

            List<CommitTaskData> commitTasks = fragments.stream()
                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
                    .collect(toImmutableList());

            org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
                    .map(field -> field.transform().getResultType(
                            icebergTable.schema().findType(field.sourceId())))
                    .toArray(Type[]::new);

            Set<DataFile> newFiles = new HashSet<>();
            for (CommitTaskData task : commitTasks) {
                DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                        .withPath(task.getPath())
                        .withFileSizeInBytes(task.getFileSizeInBytes())
                        .withFormat(handle.getFileFormat().name())
                        .withMetrics(task.getMetrics().metrics());

                if (!icebergTable.spec().fields().isEmpty()) {
                    String partitionDataJson = task.getPartitionDataJson()
                            .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
                    builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
                }
                newFiles.add(builder.build());
            }

            RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
            Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
            Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
            rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

            // Table.snapshot method returns null if there is no matching snapshot
            Snapshot snapshot = requireNonNull(
                    handle.getTableName()
                            .getSnapshotId()
                            .map(icebergTable::snapshot)
                            .orElse(null),
                    "snapshot is null");
            if (icebergTable.currentSnapshot() != null) {
                rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
            }
            rewriteFiles.commit();
        }
    }
}
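
The rewrite is committed through Iceberg's `RewriteFiles` API, so a successful call produces a new table snapshot. A minimal way to confirm this from SQL, assuming the connector's `$snapshots` metadata table and the `db.sample` table from the documentation examples above:

    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

    -- The most recent snapshot should now record the rewrite (a "replace" operation)
    SELECT snapshot_id, operation
    FROM iceberg.db."sample$snapshots"
    ORDER BY committed_at DESC
    LIMIT 1;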

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

Lines changed: 58 additions & 0 deletions
@@ -1641,6 +1641,64 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String tableName = "test_rewrite_data_files_table";
        try {
            // Create an unpartitioned table, and insert some data under its initial partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported yet, because there is still data under the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files without a filter to rewrite all data files
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 5);

            // Then metadata delete on column `c` succeeds, because all data files have been rewritten under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String tableName = "test_rewrite_data_files_table";
        try {
            // Create a table with partition column `a`, and insert some data under this partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported yet, because there is still data under the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files with a filter to rewrite the data files written under the prior partition spec
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a in (1, 2)')", 2);

            // Then metadata delete on column `c` succeeds, because all data files are now under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @DataProvider(name = "version_and_mode")
    public Object[][] versionAndMode()
    {
