Commit c3eaa96

Support Iceberg procedure rewrite_data_files
1 parent 6de611d commit c3eaa96

File tree

7 files changed: +899 -2 lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 41 additions & 0 deletions
@@ -1000,6 +1000,47 @@ Examples:

    CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Rewrite data files
^^^^^^^^^^^^^^^^^^

Iceberg tracks all data files under the different partition specs of a table. More data files lead
to more metadata stored in manifest files, and many small data files cause an unnecessary amount of
metadata and less efficient queries due to file open costs. In addition, data files under different
partition specs can prevent metadata-level deletion and thorough predicate pushdown in the Presto
engine.

The procedure `rewrite_data_files` rewrites the data files of the specified table so that they are
merged into fewer but larger files under the newest partition spec. If the table is partitioned,
the data file compaction can act separately on the selected partitions. This can improve read
performance by reducing metadata overhead and runtime file open costs.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         Required   Type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update

``table_name``        ✔️         string          Name of the table to update

``filter``                       string          Predicate as a string used for filtering the files to rewrite. Note
                                                 that currently only whole partitions can be rewritten, that is, the
                                                 filter must be on partition columns (defaults to `true`)

``options``                      map             Options to be used for the data file rewrite (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files into larger ones::

    CALL iceberg.system.rewrite_data_files('db', 'sample');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::

    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');

Schema Evolution
-----------------

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions

@@ -154,6 +154,7 @@ public void setup(Binder binder)
        procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
+       procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

        // for orc
        binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
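
For context, the `procedures` set above is Guice's multibinding idiom: each connector module contributes `Provider<Procedure>` bindings, which are presumably consumed as one injected set. A minimal self-contained sketch of the pattern follows; the `ProcedureModuleSketch` class and the `newSetBinder` call are illustrative assumptions, not part of this diff.

import com.facebook.presto.spi.procedure.Procedure;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

// Sketch only: assumes RewriteDataFilesProcedure (a Provider<Procedure>) is visible
// in the same package, as in the real IcebergCommonModule.
public class ProcedureModuleSketch
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // The set binder aggregates every bound Provider<Procedure> into an injectable
        // Set<Procedure>, so registering a new procedure is a one-line binding.
        Multibinder<Procedure> procedures = Multibinder.newSetBinder(binder, Procedure.class);
        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
    }
}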

New file: RewriteDataFilesProcedure.java

Lines changed: 203 additions & 0 deletions

@@ -0,0 +1,203 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
        implements Provider<Procedure>
{
    private final TypeManager typeManager;
    private final JsonCodec<CommitTaskData> commitTaskCodec;

    @Inject
    public RewriteDataFilesProcedure(
            TypeManager typeManager,
            JsonCodec<CommitTaskData> commitTaskCodec)
    {
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
    }

    @Override
    public Procedure get()
    {
        return new DistributedProcedure(
                "system",
                "rewrite_data_files",
                ImmutableList.of(
                        new Procedure.Argument(SCHEMA, VARCHAR),
                        new Procedure.Argument(TABLE_NAME, VARCHAR),
                        new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
                        new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
                (session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
                ((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
    }
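
    // Note on the wiring above: a DistributedProcedure, unlike a plain Procedure, runs through
    // the engine's distributed execution. The "begin" lambda plans a split source whose splits
    // the workers process like a scan-plus-write, and the "finish" lambda commits the fragments
    // the workers report back. The two private methods below implement those phases.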
    private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No Iceberg table in transaction context"));
            IcebergTableHandle tableHandle = layoutHandle.getTable();

            ConnectorSplitSource splitSource;
            if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
                // An empty table has no snapshot, so there is nothing to rewrite
                splitSource = new FixedSplitSource(ImmutableList.of());
            }
            else {
                TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
                TableScan tableScan = icebergTable.newScan()
                        .filter(toIcebergExpression(predicate))
                        .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

                // Record each scanned data file, and each delete file fully applied by this
                // rewrite, so they can be swapped out of the table at commit time
                Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
                    transactionContext.getScannedDataFiles().add(task.file());
                    if (!task.deletes().isEmpty()) {
                        task.deletes().forEach(deleteFile -> {
                            if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
                                    !icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
                                    !predicate.isAll()) {
                                // Equality files with an unpartitioned spec are applied as global deletes
                                // So they should not be cleaned up unless the whole table is optimized
                                return;
                            }
                            transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
                        });
                    }
                };

                splitSource = new CallDistributedProcedureSplitSource(
                        session,
                        tableScan,
                        TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
                        Optional.of(fileScanTaskConsumer),
                        getMinimumAssignedSplitWeight(session));
            }
            transactionContext.setConnectorSplitSource(splitSource);

            return new IcebergDistributedProcedureHandle(
                    tableHandle.getSchemaName(),
                    tableHandle.getIcebergTableName(),
                    SchemaParser.toJson(icebergTable.schema()),
                    PartitionSpecParser.toJson(icebergTable.spec()),
                    getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
                    icebergTable.location(),
                    getFileFormat(icebergTable),
                    icebergTable.properties());
        }
    }

    private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
    {
        if (fragments.isEmpty() &&
                transactionContext.getScannedDataFiles().isEmpty() &&
                transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
            return;
        }

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
            Table icebergTable = transactionContext.getTransaction().table();

            List<CommitTaskData> commitTasks = fragments.stream()
                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
                    .collect(toImmutableList());

            org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
                    .map(field -> field.transform().getResultType(
                            icebergTable.schema().findType(field.sourceId())))
                    .toArray(Type[]::new);

            // Rebuild DataFile metadata for the files the workers wrote, all under the newest partition spec
            Set<DataFile> newFiles = new HashSet<>();
            for (CommitTaskData task : commitTasks) {
                DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                        .withPath(task.getPath())
                        .withFileSizeInBytes(task.getFileSizeInBytes())
                        .withFormat(handle.getFileFormat().name())
                        .withMetrics(task.getMetrics().metrics());

                if (!icebergTable.spec().fields().isEmpty()) {
                    String partitionDataJson = task.getPartitionDataJson()
                            .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
                    builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
                }
                newFiles.add(builder.build());
            }

            // Atomically replace the scanned data files (and their fully applied delete files)
            // with the rewritten files
            RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
            Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
            Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
            rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

            // Table.snapshot method returns null if there is no matching snapshot
            Snapshot snapshot = requireNonNull(
                    handle.getTableName()
                            .getSnapshotId()
                            .map(icebergTable::snapshot)
                            .orElse(null),
                    "snapshot is null");
            if (icebergTable.currentSnapshot() != null) {
                rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
            }
            rewriteFiles.commit();
        }
    }
}
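
The commit path above ends in Iceberg's `RewriteFiles` operation. Below is a minimal standalone sketch of that API; the class and method names are illustrative, while `newRewrite`, `rewriteFiles`, `validateFromSnapshot`, and `commit` are Iceberg core API calls (the two-set `rewriteFiles` overload shown here omits the delete-file arguments used above).

import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;

import java.util.Set;

public class RewriteFilesSketch
{
    // Sketch only: assumes `table`, `oldFiles` (files being compacted), and `newFiles`
    // (their rewritten replacements) are already in hand.
    public static void commitRewrite(Table table, Set<DataFile> oldFiles, Set<DataFile> newFiles)
    {
        // Stage an atomic swap: the table contents do not logically change
        RewriteFiles rewrite = table.newRewrite()
                .rewriteFiles(oldFiles, newFiles);
        if (table.currentSnapshot() != null) {
            // Fail the commit if a conflicting change landed after these files were planned
            rewrite.validateFromSnapshot(table.currentSnapshot().snapshotId());
        }
        // Produces a single new snapshot containing the rewritten files
        rewrite.commit();
    }
}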

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

Lines changed: 58 additions & 0 deletions
@@ -1640,6 +1640,64 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String tableName = "test_rewrite_data_files_table";
        try {
            // Create an unpartitioned table, and insert some data under the initial partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported here, because some data files still use the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files without a filter to rewrite all data files
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 5);

            // Now metadata delete on column `c` works, because all data files have been rewritten under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String tableName = "test_rewrite_data_files_table";
        try {
            // Create a table partitioned on column `a`, and insert some data under this partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported here, because some data files still use the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files with a filter to rewrite the data files under the prior partition spec
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a in (1, 2)')", 2);

            // Now metadata delete on column `c` works, because all data files are under the newest partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }
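
    // The two tests above rely on the same mechanism: metadata delete is only planned when
    // every live data file in the affected partitions carries a partition spec containing the
    // filter column, and rewrite_data_files establishes that by rewriting old-spec files under
    // the table's newest spec.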

    @DataProvider(name = "version_and_mode")
    public Object[][] versionAndMode()
    {
