Skip to content

Commit 7ec819c

Browse files
committed
Support Iceberg procedure rewrite_data_files
1 parent 68054be commit 7ec819c

File tree

2 files changed

+713
-0
lines changed

2 files changed

+713
-0
lines changed
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.airlift.json.JsonCodec;
17+
import com.facebook.presto.common.predicate.TupleDomain;
18+
import com.facebook.presto.common.type.TypeManager;
19+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
20+
import com.facebook.presto.spi.ConnectorSession;
21+
import com.facebook.presto.spi.ConnectorSplitSource;
22+
import com.facebook.presto.spi.FixedSplitSource;
23+
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
24+
import com.facebook.presto.spi.procedure.DistributedProcedure;
25+
import com.facebook.presto.spi.procedure.Procedure;
26+
import com.google.common.base.VerifyException;
27+
import com.google.common.collect.ImmutableList;
28+
import com.google.common.collect.ImmutableSet;
29+
import io.airlift.slice.Slice;
30+
import org.apache.iceberg.DataFile;
31+
import org.apache.iceberg.DataFiles;
32+
import org.apache.iceberg.DeleteFile;
33+
import org.apache.iceberg.FileContent;
34+
import org.apache.iceberg.FileScanTask;
35+
import org.apache.iceberg.PartitionSpecParser;
36+
import org.apache.iceberg.RewriteFiles;
37+
import org.apache.iceberg.SchemaParser;
38+
import org.apache.iceberg.Table;
39+
import org.apache.iceberg.TableScan;
40+
import org.apache.iceberg.types.Type;
41+
import org.apache.iceberg.util.TableScanUtil;
42+
43+
import javax.inject.Inject;
44+
import javax.inject.Provider;
45+
46+
import java.util.Collection;
47+
import java.util.HashSet;
48+
import java.util.List;
49+
import java.util.Optional;
50+
import java.util.Set;
51+
import java.util.function.Consumer;
52+
53+
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
54+
import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn;
55+
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
56+
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
57+
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
58+
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
59+
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
60+
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
61+
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
62+
import static com.google.common.collect.ImmutableList.toImmutableList;
63+
import static java.util.Objects.requireNonNull;
64+
65+
public class RewriteDataFilesProcedure
66+
implements Provider<Procedure>
67+
{
68+
TypeManager typeManager;
69+
JsonCodec<CommitTaskData> commitTaskCodec;
70+
71+
@Inject
72+
public RewriteDataFilesProcedure(
73+
TypeManager typeManager,
74+
JsonCodec<CommitTaskData> commitTaskCodec)
75+
{
76+
this.typeManager = requireNonNull(typeManager, "typeManager is null");
77+
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
78+
}
79+
80+
@Override
81+
public Procedure get()
82+
{
83+
return new DistributedProcedure(
84+
"system",
85+
"rewrite_data_files",
86+
ImmutableList.of(
87+
new Procedure.Argument(SCHEMA, VARCHAR),
88+
new Procedure.Argument(TABLE_NAME, VARCHAR),
89+
new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
90+
new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
91+
(session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
92+
((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
93+
}
94+
95+
private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
96+
{
97+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
98+
Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
99+
IcebergTableHandle tableHandle = layoutHandle.getTable();
100+
101+
ConnectorSplitSource splitSource;
102+
if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
103+
splitSource = new FixedSplitSource(ImmutableList.of());
104+
}
105+
else {
106+
TupleDomain<IcebergColumnHandle> predicate = isPushdownFilterEnabled(session) ?
107+
layoutHandle.getPartitionColumnPredicate()
108+
.transform(IcebergColumnHandle.class::cast)
109+
.intersect(layoutHandle.getDomainPredicate()
110+
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
111+
.transform(layoutHandle.getPredicateColumns()::get)) :
112+
tableHandle.getPredicate();
113+
TableScan tableScan = icebergTable.newScan()
114+
.filter(toIcebergExpression(predicate))
115+
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());
116+
117+
Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
118+
transactionContext.getScannedDataFiles().add(task.file());
119+
if (!task.deletes().isEmpty()) {
120+
task.deletes().forEach(deleteFile -> {
121+
if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
122+
!icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
123+
!predicate.isAll()) {
124+
// Equality files with an unpartitioned spec are applied as global deletes
125+
// So they should not be cleaned up unless the whole table is optimized
126+
return;
127+
}
128+
transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
129+
});
130+
transactionContext.getFullyAppliedDeleteFiles().addAll(task.deletes());
131+
}
132+
};
133+
134+
splitSource = new CallDistributedProcedureSplitSource(
135+
session,
136+
tableScan,
137+
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
138+
Optional.of(fileScanTaskConsumer),
139+
getMinimumAssignedSplitWeight(session));
140+
}
141+
transactionContext.setConnectorSplitSource(splitSource);
142+
143+
return new IcebergDistributedProcedureHandle(
144+
tableHandle.getSchemaName(),
145+
tableHandle.getIcebergTableName(),
146+
SchemaParser.toJson(icebergTable.schema()),
147+
PartitionSpecParser.toJson(icebergTable.spec()),
148+
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
149+
icebergTable.location(),
150+
getFileFormat(icebergTable),
151+
icebergTable.properties());
152+
}
153+
}
154+
155+
private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
156+
{
157+
if (fragments.isEmpty() &&
158+
transactionContext.getScannedDataFiles().isEmpty() &&
159+
transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
160+
return;
161+
}
162+
163+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
164+
IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
165+
Table icebergTable = transactionContext.getTransaction().table();
166+
167+
List<CommitTaskData> commitTasks = fragments.stream()
168+
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
169+
.collect(toImmutableList());
170+
171+
org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
172+
.map(field -> field.transform().getResultType(
173+
icebergTable.schema().findType(field.sourceId())))
174+
.toArray(Type[]::new);
175+
176+
Set<DataFile> newFiles = new HashSet<>();
177+
for (CommitTaskData task : commitTasks) {
178+
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
179+
.withPath(task.getPath())
180+
.withFileSizeInBytes(task.getFileSizeInBytes())
181+
.withFormat(handle.getFileFormat().name())
182+
.withMetrics(task.getMetrics().metrics());
183+
184+
if (!icebergTable.spec().fields().isEmpty()) {
185+
String partitionDataJson = task.getPartitionDataJson()
186+
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
187+
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
188+
}
189+
newFiles.add(builder.build());
190+
}
191+
192+
RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
193+
Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
194+
Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
195+
rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());
196+
197+
// Table.snapshot method returns null if there is no matching snapshot
198+
if (icebergTable.currentSnapshot() != null) {
199+
rewriteFiles.validateFromSnapshot(icebergTable.currentSnapshot().snapshotId());
200+
}
201+
rewriteFiles.commit();
202+
}
203+
}
204+
}

0 commit comments

Comments
 (0)