1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .presto .common .QualifiedObjectName ;
1819import com .facebook .presto .common .Subfield ;
1920import com .facebook .presto .common .predicate .TupleDomain ;
2021import com .facebook .presto .common .type .BigintType ;
2728import com .facebook .presto .iceberg .changelog .ChangelogUtil ;
2829import com .facebook .presto .spi .ColumnHandle ;
2930import com .facebook .presto .spi .ColumnMetadata ;
31+ import com .facebook .presto .spi .ConnectorDistributedProcedureHandle ;
32+ import com .facebook .presto .spi .ConnectorId ;
3033import com .facebook .presto .spi .ConnectorInsertTableHandle ;
3134import com .facebook .presto .spi .ConnectorNewTableLayout ;
3235import com .facebook .presto .spi .ConnectorOutputTableHandle ;
3336import com .facebook .presto .spi .ConnectorSession ;
37+ import com .facebook .presto .spi .ConnectorSplitSource ;
3438import com .facebook .presto .spi .ConnectorTableHandle ;
3539import com .facebook .presto .spi .ConnectorTableLayout ;
3640import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
4953import com .facebook .presto .spi .connector .ConnectorTableVersion .VersionOperator ;
5054import com .facebook .presto .spi .connector .ConnectorTableVersion .VersionType ;
5155import com .facebook .presto .spi .function .StandardFunctionResolution ;
56+ import com .facebook .presto .spi .procedure .DistributedProcedure ;
57+ import com .facebook .presto .spi .procedure .IProcedureRegistry ;
58+ import com .facebook .presto .spi .procedure .Procedure ;
5259import com .facebook .presto .spi .relation .RowExpression ;
5360import com .facebook .presto .spi .relation .RowExpressionService ;
5461import com .facebook .presto .spi .statistics .ColumnStatisticMetadata ;
@@ -169,23 +176,26 @@ public abstract class IcebergAbstractMetadata
169176 implements ConnectorMetadata
170177{
171178 protected final TypeManager typeManager ;
179+ protected final IProcedureRegistry procedureRegistry ;
172180 protected final JsonCodec <CommitTaskData > commitTaskCodec ;
173181 protected final NodeVersion nodeVersion ;
174182 protected final RowExpressionService rowExpressionService ;
175- protected Transaction transaction ;
183+ protected IcebergTransactionContext transactionContext ;
176184
177185 private final StandardFunctionResolution functionResolution ;
178186 private final ConcurrentMap <SchemaTableName , Table > icebergTables = new ConcurrentHashMap <>();
179187 private static final Logger log = Logger .get (IcebergAbstractMetadata .class );
180188
181189 public IcebergAbstractMetadata (
182190 TypeManager typeManager ,
191+ IProcedureRegistry procedureRegistry ,
183192 StandardFunctionResolution functionResolution ,
184193 RowExpressionService rowExpressionService ,
185194 JsonCodec <CommitTaskData > commitTaskCodec ,
186195 NodeVersion nodeVersion )
187196 {
188197 this .typeManager = requireNonNull (typeManager , "typeManager is null" );
198+ this .procedureRegistry = requireNonNull (procedureRegistry , "procedureRegistry is null" );
189199 this .commitTaskCodec = requireNonNull (commitTaskCodec , "commitTaskCodec is null" );
190200 this .functionResolution = requireNonNull (functionResolution , "functionResolution is null" );
191201 this .rowExpressionService = requireNonNull (rowExpressionService , "rowExpressionService is null" );
@@ -207,6 +217,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
207217
208218 public abstract void unregisterTable (ConnectorSession clientSession , SchemaTableName schemaTableName );
209219
220+ public Optional <ConnectorSplitSource > getSplitSourceInCurrentCallProcedureTransaction ()
221+ {
222+ return transactionContext == null ? Optional .empty () : transactionContext .getConnectorSplitSource ();
223+ }
224+
210225 /**
211226 * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
212227 */
@@ -436,7 +451,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
436451
437452 protected ConnectorInsertTableHandle beginIcebergTableInsert (ConnectorSession session , IcebergTableHandle table , Table icebergTable )
438453 {
439- transaction = icebergTable .newTransaction ();
454+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
440455
441456 return new IcebergInsertTableHandle (
442457 table .getSchemaName (),
@@ -459,11 +474,12 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
459474 private Optional <ConnectorOutputMetadata > finishWrite (ConnectorSession session , IcebergWritableTableHandle writableTableHandle , Collection <Slice > fragments , Collection <ComputedStatistics > computedStatistics )
460475 {
461476 if (fragments .isEmpty ()) {
462- transaction .commitTransaction ();
477+ transactionContext .getTransaction ().commitTransaction ();
478+ transactionContext .destroy ();
463479 return Optional .empty ();
464480 }
465481
466- Table icebergTable = transaction .table ();
482+ Table icebergTable = transactionContext . getTransaction () .table ();
467483
468484 List <CommitTaskData > commitTasks = fragments .stream ()
469485 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -474,7 +490,7 @@ private Optional<ConnectorOutputMetadata> finishWrite(ConnectorSession session,
474490 icebergTable .schema ().findType (field .sourceId ())))
475491 .toArray (Type []::new );
476492
477- AppendFiles appendFiles = transaction .newFastAppend ();
493+ AppendFiles appendFiles = transactionContext . getTransaction () .newFastAppend ();
478494 for (CommitTaskData task : commitTasks ) {
479495 DataFiles .Builder builder = DataFiles .builder (icebergTable .spec ())
480496 .withPath (task .getPath ())
@@ -492,7 +508,8 @@ private Optional<ConnectorOutputMetadata> finishWrite(ConnectorSession session,
492508 }
493509
494510 appendFiles .commit ();
495- transaction .commitTransaction ();
511+ transactionContext .getTransaction ().commitTransaction ();
512+ transactionContext .destroy ();
496513
497514 return Optional .of (new HiveWrittenPartitions (commitTasks .stream ()
498515 .map (CommitTaskData ::getPath )
@@ -805,6 +822,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
805822 removeScanFiles (icebergTable , TupleDomain .all ());
806823 }
807824
825+ @ Override
826+ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure (
827+ ConnectorSession session ,
828+ QualifiedObjectName procedureName ,
829+ ConnectorTableLayoutHandle tableLayoutHandle ,
830+ Object [] arguments )
831+ {
832+ IcebergTableHandle handle = ((IcebergTableLayoutHandle ) tableLayoutHandle ).getTable ();
833+ Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
834+
835+ if (handle .isSnapshotSpecified ()) {
836+ throw new PrestoException (NOT_SUPPORTED , "This connector do not allow table execute at specified snapshot" );
837+ }
838+
839+ transactionContext = new IcebergTransactionContext (Optional .of (icebergTable ), icebergTable .newTransaction ());
840+ Procedure procedure = procedureRegistry .resolve (
841+ new ConnectorId (procedureName .getCatalogName ()),
842+ new SchemaTableName (
843+ procedureName .getSchemaName (),
844+ procedureName .getObjectName ()));
845+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
846+ return ((DistributedProcedure ) procedure ).getBeginCallDistributedProcedure ().begin (session , transactionContext , tableLayoutHandle , arguments );
847+ }
848+
849+ @ Override
850+ public void finishCallDistributedProcedure (ConnectorSession session , ConnectorDistributedProcedureHandle procedureHandle , QualifiedObjectName procedureName , Collection <Slice > fragments )
851+ {
852+ Procedure procedure = procedureRegistry .resolve (
853+ new ConnectorId (procedureName .getCatalogName ()),
854+ new SchemaTableName (
855+ procedureName .getSchemaName (),
856+ procedureName .getObjectName ()));
857+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
858+ ((DistributedProcedure ) procedure ).getFinishCallDistributedProcedure ().finish (transactionContext , procedureHandle , fragments );
859+ transactionContext .getTransaction ().commitTransaction ();
860+ transactionContext .destroy ();
861+ }
862+
808863 @ Override
809864 public ConnectorTableHandle beginDelete (ConnectorSession session , ConnectorTableHandle tableHandle )
810865 {
@@ -825,7 +880,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
825880 }
826881
827882 validateTableMode (session , icebergTable );
828- transaction = icebergTable .newTransaction ();
883+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
829884
830885 return handle ;
831886 }
@@ -836,7 +891,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
836891 IcebergTableHandle handle = (IcebergTableHandle ) tableHandle ;
837892 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
838893
839- RowDelta rowDelta = transaction .newRowDelta ();
894+ RowDelta rowDelta = transactionContext . getTransaction () .newRowDelta ();
840895
841896 List <CommitTaskData > commitTasks = fragments .stream ()
842897 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -873,7 +928,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
873928 }
874929
875930 rowDelta .commit ();
876- transaction .commitTransaction ();
931+ transactionContext .getTransaction ().commitTransaction ();
932+ transactionContext .destroy ();
877933 }
878934
879935 @ Override
@@ -951,11 +1007,12 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
9511007 */
9521008 private OptionalLong removeScanFiles (Table icebergTable , TupleDomain <IcebergColumnHandle > predicate )
9531009 {
954- transaction = icebergTable .newTransaction ();
955- DeleteFiles deleteFiles = transaction .newDelete ()
1010+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
1011+ DeleteFiles deleteFiles = transactionContext . getTransaction () .newDelete ()
9561012 .deleteFromRowFilter (toIcebergExpression (predicate ));
9571013 deleteFiles .commit ();
958- transaction .commitTransaction ();
1014+ transactionContext .getTransaction ().commitTransaction ();
1015+ transactionContext .destroy ();
9591016
9601017 Map <String , String > summary = icebergTable .currentSnapshot ().summary ();
9611018 long deletedRecords = Long .parseLong (summary .getOrDefault (DELETED_RECORDS_PROP , "0" ));
0 commit comments