Commit 68054be

Refactor Iceberg connector to support call distributed procedure
1 parent bffc17e commit 68054be

13 files changed: +475 −21 lines
presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java

Lines changed: 135 additions & 0 deletions

@@ -0,0 +1,135 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class CallDistributedProcedureSplitSource
        implements ConnectorSplitSource
{
    private CloseableIterator<FileScanTask> fileScanTaskIterator;
    private Optional<Consumer<FileScanTask>> fileScanTaskConsumer;

    private final TableScan tableScan;
    private final Closer closer = Closer.create();
    private final double minimumAssignedSplitWeight;
    private final ConnectorSession session;

    public CallDistributedProcedureSplitSource(
            ConnectorSession session,
            TableScan tableScan,
            CloseableIterable<FileScanTask> fileScanTaskIterable,
            Optional<Consumer<FileScanTask>> fileScanTaskConsumer,
            double minimumAssignedSplitWeight)
    {
        this.session = requireNonNull(session, "session is null");
        this.tableScan = requireNonNull(tableScan, "tableScan is null");
        this.fileScanTaskIterator = fileScanTaskIterable.iterator();
        this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null");
        this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
        closer.register(fileScanTaskIterator);
    }

    @Override
    public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
    {
        // TODO: move this to a background thread
        List<ConnectorSplit> splits = new ArrayList<>();
        Iterator<FileScanTask> iterator = limit(fileScanTaskIterator, maxSize);
        while (iterator.hasNext()) {
            FileScanTask task = iterator.next();
            fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task));
            splits.add(toIcebergSplit(task));
        }
        return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
    }

    @Override
    public boolean isFinished()
    {
        return !fileScanTaskIterator.hasNext();
    }

    @Override
    public void close()
    {
        try {
            closer.close();
            // TODO: remove this once org.apache.iceberg.io.CloseableIterator#withClose
            // correctly releases the resources held by the iterator.
            fileScanTaskIterator = CloseableIterator.empty();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private ConnectorSplit toIcebergSplit(FileScanTask task)
    {
        PartitionSpec spec = task.spec();
        Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());

        // TODO: We should leverage the residual expression and convert it to a TupleDomain.
        //       The predicate here is used by readers for predicate push down at the reader level,
        //       so when we do not use the residual expression, we are just wasting CPU cycles
        //       on the reader side evaluating a condition that we know will always be true.

        return new IcebergSplit(
                task.file().path().toString(),
                task.start(),
                task.length(),
                fromIcebergFileFormat(task.file().format()),
                ImmutableList.of(),
                getPartitionKeys(task),
                PartitionSpecParser.toJson(spec),
                partitionData.map(PartitionData::toJson),
                getNodeSelectionStrategy(session),
                // Weight is the task's share of the target split size, clamped to [minimumAssignedSplitWeight, 1.0]
                SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
                task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
                Optional.empty(),
                getDataSequenceNumber(task.file()));
    }
}
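For context, a minimal sketch of how a procedure implementation might drive this split source. It is not code from this commit: `icebergTable`, `session`, and the weight value are placeholders, and only the constructor signature above plus standard Iceberg API calls (`Table.newScan()`, `TableScan.planFiles()`) are assumed.

TableScan tableScan = icebergTable.newScan();
List<FileScanTask> scannedTasks = new ArrayList<>();
ConnectorSplitSource splitSource = new CallDistributedProcedureSplitSource(
        session,
        tableScan,
        tableScan.planFiles(),            // CloseableIterable<FileScanTask> over the files to process
        Optional.of(scannedTasks::add),   // remember each task handed out, e.g. to drop rewritten files on finish
        0.05);                            // illustrative minimumAssignedSplitWeight

The optional consumer is the interesting part of the design: it lets the procedure observe exactly which file scan tasks were turned into splits, which a rewrite-style procedure can later use when committing the replacement of those files.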

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 79 additions & 13 deletions
@@ -15,6 +15,7 @@
 import com.facebook.airlift.json.JsonCodec;
 import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.QualifiedObjectName;
 import com.facebook.presto.common.Subfield;
 import com.facebook.presto.common.predicate.TupleDomain;
 import com.facebook.presto.common.type.BigintType;
@@ -27,10 +28,13 @@
 import com.facebook.presto.iceberg.changelog.ChangelogUtil;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
+import com.facebook.presto.spi.ConnectorId;
 import com.facebook.presto.spi.ConnectorInsertTableHandle;
 import com.facebook.presto.spi.ConnectorNewTableLayout;
 import com.facebook.presto.spi.ConnectorOutputTableHandle;
 import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
 import com.facebook.presto.spi.ConnectorTableHandle;
 import com.facebook.presto.spi.ConnectorTableLayout;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
@@ -47,6 +51,9 @@
 import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
 import com.facebook.presto.spi.connector.ConnectorTableVersion;
 import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.procedure.DistributedProcedure;
+import com.facebook.presto.spi.procedure.IProcedureRegistry;
+import com.facebook.presto.spi.procedure.Procedure;
 import com.facebook.presto.spi.relation.RowExpression;
 import com.facebook.presto.spi.relation.RowExpressionService;
 import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -70,6 +77,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -161,23 +169,26 @@ public abstract class IcebergAbstractMetadata
         implements ConnectorMetadata
 {
     protected final TypeManager typeManager;
+    protected final IProcedureRegistry procedureRegistry;
     protected final JsonCodec<CommitTaskData> commitTaskCodec;
     protected final NodeVersion nodeVersion;
     protected final RowExpressionService rowExpressionService;
-    protected Transaction transaction;
+    protected IcebergTransactionContext transactionContext;

     private final StandardFunctionResolution functionResolution;
     private final ConcurrentMap<SchemaTableName, Table> icebergTables = new ConcurrentHashMap<>();
     private static final Logger log = Logger.get(IcebergAbstractMetadata.class);

     public IcebergAbstractMetadata(
             TypeManager typeManager,
+            IProcedureRegistry procedureRegistry,
             StandardFunctionResolution functionResolution,
             RowExpressionService rowExpressionService,
             JsonCodec<CommitTaskData> commitTaskCodec,
             NodeVersion nodeVersion)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
+        this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
         this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
         this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
         this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
@@ -199,6 +210,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName

     protected abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);

+    public Optional<ConnectorSplitSource> getSplitSourceInCurrentCallProcedureTransaction()
+    {
+        return transactionContext == null ? Optional.empty() : transactionContext.getConnectorSplitSource();
+    }
+
     /**
      * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
      */
@@ -420,7 +436,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess

     protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle table, Table icebergTable)
     {
-        transaction = icebergTable.newTransaction();
+        transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction());

         return new IcebergWritableTableHandle(
                 table.getSchemaName(),
@@ -437,12 +453,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle
     public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
     {
         if (fragments.isEmpty()) {
-            transaction.commitTransaction();
+            transactionContext.getTransaction().commitTransaction();
+            transactionContext.destroy();
             return Optional.empty();
         }

         IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle;
-        Table icebergTable = transaction.table();
+        Table icebergTable = transactionContext.getTransaction().table();

         List<CommitTaskData> commitTasks = fragments.stream()
                 .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
@@ -453,7 +470,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
                         icebergTable.schema().findType(field.sourceId())))
                 .toArray(Type[]::new);

-        AppendFiles appendFiles = transaction.newFastAppend();
+        AppendFiles appendFiles = transactionContext.getTransaction().newFastAppend();
         for (CommitTaskData task : commitTasks) {
             DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                     .withPath(task.getPath())
@@ -471,7 +488,8 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
         }

         appendFiles.commit();
-        transaction.commitTransaction();
+        transactionContext.getTransaction().commitTransaction();
+        transactionContext.destroy();

         return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
                 .map(CommitTaskData::getPath)
@@ -762,6 +780,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
         }
     }

+    @Override
+    public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
+            ConnectorSession session,
+            QualifiedObjectName procedureName,
+            ConnectorTableLayoutHandle tableLayoutHandle,
+            Object[] arguments)
+    {
+        IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable();
+        Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
+
+        if (handle.isSnapshotSpecified()) {
+            throw new PrestoException(NOT_SUPPORTED, "This connector does not allow table execute at a specified snapshot");
+        }
+
+        transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction());
+        Procedure procedure = procedureRegistry.resolve(
+                new ConnectorId(procedureName.getCatalogName()),
+                new SchemaTableName(
+                        procedureName.getSchemaName(),
+                        procedureName.getObjectName()));
+        verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
+        return ((DistributedProcedure) procedure).getBeginCallDistributedProcedure().begin(session, transactionContext, tableLayoutHandle, arguments);
+    }
+
+    @Override
+    public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle tableHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
+    {
+        Procedure procedure = procedureRegistry.resolve(
+                new ConnectorId(procedureName.getCatalogName()),
+                new SchemaTableName(
+                        procedureName.getSchemaName(),
+                        procedureName.getObjectName()));
+        verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
+        ((DistributedProcedure) procedure).getFinishCallDistributedProcedure().finish(transactionContext, tableHandle, fragments);
+        transactionContext.getTransaction().commitTransaction();
+        transactionContext.destroy();
+    }
+
     @Override
     public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
@@ -782,7 +838,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
         }

         validateTableMode(session, icebergTable);
-        transaction = icebergTable.newTransaction();
+        transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction());

         return handle;
     }
@@ -793,7 +849,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

-        RowDelta rowDelta = transaction.newRowDelta();
+        RowDelta rowDelta = transactionContext.getTransaction().newRowDelta();

         List<CommitTaskData> commitTasks = fragments.stream()
                 .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
@@ -830,7 +886,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
         }

         rowDelta.commit();
-        transaction.commitTransaction();
+        transactionContext.getTransaction().commitTransaction();
+        transactionContext.destroy();
     }

     @Override
@@ -864,7 +921,15 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

         boolean supportsMetadataDelete = true;
-        for (PartitionSpec spec : icebergTable.specs().values()) {
+        // Restrict the check to the partition specs that actually need to be checked
+        Set<PartitionSpec> partitionSpecs = handle.getIcebergTableName().getSnapshotId().map(
+                snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
+                        .map(ManifestFile::partitionSpecId)
+                        .map(icebergTable.specs()::get)
+                        .collect(toImmutableSet()))
+                .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().values())); // No snapshot, so no data. This case doesn't matter.
+
+        for (PartitionSpec spec : partitionSpecs) {
             // Currently we do not support delete when any partition column in the predicate is not transformed by identity()
             Set<Integer> partitionColumnSourceIds = spec.fields().stream()
                     .filter(field -> field.transform().isIdentity())
@@ -920,15 +985,16 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
      */
     private long removeScanFiles(Table icebergTable, Iterable<FileScanTask> scan)
     {
-        transaction = icebergTable.newTransaction();
-        DeleteFiles deletes = transaction.newDelete();
+        transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction());
+        DeleteFiles deletes = transactionContext.getTransaction().newDelete();
         AtomicLong rowsDeleted = new AtomicLong(0L);
         scan.forEach(t -> {
             deletes.deleteFile(t.file());
             rowsDeleted.addAndGet(t.estimatedRowsCount());
         });
         deletes.commit();
-        transaction.commitTransaction();
+        transactionContext.getTransaction().commitTransaction();
+        transactionContext.destroy();
         return rowsDeleted.get();
     }
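Taken together, the two new metadata methods bracket a distributed procedure execution. A hedged sketch of the coordinator-side sequence they imply; the wrapper method and the fragmentsFromWorkers parameter are illustrative, only the metadata calls themselves come from this commit:

static void callDistributedProcedure(
        IcebergAbstractMetadata metadata,
        ConnectorSession session,
        QualifiedObjectName procedureName,
        ConnectorTableLayoutHandle layoutHandle,
        Object[] arguments,
        Collection<Slice> fragmentsFromWorkers)
{
    // Resolves the procedure, opens an IcebergTransactionContext, and returns the handle workers need
    ConnectorDistributedProcedureHandle handle =
            metadata.beginCallDistributedProcedure(session, procedureName, layoutHandle, arguments);
    // In between, the engine schedules splits obtained from
    // metadata.getSplitSourceInCurrentCallProcedureTransaction() and collects
    // the serialized fragments that workers produce.
    metadata.finishCallDistributedProcedure(session, handle, procedureName, fragmentsFromWorkers);
    // finishCallDistributedProcedure commits the Iceberg transaction and destroys the context
}

Note the state change this refactor makes possible: replacing the bare Transaction field with IcebergTransactionContext gives begin/finish hooks a single place to stash per-transaction state such as the split source, which is why every commit path now also calls transactionContext.destroy().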

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions
@@ -146,6 +146,7 @@ public void setup(Binder binder)
         procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

         // for orc
         binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
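The binding uses Guice multibindings, so adding a procedure is a one-line change. A sketch of the mechanism behind the lines above, assuming the module declares the set binder with Procedure.class as the element type (the declaration itself is not shown in this hunk):

Multibinder<Procedure> procedures = Multibinder.newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

Once bound, the procedure is expected to be resolvable through the IProcedureRegistry used by beginCallDistributedProcedure and invocable from SQL with a CALL statement (for example, something like CALL iceberg.system.rewrite_data_files(...), arguments elided).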
