Commit befe9a7

Support Iceberg procedure rewrite_data_files
1 parent 1c47d5b commit befe9a7

7 files changed: +901 −2 lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 41 additions & 0 deletions

@@ -839,6 +839,47 @@ Examples:

     CALL iceberg.system.remove_orphan_files(schema => 'db', table_name => 'sample');

+Rewrite Data Files
+^^^^^^^^^^^^^^^^^^
+
+Iceberg tracks all data files under different partition specs in a table. More data files require
+more metadata to be stored in manifest files, and small data files cause an unnecessary amount of
+metadata and less efficient queries because of file open costs. Data files kept under different
+partition specs can also prevent metadata-level deletion and thorough predicate pushdown in Presto.
+
+Use `rewrite_data_files` to rewrite the data files of a specified table so that they are
+merged into fewer but larger files under the newest partition spec. If the table is partitioned, the data
+file compaction can act separately on the selected partitions to improve read performance by reducing
+metadata overhead and runtime file open cost.
+
+The following arguments are available:
+
+===================== ========== =============== =======================================================================
+Argument Name         required   type            Description
+===================== ========== =============== =======================================================================
+``schema``            ✔️         string          Schema of the table to update.
+
+``table_name``        ✔️         string          Name of the table to update.
+
+``filter``                       string          Predicate as a string used for filtering the files to rewrite. Currently
+                                                 only rewrite of whole partitions is supported, so filter on partition
+                                                 columns. The default value is `true`.
+
+``options``                      map             Options to be used for data file rewrite. (to be expanded)
+===================== ========== =============== =======================================================================
+
+Examples:
+
+* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files into larger ones::
+
+    CALL iceberg.system.rewrite_data_files('db', 'sample');
+    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
+
+* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::
+
+    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
+    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
+
 SQL Support
 -----------

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions

@@ -156,6 +156,7 @@ public void setup(Binder binder)
         procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(RemoveOrphanFiles.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

         // for orc
         binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Lines changed: 204 additions & 0 deletions

@@ -0,0 +1,204 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
        implements Provider<Procedure>
{
    TypeManager typeManager;
    JsonCodec<CommitTaskData> commitTaskCodec;

    @Inject
    public RewriteDataFilesProcedure(
            TypeManager typeManager,
            JsonCodec<CommitTaskData> commitTaskCodec)
    {
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
    }

    @Override
    public Procedure get()
    {
        return new DistributedProcedure(
                "system",
                "rewrite_data_files",
                ImmutableList.of(
                        new Procedure.Argument(SCHEMA, VARCHAR),
                        new Procedure.Argument(TABLE_NAME, VARCHAR),
                        new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
                        new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
                (session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
                ((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
    }

    private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
            IcebergTableHandle tableHandle = layoutHandle.getTable();

            ConnectorSplitSource splitSource;
            if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
                splitSource = new FixedSplitSource(ImmutableList.of());
            }
            else {
                TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
                TableScan tableScan = icebergTable.newScan()
                        .filter(toIcebergExpression(predicate))
                        .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

                Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
                    transactionContext.getScannedDataFiles().add(task.file());
                    if (!task.deletes().isEmpty()) {
                        task.deletes().forEach(deleteFile -> {
                            if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
                                    !icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
                                    !predicate.isAll()) {
                                // Equality files with an unpartitioned spec are applied as global deletes
                                // So they should not be cleaned up unless the whole table is optimized
                                return;
                            }
                            transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
                        });
                    }
                };

                splitSource = new CallDistributedProcedureSplitSource(
                        session,
                        tableScan,
                        TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
                        Optional.of(fileScanTaskConsumer),
                        getMinimumAssignedSplitWeight(session));
            }
            transactionContext.setConnectorSplitSource(splitSource);

            return new IcebergDistributedProcedureHandle(
                    tableHandle.getSchemaName(),
                    tableHandle.getIcebergTableName(),
                    SchemaParser.toJson(icebergTable.schema()),
                    PartitionSpecParser.toJson(icebergTable.spec()),
                    getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
                    icebergTable.location(),
                    getFileFormat(icebergTable),
                    getCompressionCodec(session),
                    icebergTable.properties());
        }
    }

    private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
    {
        if (fragments.isEmpty() &&
                transactionContext.getScannedDataFiles().isEmpty() &&
                transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
            return;
        }

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
            Table icebergTable = transactionContext.getTransaction().table();

            List<CommitTaskData> commitTasks = fragments.stream()
                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
                    .collect(toImmutableList());

            org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
                    .map(field -> field.transform().getResultType(
                            icebergTable.schema().findType(field.sourceId())))
                    .toArray(Type[]::new);

            Set<DataFile> newFiles = new HashSet<>();
            for (CommitTaskData task : commitTasks) {
                DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                        .withPath(task.getPath())
                        .withFileSizeInBytes(task.getFileSizeInBytes())
                        .withFormat(handle.getFileFormat().name())
                        .withMetrics(task.getMetrics().metrics());

                if (!icebergTable.spec().fields().isEmpty()) {
                    String partitionDataJson = task.getPartitionDataJson()
                            .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
                    builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
                }
                newFiles.add(builder.build());
            }

            RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
            Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
            Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
            rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

            // Table.snapshot method returns null if there is no matching snapshot
            Snapshot snapshot = requireNonNull(
                    handle.getTableName()
                            .getSnapshotId()
                            .map(icebergTable::snapshot)
                            .orElse(null),
                    "snapshot is null");
            if (icebergTable.currentSnapshot() != null) {
                rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
            }
            rewriteFiles.commit();
        }
    }
}
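As registered above, `filter` defaults to the string `'TRUE'` and `options` may be omitted, so the shortest call rewrites every data file of the table onto its newest partition spec. A usage sketch with illustrative schema and table names, following the documentation section above:

    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
    -- positional form with the default filter spelled out
    CALL iceberg.system.rewrite_data_files('db', 'sample', 'TRUE');
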

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

Lines changed: 58 additions & 0 deletions

@@ -1658,6 +1658,64 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
         }
     }

+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
+    {
+        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
+        String tableName = "test_rewrite_data_files_table";
+        try {
+            // Create a table without partitioning, and insert some data under this initial partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            // Metadata delete with a filter on column `c` is not supported here, because there is still data under the old partition spec
+            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);
+
+            // Call procedure rewrite_data_files without a filter to rewrite all data files
+            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 5);
+
+            // Now metadata delete on column `c` succeeds, because all data files have been rewritten under the new partition spec
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
+        }
+        finally {
+            dropTable(getSession(), tableName);
+        }
+    }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
+    {
+        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
+        String tableName = "test_rewrite_data_files_table";
+        try {
+            // Create a table partitioned on column `a`, and insert some data under this partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            // Metadata delete with a filter on column `c` is not supported here, because there is still data under the old partition spec
+            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);
+
+            // Call procedure rewrite_data_files with a filter to rewrite the data files under the prior partition spec
+            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a in (1, 2)')", 2);
+
+            // Now metadata delete on column `c` succeeds, because all data files are under the new partition spec
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
+        }
+        finally {
+            dropTable(getSession(), tableName);
+        }
+    }
+
     @DataProvider(name = "version_and_mode")
     public Object[][] versionAndMode()
     {