Skip to content

Commit 2c19626

Browse files
committed
Refactor Iceberg connector to support call distributed procedure
1 parent 06d21b0 commit 2c19626

13 files changed

+388
-3
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.iceberg.delete.DeleteFile;
17+
import com.facebook.presto.spi.ConnectorSession;
18+
import com.facebook.presto.spi.ConnectorSplit;
19+
import com.facebook.presto.spi.ConnectorSplitSource;
20+
import com.facebook.presto.spi.SplitWeight;
21+
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.io.Closer;
24+
import org.apache.iceberg.FileScanTask;
25+
import org.apache.iceberg.PartitionSpec;
26+
import org.apache.iceberg.PartitionSpecParser;
27+
import org.apache.iceberg.TableScan;
28+
import org.apache.iceberg.io.CloseableIterable;
29+
import org.apache.iceberg.io.CloseableIterator;
30+
31+
import java.io.IOException;
32+
import java.io.UncheckedIOException;
33+
import java.util.ArrayList;
34+
import java.util.Iterator;
35+
import java.util.List;
36+
import java.util.Optional;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.function.Consumer;
39+
40+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
41+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
42+
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
43+
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
44+
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
45+
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
46+
import static com.google.common.collect.ImmutableList.toImmutableList;
47+
import static com.google.common.collect.Iterators.limit;
48+
import static java.util.Objects.requireNonNull;
49+
import static java.util.concurrent.CompletableFuture.completedFuture;
50+
51+
public class CallDistributedProcedureSplitSource
52+
implements ConnectorSplitSource
53+
{
54+
private CloseableIterator<FileScanTask> fileScanTaskIterator;
55+
private Optional<Consumer<FileScanTask>> fileScanTaskConsumer;
56+
57+
private final TableScan tableScan;
58+
private final Closer closer = Closer.create();
59+
private final double minimumAssignedSplitWeight;
60+
private final ConnectorSession session;
61+
62+
public CallDistributedProcedureSplitSource(
63+
ConnectorSession session,
64+
TableScan tableScan,
65+
CloseableIterable<FileScanTask> fileScanTaskIterable,
66+
Optional<Consumer<FileScanTask>> fileScanTaskConsumer,
67+
double minimumAssignedSplitWeight)
68+
{
69+
this.session = requireNonNull(session, "session is null");
70+
this.tableScan = requireNonNull(tableScan, "tableScan is null");
71+
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
72+
this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null");
73+
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
74+
closer.register(fileScanTaskIterator);
75+
}
76+
77+
@Override
78+
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
79+
{
80+
// TODO: move this to a background thread
81+
List<ConnectorSplit> splits = new ArrayList<>();
82+
Iterator<FileScanTask> iterator = limit(fileScanTaskIterator, maxSize);
83+
while (iterator.hasNext()) {
84+
FileScanTask task = iterator.next();
85+
fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task));
86+
splits.add(toIcebergSplit(task));
87+
}
88+
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
89+
}
90+
91+
@Override
92+
public boolean isFinished()
93+
{
94+
return !fileScanTaskIterator.hasNext();
95+
}
96+
97+
@Override
98+
public void close()
99+
{
100+
try {
101+
closer.close();
102+
// TODO: remove this after org.apache.iceberg.io.CloseableIterator'withClose
103+
// correct release resources holds by iterator.
104+
fileScanTaskIterator = CloseableIterator.empty();
105+
}
106+
catch (IOException e) {
107+
throw new UncheckedIOException(e);
108+
}
109+
}
110+
111+
private ConnectorSplit toIcebergSplit(FileScanTask task)
112+
{
113+
PartitionSpec spec = task.spec();
114+
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());
115+
116+
// TODO: We should leverage residual expression and convert that to TupleDomain.
117+
// The predicate here is used by readers for predicate push down at reader level,
118+
// so when we do not use residual expression, we are just wasting CPU cycles
119+
// on reader side evaluating a condition that we know will always be true.
120+
121+
return new IcebergSplit(
122+
task.file().path().toString(),
123+
task.start(),
124+
task.length(),
125+
fromIcebergFileFormat(task.file().format()),
126+
ImmutableList.of(),
127+
getPartitionKeys(task),
128+
PartitionSpecParser.toJson(spec),
129+
partitionData.map(PartitionData::toJson),
130+
getNodeSelectionStrategy(session),
131+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
132+
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
133+
Optional.empty(),
134+
getDataSequenceNumber(task.file()),
135+
getAffinitySchedulingFileSectionSize(session).toBytes());
136+
}
137+
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.json.JsonCodec;
1717
import com.facebook.airlift.log.Logger;
18+
import com.facebook.presto.common.QualifiedObjectName;
1819
import com.facebook.presto.common.RuntimeStats;
1920
import com.facebook.presto.common.Subfield;
2021
import com.facebook.presto.common.predicate.TupleDomain;
@@ -35,10 +36,13 @@
3536
import com.facebook.presto.spi.ColumnHandle;
3637
import com.facebook.presto.spi.ColumnMetadata;
3738
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
39+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
40+
import com.facebook.presto.spi.ConnectorId;
3841
import com.facebook.presto.spi.ConnectorInsertTableHandle;
3942
import com.facebook.presto.spi.ConnectorNewTableLayout;
4043
import com.facebook.presto.spi.ConnectorOutputTableHandle;
4144
import com.facebook.presto.spi.ConnectorSession;
45+
import com.facebook.presto.spi.ConnectorSplitSource;
4246
import com.facebook.presto.spi.ConnectorTableHandle;
4347
import com.facebook.presto.spi.ConnectorTableLayout;
4448
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
@@ -59,6 +63,9 @@
5963
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
6064
import com.facebook.presto.spi.function.StandardFunctionResolution;
6165
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
66+
import com.facebook.presto.spi.procedure.DistributedProcedure;
67+
import com.facebook.presto.spi.procedure.Procedure;
68+
import com.facebook.presto.spi.procedure.ProcedureRegistry;
6269
import com.facebook.presto.spi.relation.RowExpression;
6370
import com.facebook.presto.spi.relation.RowExpressionService;
6471
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -219,10 +226,12 @@ public abstract class IcebergAbstractMetadata
219226
protected static final String INFORMATION_SCHEMA = "information_schema";
220227

