
Commit 2dafdde

Support Iceberg procedure rewrite_data_files
1 parent e2d00d8 commit 2dafdde

7 files changed: +872 -8 lines

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 41 additions & 0 deletions
@@ -1225,6 +1225,47 @@ Examples:
    CALL iceberg.system.set_table_property('schema_name', 'table_name', 'commit.retry.num-retries', '10');

Rewrite Data Files
^^^^^^^^^^^^^^^^^^

Iceberg tracks all of a table's data files, which may span multiple partition specs. More data files
require more metadata to be stored in manifest files, and many small data files cause an unnecessary
amount of metadata and less efficient queries due to file open costs. Data files kept under older
partition specs can also prevent metadata-level deletes and thorough predicate pushdown in Presto.

Use `rewrite_data_files` to rewrite the data files of a specified table so that they are
merged into fewer but larger files under the newest partition spec. If the table is partitioned,
compaction can act separately on the selected partitions to improve read performance by reducing
metadata overhead and runtime file open cost.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         required   type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update.

``table_name``        ✔️         string          Name of the table to update.

``filter``                       string          Predicate as a string used for filtering the files. Currently only
                                                 rewrite of whole partitions is supported, so the filter must be on
                                                 partition columns. The default value is `true`.

``options``                      map             Options to be used for the data files rewrite. (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files into larger ones::

    CALL iceberg.system.rewrite_data_files('db', 'sample');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::

    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
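
* Gauge the effect of a rewrite by comparing data file counts before and after the call. A suggested
  verification, assuming the connector's metadata tables are accessible in your deployment
  (`db.sample` is illustrative)::

    SELECT count(*) FROM "sample$files";
    CALL iceberg.system.rewrite_data_files('db', 'sample');
    SELECT count(*) FROM "sample$files";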

Presto C++ Support
^^^^^^^^^^^^^^^^^^

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions
@@ -185,6 +185,7 @@ protected void setup(Binder binder)
        procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

        // for orc
        binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
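
For context, the added line contributes the new procedure to a Guice set binding. The surrounding
`procedures` multibinder is created along these lines (a minimal sketch of the pattern, not the full
module; the wrapper class name is hypothetical):

    import com.facebook.presto.spi.procedure.Procedure;
    import com.google.inject.Binder;
    import com.google.inject.Scopes;
    import com.google.inject.multibindings.Multibinder;

    public final class ProcedureBindingSketch
    {
        private ProcedureBindingSketch() {}

        // Each addBinding() contributes one element to the injected Set<Procedure>.
        // RewriteDataFilesProcedure implements Provider<Procedure>, so Guice calls
        // its get() to materialize the procedure at injection time.
        public static void bindProcedures(Binder binder)
        {
            Multibinder<Procedure> procedures = Multibinder.newSetBinder(binder, Procedure.class);
            procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
        }
    }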
presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java

Lines changed: 204 additions & 0 deletions
@@ -0,0 +1,204 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
        implements Provider<Procedure>
{
    TypeManager typeManager;
    JsonCodec<CommitTaskData> commitTaskCodec;

    @Inject
    public RewriteDataFilesProcedure(
            TypeManager typeManager,
            JsonCodec<CommitTaskData> commitTaskCodec)
    {
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
    }

    @Override
    public Procedure get()
    {
        return new DistributedProcedure(
                "system",
                "rewrite_data_files",
                ImmutableList.of(
                        new Procedure.Argument(SCHEMA, VARCHAR),
                        new Procedure.Argument(TABLE_NAME, VARCHAR),
                        new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
                        new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
                (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
                ((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)));
    }
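
Both callbacks below run inside `ThreadContextClassLoader`, which pins the thread's context
classloader to the plugin classloader so Iceberg's reflective lookups resolve against plugin
classes. A minimal sketch of the underlying idiom (the general pattern, not Presto's exact
implementation):

    // Try-with-resources guard: install a classloader, restore the previous one on close().
    final class ContextClassLoaderGuard
            implements AutoCloseable
    {
        private final ClassLoader previous;

        ContextClassLoaderGuard(ClassLoader replacement)
        {
            this.previous = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(replacement);
        }

        @Override
        public void close()
        {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }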

    private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("No Iceberg table in procedure context"));
            IcebergTableHandle tableHandle = layoutHandle.getTable();

            ConnectorSplitSource splitSource;
            if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
                splitSource = new FixedSplitSource(ImmutableList.of());
            }
            else {
                TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
                TableScan tableScan = icebergTable.newScan()
                        .filter(toIcebergExpression(predicate))
                        .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

                Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
                    procedureContext.getScannedDataFiles().add(task.file());
                    if (!task.deletes().isEmpty()) {
                        task.deletes().forEach(deleteFile -> {
                            if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
                                    !icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
                                    !predicate.isAll()) {
                                // Equality delete files with an unpartitioned spec are applied as global deletes,
                                // so they should not be cleaned up unless the whole table is optimized
                                return;
                            }
                            procedureContext.getFullyAppliedDeleteFiles().add(deleteFile);
                        });
                    }
                };

                splitSource = new CallDistributedProcedureSplitSource(
                        session,
                        tableScan,
                        TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
                        Optional.of(fileScanTaskConsumer),
                        getMinimumAssignedSplitWeight(session));
            }
            procedureContext.setConnectorSplitSource(splitSource);

            return new IcebergDistributedProcedureHandle(
                    tableHandle.getSchemaName(),
                    tableHandle.getIcebergTableName(),
                    toPrestoSchema(icebergTable.schema(), typeManager),
                    toPrestoPartitionSpec(icebergTable.spec(), typeManager),
                    getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
                    icebergTable.location(),
                    getFileFormat(icebergTable),
                    getCompressionCodec(session),
                    icebergTable.properties());
        }
    }
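
The split source above is driven by Iceberg scan planning: `planFiles()` yields a `FileScanTask`
per data file (with any applicable delete files attached), and `TableScanUtil.splitFiles` caps each
task at the scan's target split size. Standalone, the planning step looks roughly like this
(`table` and `snapshotId` are placeholders supplied by the caller):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.util.TableScanUtil;

    class ScanPlanningSketch
    {
        // Plan the file scan tasks a whole-table rewrite would read at a pinned snapshot.
        static CloseableIterable<FileScanTask> planTasks(Table table, long snapshotId)
        {
            TableScan scan = table.newScan()
                    .filter(Expressions.alwaysTrue())   // a partition predicate would narrow the rewrite
                    .useSnapshot(snapshotId);
            // Split tasks so that none exceeds the scan's target split size.
            return TableScanUtil.splitFiles(scan.planFiles(), scan.targetSplitSize());
        }
    }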

    private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
    {
        if (fragments.isEmpty() &&
                procedureContext.getScannedDataFiles().isEmpty() &&
                procedureContext.getFullyAppliedDeleteFiles().isEmpty()) {
            return;
        }

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
            Table icebergTable = procedureContext.getTransaction().table();

            List<CommitTaskData> commitTasks = fragments.stream()
                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
                    .collect(toImmutableList());

            org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
                    .map(field -> field.transform().getResultType(
                            icebergTable.schema().findType(field.sourceId())))
                    .toArray(Type[]::new);

            Set<DataFile> newFiles = new HashSet<>();
            for (CommitTaskData task : commitTasks) {
                DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                        .withPath(task.getPath())
                        .withFileSizeInBytes(task.getFileSizeInBytes())
                        .withFormat(handle.getFileFormat().name())
                        .withMetrics(task.getMetrics().metrics());

                if (!icebergTable.spec().fields().isEmpty()) {
                    String partitionDataJson = task.getPartitionDataJson()
                            .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
                    builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
                }
                newFiles.add(builder.build());
            }

            RewriteFiles rewriteFiles = procedureContext.getTransaction().newRewrite();
            Set<DataFile> scannedDataFiles = procedureContext.getScannedDataFiles();
            Set<DeleteFile> fullyAppliedDeleteFiles = procedureContext.getFullyAppliedDeleteFiles();
            rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

            // Table.snapshot method returns null if there is no matching snapshot
            Snapshot snapshot = requireNonNull(
                    handle.getTableName()
                            .getSnapshotId()
                            .map(icebergTable::snapshot)
                            .orElse(null),
                    "snapshot is null");
            if (icebergTable.currentSnapshot() != null) {
                rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
            }
            rewriteFiles.commit();
        }
    }
}
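
The finish callback's commit relies on Iceberg's `RewriteFiles` pending update, which replaces one
set of files with another in a single atomic snapshot. Reduced to its essentials (table and file
sets supplied by the caller), the commit looks like:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.RewriteFiles;
    import org.apache.iceberg.Table;

    import java.util.Set;

    class RewriteCommitSketch
    {
        // Atomically swap oldFiles for newFiles; readers never observe a half-rewritten table.
        static void commitRewrite(Table table, Set<DataFile> oldFiles, Set<DataFile> newFiles)
        {
            RewriteFiles rewrite = table.newRewrite()
                    .rewriteFiles(oldFiles, newFiles);
            // Fail the commit if conflicting changes landed after the snapshot the rewrite read.
            rewrite.validateFromSnapshot(table.currentSnapshot().snapshotId());
            rewrite.commit();
        }
    }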

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

Lines changed: 60 additions & 0 deletions
@@ -1978,6 +1978,66 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String schemaName = getSession().getSchema().get();
        String tableName = "test_rewrite_data_files_table";
        try {
            // Create an unpartitioned table, and insert some data under its initial partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported here, because some data files still use the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files without a filter to rewrite all data files
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "')", 5);

            // Now metadata delete on column `c` succeeds, because all data files were rewritten under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String schemaName = getSession().getSchema().get();
        String tableName = "test_rewrite_data_files_table";
        try {
            // Create a table with partition column `a`, and insert some data under this partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported here, because some data files still use the old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call procedure rewrite_data_files with a filter to rewrite the data files under the prior partition spec
            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "', filter => 'a in (1, 2)')", 2);

            // Now metadata delete on column `c` succeeds, because all data files are under the new partition spec
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }
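
A natural follow-up assertion (hypothetical, not part of this commit) would also verify the
physical layout after the rewrite, for example through the `$files` metadata table when the
deployment exposes it:

    // Hypothetical extra check after the rewrite; the exact query is illustrative.
    assertQuery("SELECT count(*) > 0 FROM \"" + tableName + "$files\"", "VALUES true");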

    @DataProvider(name = "version_and_mode")
    public Object[][] versionAndMode()
    {