1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .presto .common .QualifiedObjectName ;
1819import com .facebook .presto .common .Subfield ;
1920import com .facebook .presto .common .predicate .TupleDomain ;
2021import com .facebook .presto .common .type .BigintType ;
2728import com .facebook .presto .iceberg .changelog .ChangelogUtil ;
2829import com .facebook .presto .spi .ColumnHandle ;
2930import com .facebook .presto .spi .ColumnMetadata ;
31+ import com .facebook .presto .spi .ConnectorDistributedProcedureHandle ;
32+ import com .facebook .presto .spi .ConnectorId ;
3033import com .facebook .presto .spi .ConnectorInsertTableHandle ;
3134import com .facebook .presto .spi .ConnectorNewTableLayout ;
3235import com .facebook .presto .spi .ConnectorOutputTableHandle ;
3336import com .facebook .presto .spi .ConnectorSession ;
37+ import com .facebook .presto .spi .ConnectorSplitSource ;
3438import com .facebook .presto .spi .ConnectorTableHandle ;
3539import com .facebook .presto .spi .ConnectorTableLayout ;
3640import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
4751import com .facebook .presto .spi .connector .ConnectorOutputMetadata ;
4852import com .facebook .presto .spi .connector .ConnectorTableVersion ;
4953import com .facebook .presto .spi .function .StandardFunctionResolution ;
54+ import com .facebook .presto .spi .procedure .DistributedProcedure ;
55+ import com .facebook .presto .spi .procedure .IProcedureRegistry ;
56+ import com .facebook .presto .spi .procedure .Procedure ;
5057import com .facebook .presto .spi .relation .RowExpression ;
5158import com .facebook .presto .spi .relation .RowExpressionService ;
5259import com .facebook .presto .spi .statistics .ColumnStatisticMetadata ;
@@ -160,23 +167,26 @@ public abstract class IcebergAbstractMetadata
160167 implements ConnectorMetadata
161168{
162169 protected final TypeManager typeManager ;
170+ protected final IProcedureRegistry procedureRegistry ;
163171 protected final JsonCodec <CommitTaskData > commitTaskCodec ;
164172 protected final NodeVersion nodeVersion ;
165173 protected final RowExpressionService rowExpressionService ;
166- protected Transaction transaction ;
174+ protected IcebergTransactionContext transactionContext ;
167175
168176 private final StandardFunctionResolution functionResolution ;
169177 private final ConcurrentMap <SchemaTableName , Table > icebergTables = new ConcurrentHashMap <>();
170178 private static final Logger log = Logger .get (IcebergAbstractMetadata .class );
171179
172180 public IcebergAbstractMetadata (
173181 TypeManager typeManager ,
182+ IProcedureRegistry procedureRegistry ,
174183 StandardFunctionResolution functionResolution ,
175184 RowExpressionService rowExpressionService ,
176185 JsonCodec <CommitTaskData > commitTaskCodec ,
177186 NodeVersion nodeVersion )
178187 {
179188 this .typeManager = requireNonNull (typeManager , "typeManager is null" );
189+ this .procedureRegistry = requireNonNull (procedureRegistry , "procedureRegistry is null" );
180190 this .commitTaskCodec = requireNonNull (commitTaskCodec , "commitTaskCodec is null" );
181191 this .functionResolution = requireNonNull (functionResolution , "functionResolution is null" );
182192 this .rowExpressionService = requireNonNull (rowExpressionService , "rowExpressionService is null" );
@@ -198,6 +208,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
198208
199209 public abstract void unregisterTable (ConnectorSession clientSession , SchemaTableName schemaTableName );
200210
211+ public Optional <ConnectorSplitSource > getSplitSourceInCurrentCallProcedureTransaction ()
212+ {
213+ return transactionContext == null ? Optional .empty () : transactionContext .getConnectorSplitSource ();
214+ }
215+
201216 /**
202217 * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
203218 */
@@ -427,7 +442,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
427442
428443 protected ConnectorInsertTableHandle beginIcebergTableInsert (IcebergTableHandle table , Table icebergTable )
429444 {
430- transaction = icebergTable .newTransaction ();
445+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
431446
432447 return new IcebergWritableTableHandle (
433448 table .getSchemaName (),
@@ -444,12 +459,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle
444459 public Optional <ConnectorOutputMetadata > finishInsert (ConnectorSession session , ConnectorInsertTableHandle insertHandle , Collection <Slice > fragments , Collection <ComputedStatistics > computedStatistics )
445460 {
446461 if (fragments .isEmpty ()) {
447- transaction .commitTransaction ();
462+ transactionContext .getTransaction ().commitTransaction ();
463+ transactionContext .destroy ();
448464 return Optional .empty ();
449465 }
450466
451467 IcebergWritableTableHandle table = (IcebergWritableTableHandle ) insertHandle ;
452- Table icebergTable = transaction .table ();
468+ Table icebergTable = transactionContext . getTransaction () .table ();
453469
454470 List <CommitTaskData > commitTasks = fragments .stream ()
455471 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -460,7 +476,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
460476 icebergTable .schema ().findType (field .sourceId ())))
461477 .toArray (Type []::new );
462478
463- AppendFiles appendFiles = transaction .newFastAppend ();
479+ AppendFiles appendFiles = transactionContext . getTransaction () .newFastAppend ();
464480 for (CommitTaskData task : commitTasks ) {
465481 DataFiles .Builder builder = DataFiles .builder (icebergTable .spec ())
466482 .withPath (task .getPath ())
@@ -478,7 +494,8 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
478494 }
479495
480496 appendFiles .commit ();
481- transaction .commitTransaction ();
497+ transactionContext .getTransaction ().commitTransaction ();
498+ transactionContext .destroy ();
482499
483500 return Optional .of (new HiveWrittenPartitions (commitTasks .stream ()
484501 .map (CommitTaskData ::getPath )
@@ -768,6 +785,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
768785 removeScanFiles (icebergTable , TupleDomain .all ());
769786 }
770787
788+ @ Override
789+ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure (
790+ ConnectorSession session ,
791+ QualifiedObjectName procedureName ,
792+ ConnectorTableLayoutHandle tableLayoutHandle ,
793+ Object [] arguments )
794+ {
795+ IcebergTableHandle handle = ((IcebergTableLayoutHandle ) tableLayoutHandle ).getTable ();
796+ Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
797+
798+ if (handle .isSnapshotSpecified ()) {
799+ throw new PrestoException (NOT_SUPPORTED , "This connector do not allow table execute at specified snapshot" );
800+ }
801+
802+ transactionContext = new IcebergTransactionContext (Optional .of (icebergTable ), icebergTable .newTransaction ());
803+ Procedure procedure = procedureRegistry .resolve (
804+ new ConnectorId (procedureName .getCatalogName ()),
805+ new SchemaTableName (
806+ procedureName .getSchemaName (),
807+ procedureName .getObjectName ()));
808+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
809+ return ((DistributedProcedure ) procedure ).getBeginCallDistributedProcedure ().begin (session , transactionContext , tableLayoutHandle , arguments );
810+ }
811+
812+ @ Override
813+ public void finishCallDistributedProcedure (ConnectorSession session , ConnectorDistributedProcedureHandle tableHandle , QualifiedObjectName procedureName , Collection <Slice > fragments )
814+ {
815+ Procedure procedure = procedureRegistry .resolve (
816+ new ConnectorId (procedureName .getCatalogName ()),
817+ new SchemaTableName (
818+ procedureName .getSchemaName (),
819+ procedureName .getObjectName ()));
820+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
821+ ((DistributedProcedure ) procedure ).getFinishCallDistributedProcedure ().finish (transactionContext , tableHandle , fragments );
822+ transactionContext .getTransaction ().commitTransaction ();
823+ transactionContext .destroy ();
824+ }
825+
771826 @ Override
772827 public ConnectorTableHandle beginDelete (ConnectorSession session , ConnectorTableHandle tableHandle )
773828 {
@@ -788,7 +843,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
788843 }
789844
790845 validateTableMode (session , icebergTable );
791- transaction = icebergTable .newTransaction ();
846+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
792847
793848 return handle ;
794849 }
@@ -799,7 +854,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
799854 IcebergTableHandle handle = (IcebergTableHandle ) tableHandle ;
800855 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
801856
802- RowDelta rowDelta = transaction .newRowDelta ();
857+ RowDelta rowDelta = transactionContext . getTransaction () .newRowDelta ();
803858
804859 List <CommitTaskData > commitTasks = fragments .stream ()
805860 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -836,7 +891,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
836891 }
837892
838893 rowDelta .commit ();
839- transaction .commitTransaction ();
894+ transactionContext .getTransaction ().commitTransaction ();
895+ transactionContext .destroy ();
840896 }
841897
842898 @ Override
@@ -914,11 +970,12 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
914970 */
915971 private OptionalLong removeScanFiles (Table icebergTable , TupleDomain <IcebergColumnHandle > predicate )
916972 {
917- transaction = icebergTable .newTransaction ();
918- DeleteFiles deleteFiles = transaction .newDelete ()
973+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
974+ DeleteFiles deleteFiles = transactionContext . getTransaction () .newDelete ()
919975 .deleteFromRowFilter (toIcebergExpression (predicate ));
920976 deleteFiles .commit ();
921- transaction .commitTransaction ();
977+ transactionContext .getTransaction ().commitTransaction ();
978+ transactionContext .destroy ();
922979
923980 Map <String , String > summary = icebergTable .currentSnapshot ().summary ();
924981 long deletedRecords = Long .parseLong (summary .getOrDefault (DELETED_RECORDS_PROP , "0" ));
0 commit comments