1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .presto .common .QualifiedObjectName ;
1819import com .facebook .presto .common .Subfield ;
1920import com .facebook .presto .common .predicate .TupleDomain ;
2021import com .facebook .presto .common .type .BigintType ;
2728import com .facebook .presto .iceberg .changelog .ChangelogUtil ;
2829import com .facebook .presto .spi .ColumnHandle ;
2930import com .facebook .presto .spi .ColumnMetadata ;
31+ import com .facebook .presto .spi .ConnectorDistributedProcedureHandle ;
32+ import com .facebook .presto .spi .ConnectorId ;
3033import com .facebook .presto .spi .ConnectorInsertTableHandle ;
3134import com .facebook .presto .spi .ConnectorNewTableLayout ;
3235import com .facebook .presto .spi .ConnectorOutputTableHandle ;
3336import com .facebook .presto .spi .ConnectorSession ;
37+ import com .facebook .presto .spi .ConnectorSplitSource ;
3438import com .facebook .presto .spi .ConnectorTableHandle ;
3539import com .facebook .presto .spi .ConnectorTableLayout ;
3640import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
4953import com .facebook .presto .spi .connector .ConnectorTableVersion .VersionOperator ;
5054import com .facebook .presto .spi .connector .ConnectorTableVersion .VersionType ;
5155import com .facebook .presto .spi .function .StandardFunctionResolution ;
56+ import com .facebook .presto .spi .procedure .DistributedProcedure ;
57+ import com .facebook .presto .spi .procedure .IProcedureRegistry ;
58+ import com .facebook .presto .spi .procedure .Procedure ;
5259import com .facebook .presto .spi .relation .RowExpression ;
5360import com .facebook .presto .spi .relation .RowExpressionService ;
5461import com .facebook .presto .spi .statistics .ColumnStatisticMetadata ;
@@ -165,23 +172,26 @@ public abstract class IcebergAbstractMetadata
165172 implements ConnectorMetadata
166173{
167174 protected final TypeManager typeManager ;
175+ protected final IProcedureRegistry procedureRegistry ;
168176 protected final JsonCodec <CommitTaskData > commitTaskCodec ;
169177 protected final NodeVersion nodeVersion ;
170178 protected final RowExpressionService rowExpressionService ;
171- protected Transaction transaction ;
179+ protected IcebergTransactionContext transactionContext ;
172180
173181 private final StandardFunctionResolution functionResolution ;
174182 private final ConcurrentMap <SchemaTableName , Table > icebergTables = new ConcurrentHashMap <>();
175183 private static final Logger log = Logger .get (IcebergAbstractMetadata .class );
176184
177185 public IcebergAbstractMetadata (
178186 TypeManager typeManager ,
187+ IProcedureRegistry procedureRegistry ,
179188 StandardFunctionResolution functionResolution ,
180189 RowExpressionService rowExpressionService ,
181190 JsonCodec <CommitTaskData > commitTaskCodec ,
182191 NodeVersion nodeVersion )
183192 {
184193 this .typeManager = requireNonNull (typeManager , "typeManager is null" );
194+ this .procedureRegistry = requireNonNull (procedureRegistry , "procedureRegistry is null" );
185195 this .commitTaskCodec = requireNonNull (commitTaskCodec , "commitTaskCodec is null" );
186196 this .functionResolution = requireNonNull (functionResolution , "functionResolution is null" );
187197 this .rowExpressionService = requireNonNull (rowExpressionService , "rowExpressionService is null" );
@@ -203,6 +213,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
203213
204214 public abstract void unregisterTable (ConnectorSession clientSession , SchemaTableName schemaTableName );
205215
216+ public Optional <ConnectorSplitSource > getSplitSourceInCurrentCallProcedureTransaction ()
217+ {
218+ return transactionContext == null ? Optional .empty () : transactionContext .getConnectorSplitSource ();
219+ }
220+
206221 /**
207222 * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
208223 */
@@ -432,7 +447,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
432447
433448 protected ConnectorInsertTableHandle beginIcebergTableInsert (IcebergTableHandle table , Table icebergTable )
434449 {
435- transaction = icebergTable .newTransaction ();
450+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
436451
437452 return new IcebergWritableTableHandle (
438453 table .getSchemaName (),
@@ -449,12 +464,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle
449464 public Optional <ConnectorOutputMetadata > finishInsert (ConnectorSession session , ConnectorInsertTableHandle insertHandle , Collection <Slice > fragments , Collection <ComputedStatistics > computedStatistics )
450465 {
451466 if (fragments .isEmpty ()) {
452- transaction .commitTransaction ();
467+ transactionContext .getTransaction ().commitTransaction ();
468+ transactionContext .destroy ();
453469 return Optional .empty ();
454470 }
455471
456472 IcebergWritableTableHandle table = (IcebergWritableTableHandle ) insertHandle ;
457- Table icebergTable = transaction .table ();
473+ Table icebergTable = transactionContext . getTransaction () .table ();
458474
459475 List <CommitTaskData > commitTasks = fragments .stream ()
460476 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -465,7 +481,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
465481 icebergTable .schema ().findType (field .sourceId ())))
466482 .toArray (Type []::new );
467483
468- AppendFiles appendFiles = transaction .newFastAppend ();
484+ AppendFiles appendFiles = transactionContext . getTransaction () .newFastAppend ();
469485 for (CommitTaskData task : commitTasks ) {
470486 DataFiles .Builder builder = DataFiles .builder (icebergTable .spec ())
471487 .withPath (task .getPath ())
@@ -483,7 +499,8 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
483499 }
484500
485501 appendFiles .commit ();
486- transaction .commitTransaction ();
502+ transactionContext .getTransaction ().commitTransaction ();
503+ transactionContext .destroy ();
487504
488505 return Optional .of (new HiveWrittenPartitions (commitTasks .stream ()
489506 .map (CommitTaskData ::getPath )
@@ -789,6 +806,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
789806 removeScanFiles (icebergTable , TupleDomain .all ());
790807 }
791808
809+ @ Override
810+ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure (
811+ ConnectorSession session ,
812+ QualifiedObjectName procedureName ,
813+ ConnectorTableLayoutHandle tableLayoutHandle ,
814+ Object [] arguments )
815+ {
816+ IcebergTableHandle handle = ((IcebergTableLayoutHandle ) tableLayoutHandle ).getTable ();
817+ Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
818+
819+ if (handle .isSnapshotSpecified ()) {
820+ throw new PrestoException (NOT_SUPPORTED , "This connector do not allow table execute at specified snapshot" );
821+ }
822+
823+ transactionContext = new IcebergTransactionContext (Optional .of (icebergTable ), icebergTable .newTransaction ());
824+ Procedure procedure = procedureRegistry .resolve (
825+ new ConnectorId (procedureName .getCatalogName ()),
826+ new SchemaTableName (
827+ procedureName .getSchemaName (),
828+ procedureName .getObjectName ()));
829+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
830+ return ((DistributedProcedure ) procedure ).getBeginCallDistributedProcedure ().begin (session , transactionContext , tableLayoutHandle , arguments );
831+ }
832+
833+ @ Override
834+ public void finishCallDistributedProcedure (ConnectorSession session , ConnectorDistributedProcedureHandle procedureHandle , QualifiedObjectName procedureName , Collection <Slice > fragments )
835+ {
836+ Procedure procedure = procedureRegistry .resolve (
837+ new ConnectorId (procedureName .getCatalogName ()),
838+ new SchemaTableName (
839+ procedureName .getSchemaName (),
840+ procedureName .getObjectName ()));
841+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
842+ ((DistributedProcedure ) procedure ).getFinishCallDistributedProcedure ().finish (transactionContext , procedureHandle , fragments );
843+ transactionContext .getTransaction ().commitTransaction ();
844+ transactionContext .destroy ();
845+ }
846+
792847 @ Override
793848 public ConnectorTableHandle beginDelete (ConnectorSession session , ConnectorTableHandle tableHandle )
794849 {
@@ -809,7 +864,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
809864 }
810865
811866 validateTableMode (session , icebergTable );
812- transaction = icebergTable .newTransaction ();
867+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
813868
814869 return handle ;
815870 }
@@ -820,7 +875,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
820875 IcebergTableHandle handle = (IcebergTableHandle ) tableHandle ;
821876 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
822877
823- RowDelta rowDelta = transaction .newRowDelta ();
878+ RowDelta rowDelta = transactionContext . getTransaction () .newRowDelta ();
824879
825880 List <CommitTaskData > commitTasks = fragments .stream ()
826881 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -857,7 +912,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
857912 }
858913
859914 rowDelta .commit ();
860- transaction .commitTransaction ();
915+ transactionContext .getTransaction ().commitTransaction ();
916+ transactionContext .destroy ();
861917 }
862918
863919 @ Override
@@ -935,11 +991,12 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
935991 */
936992 private OptionalLong removeScanFiles (Table icebergTable , TupleDomain <IcebergColumnHandle > predicate )
937993 {
938- transaction = icebergTable .newTransaction ();
939- DeleteFiles deleteFiles = transaction .newDelete ()
994+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
995+ DeleteFiles deleteFiles = transactionContext . getTransaction () .newDelete ()
940996 .deleteFromRowFilter (toIcebergExpression (predicate ));
941997 deleteFiles .commit ();
942- transaction .commitTransaction ();
998+ transactionContext .getTransaction ().commitTransaction ();
999+ transactionContext .destroy ();
9431000
9441001 Map <String , String > summary = icebergTable .currentSnapshot ().summary ();
9451002 long deletedRecords = Long .parseLong (summary .getOrDefault (DELETED_RECORDS_PROP , "0" ));
0 commit comments