Skip to content

Commit e2d00d8

Browse files
committed
Refactor Iceberg connector to support call distributed procedure
1 parent 94de465 commit e2d00d8

File tree

17 files changed

+404
-3
lines changed

17 files changed

+404
-3
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.iceberg.delete.DeleteFile;
17+
import com.facebook.presto.spi.ConnectorSession;
18+
import com.facebook.presto.spi.ConnectorSplit;
19+
import com.facebook.presto.spi.ConnectorSplitSource;
20+
import com.facebook.presto.spi.SplitWeight;
21+
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.io.Closer;
24+
import org.apache.iceberg.FileScanTask;
25+
import org.apache.iceberg.PartitionSpec;
26+
import org.apache.iceberg.PartitionSpecParser;
27+
import org.apache.iceberg.TableScan;
28+
import org.apache.iceberg.io.CloseableIterable;
29+
import org.apache.iceberg.io.CloseableIterator;
30+
31+
import java.io.IOException;
32+
import java.io.UncheckedIOException;
33+
import java.util.ArrayList;
34+
import java.util.Iterator;
35+
import java.util.List;
36+
import java.util.Optional;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.function.Consumer;
39+
40+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
41+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
42+
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
43+
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
44+
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
45+
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
46+
import static com.google.common.collect.ImmutableList.toImmutableList;
47+
import static com.google.common.collect.Iterators.limit;
48+
import static java.util.Objects.requireNonNull;
49+
import static java.util.concurrent.CompletableFuture.completedFuture;
50+
51+
public class CallDistributedProcedureSplitSource
52+
implements ConnectorSplitSource
53+
{
54+
private CloseableIterator<FileScanTask> fileScanTaskIterator;
55+
private Optional<Consumer<FileScanTask>> fileScanTaskConsumer;
56+
57+
private final TableScan tableScan;
58+
private final Closer closer = Closer.create();
59+
private final double minimumAssignedSplitWeight;
60+
private final ConnectorSession session;
61+
62+
public CallDistributedProcedureSplitSource(
63+
ConnectorSession session,
64+
TableScan tableScan,
65+
CloseableIterable<FileScanTask> fileScanTaskIterable,
66+
Optional<Consumer<FileScanTask>> fileScanTaskConsumer,
67+
double minimumAssignedSplitWeight)
68+
{
69+
this.session = requireNonNull(session, "session is null");
70+
this.tableScan = requireNonNull(tableScan, "tableScan is null");
71+
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
72+
this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null");
73+
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
74+
closer.register(fileScanTaskIterator);
75+
}
76+
77+
@Override
78+
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
79+
{
80+
// TODO: move this to a background thread
81+
List<ConnectorSplit> splits = new ArrayList<>();
82+
Iterator<FileScanTask> iterator = limit(fileScanTaskIterator, maxSize);
83+
while (iterator.hasNext()) {
84+
FileScanTask task = iterator.next();
85+
fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task));
86+
splits.add(toIcebergSplit(task));
87+
}
88+
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
89+
}
90+
91+
@Override
92+
public boolean isFinished()
93+
{
94+
return !fileScanTaskIterator.hasNext();
95+
}
96+
97+
@Override
98+
public void close()
99+
{
100+
try {
101+
closer.close();
102+
// TODO: remove this after org.apache.iceberg.io.CloseableIterator'withClose
103+
// correct release resources holds by iterator.
104+
fileScanTaskIterator = CloseableIterator.empty();
105+
}
106+
catch (IOException e) {
107+
throw new UncheckedIOException(e);
108+
}
109+
}
110+
111+
private ConnectorSplit toIcebergSplit(FileScanTask task)
112+
{
113+
PartitionSpec spec = task.spec();
114+
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());
115+
116+
// TODO: We should leverage residual expression and convert that to TupleDomain.
117+
// The predicate here is used by readers for predicate push down at reader level,
118+
// so when we do not use residual expression, we are just wasting CPU cycles
119+
// on reader side evaluating a condition that we know will always be true.
120+
121+
return new IcebergSplit(
122+
task.file().path().toString(),
123+
task.start(),
124+
task.length(),
125+
fromIcebergFileFormat(task.file().format()),
126+
ImmutableList.of(),
127+
getPartitionKeys(task),
128+
PartitionSpecParser.toJson(spec),
129+
partitionData.map(PartitionData::toJson),
130+
getNodeSelectionStrategy(session),
131+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
132+
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
133+
Optional.empty(),
134+
getDataSequenceNumber(task.file()),
135+
getAffinitySchedulingFileSectionSize(session).toBytes());
136+
}
137+
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.json.JsonCodec;
1717
import com.facebook.airlift.log.Logger;
18+
import com.facebook.presto.common.QualifiedObjectName;
1819
import com.facebook.presto.common.RuntimeStats;
1920
import com.facebook.presto.common.Subfield;
2021
import com.facebook.presto.common.predicate.TupleDomain;
@@ -35,10 +36,13 @@
3536
import com.facebook.presto.spi.ColumnHandle;
3637
import com.facebook.presto.spi.ColumnMetadata;
3738
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
39+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
40+
import com.facebook.presto.spi.ConnectorId;
3841
import com.facebook.presto.spi.ConnectorInsertTableHandle;
3942
import com.facebook.presto.spi.ConnectorNewTableLayout;
4043
import com.facebook.presto.spi.ConnectorOutputTableHandle;
4144
import com.facebook.presto.spi.ConnectorSession;
45+
import com.facebook.presto.spi.ConnectorSplitSource;
4246
import com.facebook.presto.spi.ConnectorTableHandle;
4347
import com.facebook.presto.spi.ConnectorTableLayout;
4448
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
@@ -59,6 +63,9 @@
5963
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
6064
import com.facebook.presto.spi.function.StandardFunctionResolution;
6165
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
66+
import com.facebook.presto.spi.procedure.DistributedProcedure;
67+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
68+
import com.facebook.presto.spi.procedure.Procedure;
6269
import com.facebook.presto.spi.relation.RowExpression;
6370
import com.facebook.presto.spi.relation.RowExpressionService;
6471
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -218,10 +225,12 @@ public abstract class IcebergAbstractMetadata
218225
protected static final String INFORMATION_SCHEMA = "information_schema";
219226

