
Commit 6ec42ed

Support Iceberg procedure rewrite_data_files
1 parent 2c19626 commit 6ec42ed

File tree

10 files changed: +946 −10 lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 41 additions & 0 deletions
@@ -1233,6 +1233,47 @@ Examples:
    CALL iceberg.system.set_table_property('schema_name', 'table_name', 'commit.retry.num-retries', '10');

Rewrite Data Files
^^^^^^^^^^^^^^^^^^

Iceberg tracks all data files under different partition specs in a table. More data files require
more metadata to be stored in manifest files, and small data files cause an unnecessary amount of
metadata and less efficient queries due to file open costs. Data files under different partition
specs can also prevent metadata-level deletion and thorough predicate pushdown in Presto.

Use ``rewrite_data_files`` to rewrite the data files of a specified table so that they are
merged into fewer but larger files under the newest partition spec. If the table is partitioned,
the data file compaction can act separately on the selected partitions to improve read performance
by reducing metadata overhead and runtime file open costs.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         Required   Type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update.

``table_name``        ✔️         string          Name of the table to update.

``filter``                       string          Predicate as a string used to filter which files are rewritten.
                                                 Currently only rewriting whole partitions is supported, so the
                                                 filter must reference partition columns only. The default value
                                                 is ``true``.

``options``                      map             Options to be used for the data files rewrite. (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table ``db.sample`` to the newest partition spec and combine small files into larger ones::

    CALL iceberg.system.rewrite_data_files('db', 'sample');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec::

    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
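* Rewrite data files selected by a filter over several partition columns; the column names ``partition_key`` and ``category`` here are hypothetical, standing in for the table's real partition columns::

    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1 AND category = ''A''');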

Presto C++ Support
^^^^^^^^^^^^^^^^^^

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions
@@ -185,6 +185,7 @@ protected void setup(Binder binder)
        procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

        // for orc
        binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
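
For orientation, here is a minimal sketch of the Guice multibinder pattern this one-line change plugs into. It is an illustration, not the module's exact code: the set binder's creation and element type (`Procedure`) are assumptions, and it presumes `DistributedProcedure` extends the SPI's `Procedure`, which is what lets a `Provider<DistributedProcedure>` join the set.

```java
import com.facebook.presto.spi.procedure.Procedure;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

// Hypothetical stand-in for the module above, showing only the binding pattern.
public class ExampleProcedureModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // A set binder collects every bound procedure into one injectable Set<Procedure>
        Multibinder<Procedure> procedures = Multibinder.newSetBinder(binder, Procedure.class);
        // Each provider contributes one procedure; the connector later registers the whole set
        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
    }
}
```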

presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java

Lines changed: 206 additions & 0 deletions
@@ -0,0 +1,206 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure.Argument;
import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
        implements Provider<DistributedProcedure>
{
    private final TypeManager typeManager;
    private final JsonCodec<CommitTaskData> commitTaskCodec;

    @Inject
    public RewriteDataFilesProcedure(
            TypeManager typeManager,
            JsonCodec<CommitTaskData> commitTaskCodec)
    {
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
    }

    @Override
    public DistributedProcedure get()
    {
        return new TableDataRewriteDistributedProcedure(
                "system",
                "rewrite_data_files",
                ImmutableList.of(
                        new Argument(SCHEMA, VARCHAR),
                        new Argument(TABLE_NAME, VARCHAR),
                        new Argument("filter", VARCHAR, false, "TRUE"),
                        new Argument("options", "map(varchar, varchar)", false, null)),
                (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
                ((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)),
                IcebergProcedureContext::new);
    }

    private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("Iceberg table is not present in the procedure context"));
            IcebergTableHandle tableHandle = layoutHandle.getTable();

            ConnectorSplitSource splitSource;
            if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
                // An empty table has no snapshot, so there is nothing to rewrite
                splitSource = new FixedSplitSource(ImmutableList.of());
            }
            else {
                TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
                TableScan tableScan = icebergTable.newScan()
                        .filter(toIcebergExpression(predicate))
                        .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

                Consumer<FileScanTask> fileScanTaskConsumer = task -> {
                    procedureContext.getScannedDataFiles().add(task.file());
                    if (!task.deletes().isEmpty()) {
                        task.deletes().forEach(deleteFile -> {
                            if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
                                    !icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
                                    !predicate.isAll()) {
                                // Equality delete files with an unpartitioned spec are applied as global deletes,
                                // so they should not be cleaned up unless the whole table is optimized
                                return;
                            }
                            procedureContext.getFullyAppliedDeleteFiles().add(deleteFile);
                        });
                    }
                };

                splitSource = new CallDistributedProcedureSplitSource(
                        session,
                        tableScan,
                        TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
                        Optional.of(fileScanTaskConsumer),
                        getMinimumAssignedSplitWeight(session));
            }
            procedureContext.setConnectorSplitSource(splitSource);

            return new IcebergDistributedProcedureHandle(
                    tableHandle.getSchemaName(),
                    tableHandle.getIcebergTableName(),
                    toPrestoSchema(icebergTable.schema(), typeManager),
                    toPrestoPartitionSpec(icebergTable.spec(), typeManager),
                    getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
                    icebergTable.location(),
                    getFileFormat(icebergTable),
                    getCompressionCodec(session),
                    icebergTable.properties());
        }
    }

    private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
    {
        if (fragments.isEmpty() &&
                procedureContext.getScannedDataFiles().isEmpty() &&
                procedureContext.getFullyAppliedDeleteFiles().isEmpty()) {
            return;
        }

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
            Table icebergTable = procedureContext.getTransaction().table();

            List<CommitTaskData> commitTasks = fragments.stream()
                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
                    .collect(toImmutableList());

            Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
                    .map(field -> field.transform().getResultType(
                            icebergTable.schema().findType(field.sourceId())))
                    .toArray(Type[]::new);

            // Rebuild the compacted data files written by the workers from the serialized commit tasks
            Set<DataFile> newFiles = new HashSet<>();
            for (CommitTaskData task : commitTasks) {
                DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                        .withPath(task.getPath())
                        .withFileSizeInBytes(task.getFileSizeInBytes())
                        .withFormat(handle.getFileFormat().name())
                        .withMetrics(task.getMetrics().metrics());

                if (!icebergTable.spec().fields().isEmpty()) {
                    String partitionDataJson = task.getPartitionDataJson()
                            .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
                    builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
                }
                newFiles.add(builder.build());
            }

            RewriteFiles rewriteFiles = procedureContext.getTransaction().newRewrite();
            Set<DataFile> scannedDataFiles = procedureContext.getScannedDataFiles();
            Set<DeleteFile> fullyAppliedDeleteFiles = procedureContext.getFullyAppliedDeleteFiles();
            rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

            // Table.snapshot(long) returns null if there is no matching snapshot
            Snapshot snapshot = requireNonNull(
                    handle.getTableName()
                            .getSnapshotId()
                            .map(icebergTable::snapshot)
                            .orElse(null),
                    "snapshot is null");
            if (icebergTable.currentSnapshot() != null) {
                rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
            }
            rewriteFiles.commit();
        }
    }
}
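
The heart of the commit path above is Iceberg's `RewriteFiles` API. Below is a minimal standalone sketch of that pattern; `transaction`, the file sets, and `baseSnapshotId` are placeholders for the values the procedure derives from its context.

```java
import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Transaction;

import java.util.Set;

final class RewriteCommitSketch
{
    private RewriteCommitSketch() {}

    // Illustrative only: the arguments stand in for the scanned data files,
    // fully applied delete files, compacted replacement files, and the
    // snapshot the scan was planned against.
    static void commitRewrite(Transaction transaction, Set<DataFile> scannedFiles, Set<DeleteFile> appliedDeletes, Set<DataFile> compactedFiles, long baseSnapshotId)
    {
        RewriteFiles rewrite = transaction.newRewrite();
        // Atomically swap the old files for the compacted ones; compaction
        // produces no new delete files, hence the empty fourth argument
        rewrite.rewriteFiles(scannedFiles, appliedDeletes, compactedFiles, ImmutableSet.of());
        // Fail the commit if a conflicting snapshot landed after the scan
        rewrite.validateFromSnapshot(baseSnapshotId);
        rewrite.commit();
    }
}
```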

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

Lines changed: 61 additions & 0 deletions
@@ -73,6 +73,7 @@
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.getFileSystem;
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.resolveLatestMetadataLocation;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
@@ -2013,6 +2014,66 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String schemaName = getSession().getSchema().get();
        String tableName = "test_rewrite_data_files_table_" + randomTableSuffix();
        try {
            // Create an unpartitioned table, and insert some data under the initial partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported, because some data still uses the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files without a filter to rewrite all data files
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "')", 5);

            // Now metadata delete on column `c` succeeds, because all data files have been rewritten under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String schemaName = getSession().getSchema().get();
        String tableName = "test_rewrite_data_files_table_" + randomTableSuffix();
        try {
            // Create a table with partition column `a`, and insert some data under this partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported, because some data still uses the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files with a filter to rewrite only the data files under the prior partition spec
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "', filter => 'a in (1, 2)')", 2);

            // Now metadata delete on column `c` succeeds, because all data files are under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @DataProvider(name = "version_and_mode")
    public Object[][] versionAndMode()
    {