221228
protected final TypeManager typeManager;
229+
protected final ProcedureRegistry procedureRegistry;
222230
protected final JsonCodec<CommitTaskData> commitTaskCodec;
223231
protected final NodeVersion nodeVersion;
224232
protected final RowExpressionService rowExpressionService;
225233
protected final FilterStatsCalculatorService filterStatsCalculatorService;
234+
protected Optional<IcebergProcedureContext> procedureContext = Optional.empty();
226235
protected Transaction transaction;
227236
protected final StatisticsFileCache statisticsFileCache;
228237
protected final IcebergTableProperties tableProperties;
@@ -232,6 +241,7 @@ public abstract class IcebergAbstractMetadata
232241

233242
public IcebergAbstractMetadata(
234243
TypeManager typeManager,
244+
ProcedureRegistry procedureRegistry,
235245
StandardFunctionResolution functionResolution,
236246
RowExpressionService rowExpressionService,
237247
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -241,6 +251,7 @@ public IcebergAbstractMetadata(
241251
IcebergTableProperties tableProperties)
242252
{
243253
this.typeManager = requireNonNull(typeManager, "typeManager is null");
254+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
244255
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
245256
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
246257
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
@@ -267,6 +278,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
267278

268279
public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);
269280

281+
public Optional<ConnectorSplitSource> getSplitSourceInCurrentCallProcedureTransaction()
282+
{
283+
return procedureContext.flatMap(IcebergProcedureContext::getConnectorSplitSource);
284+
}
285+
270286
/**
271287
* This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
272288
*/
@@ -1041,6 +1057,48 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
10411057
removeScanFiles(icebergTable, TupleDomain.all());
10421058
}
10431059