220227
protected final TypeManager typeManager;
228+
protected final IProcedureRegistry procedureRegistry;
221229
protected final JsonCodec<CommitTaskData> commitTaskCodec;
222230
protected final NodeVersion nodeVersion;
223231
protected final RowExpressionService rowExpressionService;
224232
protected final FilterStatsCalculatorService filterStatsCalculatorService;
233+
protected Optional<IcebergProcedureContext> procedureContext = Optional.empty();
225234
protected Transaction transaction;
226235
protected final StatisticsFileCache statisticsFileCache;
227236
protected final IcebergTableProperties tableProperties;
@@ -231,6 +240,7 @@ public abstract class IcebergAbstractMetadata
231240

232241
public IcebergAbstractMetadata(
233242
TypeManager typeManager,
243+
IProcedureRegistry procedureRegistry,
234244
StandardFunctionResolution functionResolution,
235245
RowExpressionService rowExpressionService,
236246
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -240,6 +250,7 @@ public IcebergAbstractMetadata(
240250
IcebergTableProperties tableProperties)
241251
{
242252
this.typeManager = requireNonNull(typeManager, "typeManager is null");
253+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
243254
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
244255
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
245256
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
@@ -266,6 +277,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
266277

267278
public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);
268279

280+
public Optional<ConnectorSplitSource> getSplitSourceInCurrentCallProcedureTransaction()
281+
{
282+
return procedureContext.flatMap(IcebergProcedureContext::getConnectorSplitSource);
283+
}
284+
269285
/**
270286
* This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
271287
*/
@@ -1040,6 +1056,46 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
10401056
removeScanFiles(icebergTable, TupleDomain.all());
10411057
}
10421058

