1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .presto .common .QualifiedObjectName ;
1819import com .facebook .presto .common .RuntimeStats ;
1920import com .facebook .presto .common .Subfield ;
2021import com .facebook .presto .common .predicate .TupleDomain ;
3536import com .facebook .presto .spi .ColumnHandle ;
3637import com .facebook .presto .spi .ColumnMetadata ;
3738import com .facebook .presto .spi .ConnectorDeleteTableHandle ;
39+ import com .facebook .presto .spi .ConnectorDistributedProcedureHandle ;
40+ import com .facebook .presto .spi .ConnectorId ;
3841import com .facebook .presto .spi .ConnectorInsertTableHandle ;
3942import com .facebook .presto .spi .ConnectorNewTableLayout ;
4043import com .facebook .presto .spi .ConnectorOutputTableHandle ;
4144import com .facebook .presto .spi .ConnectorSession ;
45+ import com .facebook .presto .spi .ConnectorSplitSource ;
4246import com .facebook .presto .spi .ConnectorTableHandle ;
4347import com .facebook .presto .spi .ConnectorTableLayout ;
4448import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
5963import com .facebook .presto .spi .connector .ConnectorTableVersion .VersionType ;
6064import com .facebook .presto .spi .function .StandardFunctionResolution ;
6165import com .facebook .presto .spi .plan .FilterStatsCalculatorService ;
66+ import com .facebook .presto .spi .procedure .DistributedProcedure ;
67+ import com .facebook .presto .spi .procedure .Procedure ;
68+ import com .facebook .presto .spi .procedure .ProcedureRegistry ;
6269import com .facebook .presto .spi .relation .RowExpression ;
6370import com .facebook .presto .spi .relation .RowExpressionService ;
6471import com .facebook .presto .spi .statistics .ColumnStatisticMetadata ;
@@ -218,10 +225,12 @@ public abstract class IcebergAbstractMetadata
218225 protected static final String INFORMATION_SCHEMA = "information_schema" ;
219226
220227 protected final TypeManager typeManager ;
228+ protected final ProcedureRegistry procedureRegistry ;
221229 protected final JsonCodec <CommitTaskData > commitTaskCodec ;
222230 protected final NodeVersion nodeVersion ;
223231 protected final RowExpressionService rowExpressionService ;
224232 protected final FilterStatsCalculatorService filterStatsCalculatorService ;
233+ protected Optional <IcebergProcedureContext > procedureContext = Optional .empty ();
225234 protected Transaction transaction ;
226235 protected final StatisticsFileCache statisticsFileCache ;
227236 protected final IcebergTableProperties tableProperties ;
@@ -231,6 +240,7 @@ public abstract class IcebergAbstractMetadata
231240
232241 public IcebergAbstractMetadata (
233242 TypeManager typeManager ,
243+ ProcedureRegistry procedureRegistry ,
234244 StandardFunctionResolution functionResolution ,
235245 RowExpressionService rowExpressionService ,
236246 JsonCodec <CommitTaskData > commitTaskCodec ,
@@ -240,6 +250,7 @@ public IcebergAbstractMetadata(
240250 IcebergTableProperties tableProperties )
241251 {
242252 this .typeManager = requireNonNull (typeManager , "typeManager is null" );
253+ this .procedureRegistry = requireNonNull (procedureRegistry , "procedureRegistry is null" );
243254 this .commitTaskCodec = requireNonNull (commitTaskCodec , "commitTaskCodec is null" );
244255 this .functionResolution = requireNonNull (functionResolution , "functionResolution is null" );
245256 this .rowExpressionService = requireNonNull (rowExpressionService , "rowExpressionService is null" );
@@ -266,6 +277,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
266277
267278 public abstract void unregisterTable (ConnectorSession clientSession , SchemaTableName schemaTableName );
268279
280+ public Optional <ConnectorSplitSource > getSplitSourceInCurrentCallProcedureTransaction ()
281+ {
282+ return procedureContext .flatMap (IcebergProcedureContext ::getConnectorSplitSource );
283+ }
284+
269285 /**
270286 * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
271287 */
@@ -1040,6 +1056,48 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
10401056 removeScanFiles (icebergTable , TupleDomain .all ());
10411057 }
10421058
1059+ @ Override
1060+ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure (
1061+ ConnectorSession session ,
1062+ QualifiedObjectName procedureName ,
1063+ ConnectorTableLayoutHandle tableLayoutHandle ,
1064+ Object [] arguments )
1065+ {
1066+ IcebergTableHandle handle = ((IcebergTableLayoutHandle ) tableLayoutHandle ).getTable ();
1067+ Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
1068+
1069+ if (handle .isSnapshotSpecified ()) {
1070+ throw new PrestoException (NOT_SUPPORTED , "This connector do not allow table execute at specified snapshot" );
1071+ }
1072+
1073+ transaction = icebergTable .newTransaction ();
1074+ Procedure procedure = procedureRegistry .resolve (
1075+ new ConnectorId (procedureName .getCatalogName ()),
1076+ new SchemaTableName (
1077+ procedureName .getSchemaName (),
1078+ procedureName .getObjectName ()));
1079+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
1080+ procedureContext = Optional .of ((IcebergProcedureContext ) ((DistributedProcedure ) procedure ).createContext ());
1081+ procedureContext .get ().setTable (icebergTable );
1082+ procedureContext .get ().setTransaction (transaction );
1083+ return ((DistributedProcedure ) procedure ).begin (session , procedureContext .get (), tableLayoutHandle , arguments );
1084+ }
1085+
1086+ @ Override
1087+ public void finishCallDistributedProcedure (ConnectorSession session , ConnectorDistributedProcedureHandle procedureHandle , QualifiedObjectName procedureName , Collection <Slice > fragments )
1088+ {
1089+ Procedure procedure = procedureRegistry .resolve (
1090+ new ConnectorId (procedureName .getCatalogName ()),
1091+ new SchemaTableName (
1092+ procedureName .getSchemaName (),
1093+ procedureName .getObjectName ()));
1094+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
1095+ verify (procedureContext .isPresent (), "procedure context must be present" );
1096+ ((DistributedProcedure ) procedure ).finish (procedureContext .get (), procedureHandle , fragments );
1097+ transaction .commitTransaction ();
1098+ procedureContext .get ().destroy ();
1099+ }
1100+
10431101 @ Override
10441102 public ConnectorDeleteTableHandle beginDelete (ConnectorSession session , ConnectorTableHandle tableHandle )
10451103 {
0 commit comments