1060+
@Override
1061+
public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
1062+
ConnectorSession session,
1063+
QualifiedObjectName procedureName,
1064+
ConnectorTableLayoutHandle tableLayoutHandle,
1065+
Object[] arguments)
1066+
{
1067+
IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable();
1068+
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
1069+
1070+
if (handle.isSnapshotSpecified()) {
1071+
throw new PrestoException(NOT_SUPPORTED, "This connector do not allow table execute at specified snapshot");
1072+
}
1073+
1074+
transaction = icebergTable.newTransaction();
1075+
Procedure procedure = procedureRegistry.resolve(
1076+
new ConnectorId(procedureName.getCatalogName()),
1077+
new SchemaTableName(
1078+
procedureName.getSchemaName(),
1079+
procedureName.getObjectName()));
1080+
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
1081+
procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext());
1082+
procedureContext.get().setTable(icebergTable);
1083+
procedureContext.get().setTransaction(transaction);
1084+
return ((DistributedProcedure) procedure).begin(session, procedureContext.get(), tableLayoutHandle, arguments);
1085+
}
1086+
1087+
@Override
1088+
public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
1089+
{
1090+
Procedure procedure = procedureRegistry.resolve(
1091+
new ConnectorId(procedureName.getCatalogName()),
1092+
new SchemaTableName(
1093+
procedureName.getSchemaName(),
1094+
procedureName.getObjectName()));
1095+
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
1096+
verify(procedureContext.isPresent(), "procedure context must be present");
1097+
((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments);
1098+
transaction.commitTransaction();
1099+
procedureContext.get().destroy();
1100+
}
1101+
10441102
@Override
10451103
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
10461104
{
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.hive.HiveCompressionCodec;
17+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
18+
import com.fasterxml.jackson.annotation.JsonCreator;
19+
import com.fasterxml.jackson.annotation.JsonProperty;
20+
import com.google.common.collect.ImmutableList;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
public class IcebergDistributedProcedureHandle
26+
extends IcebergWritableTableHandle
27+
implements ConnectorDistributedProcedureHandle
28+
{
29+
@JsonCreator
30+
public IcebergDistributedProcedureHandle(
31+
@JsonProperty("schemaName") String schemaName,
32+
@JsonProperty("tableName") IcebergTableName tableName,
33+
@JsonProperty("schema") PrestoIcebergSchema schema,
34+
@JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec,
35+
@JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
36+
@JsonProperty("outputPath") String outputPath,
37+
@JsonProperty("fileFormat") FileFormat fileFormat,
38+
@JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec,
39+
@JsonProperty("storageProperties") Map<String, String> storageProperties)
40+
{
41+
super(
42+
schemaName,
43+
tableName,
44+
schema,
45+
partitionSpec,
46+
inputColumns,
47+
outputPath,
48+
fileFormat,
49+
compressionCodec,
50+
storageProperties,
51+
ImmutableList.of());
52+
}
53+
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.hive.HiveTransactionHandle;
1717
import com.facebook.presto.spi.ColumnHandle;
1818
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
19+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
1920
import com.facebook.presto.spi.ConnectorHandleResolver;
2021
import com.facebook.presto.spi.ConnectorInsertTableHandle;
2122
import com.facebook.presto.spi.ConnectorOutputTableHandle;
@@ -69,6 +70,12 @@ public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
6970
return IcebergTableHandle.class;
7071
}
7172

73+
@Override
74+
public Class<? extends ConnectorDistributedProcedureHandle> getDistributedProcedureHandleClass()
75+
{
76+
return IcebergDistributedProcedureHandle.class;
77+
}
78+
7279
@Override
7380
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
7481
{

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import com.facebook.presto.spi.ViewNotFoundException;
5555
import com.facebook.presto.spi.function.StandardFunctionResolution;
5656
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
57+
import com.facebook.presto.spi.procedure.ProcedureRegistry;
5758
import com.facebook.presto.spi.relation.RowExpressionService;
5859
import com.facebook.presto.spi.security.PrestoPrincipal;
5960
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -175,6 +176,7 @@ public IcebergHiveMetadata(
175176
ExtendedHiveMetastore metastore,
176177
HdfsEnvironment hdfsEnvironment,
177178
TypeManager typeManager,
179+
ProcedureRegistry procedureRegistry,
178180
StandardFunctionResolution functionResolution,
179181
RowExpressionService rowExpressionService,
180182
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -186,7 +188,7 @@ public IcebergHiveMetadata(
186188
IcebergTableProperties tableProperties,
187189
ConnectorSystemConfig connectorSystemConfig)
188190
{
189-
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
191+
super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
190192
this.catalogName = requireNonNull(catalogName, "catalogName is null");
191193
this.metastore = requireNonNull(metastore, "metastore is null");
192194
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.facebook.presto.spi.connector.ConnectorMetadata;
2424
import com.facebook.presto.spi.function.StandardFunctionResolution;
2525
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
26+
import com.facebook.presto.spi.procedure.ProcedureRegistry;
2627
import com.facebook.presto.spi.relation.RowExpressionService;
2728
import jakarta.inject.Inject;
2829

@@ -35,6 +36,7 @@ public class IcebergHiveMetadataFactory
3536
final ExtendedHiveMetastore metastore;
3637
final HdfsEnvironment hdfsEnvironment;
3738
final TypeManager typeManager;
39+
final ProcedureRegistry procedureRegistry;
3840
final JsonCodec<CommitTaskData> commitTaskCodec;
3941
final StandardFunctionResolution functionResolution;
4042
final RowExpressionService rowExpressionService;
@@ -52,6 +54,7 @@ public IcebergHiveMetadataFactory(
5254
ExtendedHiveMetastore metastore,
5355
HdfsEnvironment hdfsEnvironment,
5456
TypeManager typeManager,
57+
ProcedureRegistry procedureRegistry,
5558
StandardFunctionResolution functionResolution,
5659
RowExpressionService rowExpressionService,
5760
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -67,6 +70,7 @@ public IcebergHiveMetadataFactory(
6770
this.metastore = requireNonNull(metastore, "metastore is null");
6871
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
6972
this.typeManager = requireNonNull(typeManager, "typeManager is null");
73+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
7074
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
7175
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
7276
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
@@ -86,6 +90,7 @@ public ConnectorMetadata create()
8690
metastore,
8791
hdfsEnvironment,
8892
typeManager,
93+
procedureRegistry,
8994
functionResolution,
9095
rowExpressionService,
9196
commitTaskCodec,

0 commit comments

Comments
 (0)