1059+
@Override
1060+
public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
1061+
ConnectorSession session,
1062+
QualifiedObjectName procedureName,
1063+
ConnectorTableLayoutHandle tableLayoutHandle,
1064+
Object[] arguments)
1065+
{
1066+
IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable();
1067+
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
1068+
1069+
if (handle.isSnapshotSpecified()) {
1070+
throw new PrestoException(NOT_SUPPORTED, "This connector do not allow table execute at specified snapshot");
1071+
}
1072+
1073+
transaction = icebergTable.newTransaction();
1074+
procedureContext = Optional.of(new IcebergProcedureContext(Optional.of(icebergTable), transaction));
1075+
Procedure procedure = procedureRegistry.resolve(
1076+
new ConnectorId(procedureName.getCatalogName()),
1077+
new SchemaTableName(
1078+
procedureName.getSchemaName(),
1079+
procedureName.getObjectName()));
1080+
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
1081+
return ((DistributedProcedure) procedure).getBeginCallDistributedProcedure().begin(session, procedureContext.get(), tableLayoutHandle, arguments);
1082+
}
1083+
1084+
@Override
1085+
public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
1086+
{
1087+
Procedure procedure = procedureRegistry.resolve(
1088+
new ConnectorId(procedureName.getCatalogName()),
1089+
new SchemaTableName(
1090+
procedureName.getSchemaName(),
1091+
procedureName.getObjectName()));
1092+
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
1093+
verify(procedureContext.isPresent(), "procedure context must be present");
1094+
((DistributedProcedure) procedure).getFinishCallDistributedProcedure().finish(procedureContext.get(), procedureHandle, fragments);
1095+
transaction.commitTransaction();
1096+
procedureContext.get().destroy();
1097+
}
1098+
10431099
@Override
10441100
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
10451101
{
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.hive.HiveCompressionCodec;
17+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
18+
import com.fasterxml.jackson.annotation.JsonCreator;
19+
import com.fasterxml.jackson.annotation.JsonProperty;
20+
import com.google.common.collect.ImmutableList;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
public class IcebergDistributedProcedureHandle
26+
extends IcebergWritableTableHandle
27+
implements ConnectorDistributedProcedureHandle
28+
{
29+
@JsonCreator
30+
public IcebergDistributedProcedureHandle(
31+
@JsonProperty("schemaName") String schemaName,
32+
@JsonProperty("tableName") IcebergTableName tableName,
33+
@JsonProperty("schema") PrestoIcebergSchema schema,
34+
@JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec,
35+
@JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
36+
@JsonProperty("outputPath") String outputPath,
37+
@JsonProperty("fileFormat") FileFormat fileFormat,
38+
@JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec,
39+
@JsonProperty("storageProperties") Map<String, String> storageProperties)
40+
{
41+
super(
42+
schemaName,
43+
tableName,
44+
schema,
45+
partitionSpec,
46+
inputColumns,
47+
outputPath,
48+
fileFormat,
49+
compressionCodec,
50+
storageProperties,
51+
ImmutableList.of());
52+
}
53+
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.presto.hive.HiveTransactionHandle;
1717
import com.facebook.presto.spi.ColumnHandle;
1818
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
19+
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
1920
import com.facebook.presto.spi.ConnectorHandleResolver;
2021
import com.facebook.presto.spi.ConnectorInsertTableHandle;
2122
import com.facebook.presto.spi.ConnectorOutputTableHandle;
@@ -69,6 +70,12 @@ public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
6970
return IcebergTableHandle.class;
7071
}
7172

73+
@Override
74+
public Class<? extends ConnectorDistributedProcedureHandle> getDistributedProcedureHandleClass()
75+
{
76+
return IcebergDistributedProcedureHandle.class;
77+
}
78+
7279
@Override
7380
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
7481
{

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import com.facebook.presto.spi.ViewNotFoundException;
5555
import com.facebook.presto.spi.function.StandardFunctionResolution;
5656
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
57+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
5758
import com.facebook.presto.spi.relation.RowExpressionService;
5859
import com.facebook.presto.spi.security.PrestoPrincipal;
5960
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -175,6 +176,7 @@ public IcebergHiveMetadata(
175176
ExtendedHiveMetastore metastore,
176177
HdfsEnvironment hdfsEnvironment,
177178
TypeManager typeManager,
179+
IProcedureRegistry procedureRegistry,
178180
StandardFunctionResolution functionResolution,
179181
RowExpressionService rowExpressionService,
180182
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -186,7 +188,7 @@ public IcebergHiveMetadata(
186188
IcebergTableProperties tableProperties,
187189
ConnectorSystemConfig connectorSystemConfig)
188190
{
189-
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
191+
super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
190192
this.catalogName = requireNonNull(catalogName, "catalogName is null");
191193
this.metastore = requireNonNull(metastore, "metastore is null");
192194
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.facebook.presto.spi.connector.ConnectorMetadata;
2424
import com.facebook.presto.spi.function.StandardFunctionResolution;
2525
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
26+
import com.facebook.presto.spi.procedure.IProcedureRegistry;
2627
import com.facebook.presto.spi.relation.RowExpressionService;
2728
import jakarta.inject.Inject;
2829

@@ -35,6 +36,7 @@ public class IcebergHiveMetadataFactory
3536
final ExtendedHiveMetastore metastore;
3637
final HdfsEnvironment hdfsEnvironment;
3738
final TypeManager typeManager;
39+
final IProcedureRegistry procedureRegistry;
3840
final JsonCodec<CommitTaskData> commitTaskCodec;
3941
final StandardFunctionResolution functionResolution;
4042
final RowExpressionService rowExpressionService;
@@ -52,6 +54,7 @@ public IcebergHiveMetadataFactory(
5254
ExtendedHiveMetastore metastore,
5355
HdfsEnvironment hdfsEnvironment,
5456
TypeManager typeManager,
57+
IProcedureRegistry procedureRegistry,
5558
StandardFunctionResolution functionResolution,
5659
RowExpressionService rowExpressionService,
5760
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -67,6 +70,7 @@ public IcebergHiveMetadataFactory(
6770
this.metastore = requireNonNull(metastore, "metastore is null");
6871
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
6972
this.typeManager = requireNonNull(typeManager, "typeManager is null");
73+
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
7074
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
7175
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
7276
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
@@ -86,6 +90,7 @@ public ConnectorMetadata create()
8690
metastore,
8791
hdfsEnvironment,
8892
typeManager,
93+
procedureRegistry,
8994
functionResolution,
9095
rowExpressionService,
9196
commitTaskCodec,

0 commit comments

Comments
 (0)