1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .presto .common .QualifiedObjectName ;
1819import com .facebook .presto .common .Subfield ;
1920import com .facebook .presto .common .predicate .TupleDomain ;
2021import com .facebook .presto .common .type .BigintType ;
2728import com .facebook .presto .iceberg .changelog .ChangelogUtil ;
2829import com .facebook .presto .spi .ColumnHandle ;
2930import com .facebook .presto .spi .ColumnMetadata ;
31+ import com .facebook .presto .spi .ConnectorDistributedProcedureHandle ;
32+ import com .facebook .presto .spi .ConnectorId ;
3033import com .facebook .presto .spi .ConnectorInsertTableHandle ;
3134import com .facebook .presto .spi .ConnectorNewTableLayout ;
3235import com .facebook .presto .spi .ConnectorOutputTableHandle ;
3336import com .facebook .presto .spi .ConnectorSession ;
37+ import com .facebook .presto .spi .ConnectorSplitSource ;
3438import com .facebook .presto .spi .ConnectorTableHandle ;
3539import com .facebook .presto .spi .ConnectorTableLayout ;
3640import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
4751import com .facebook .presto .spi .connector .ConnectorOutputMetadata ;
4852import com .facebook .presto .spi .connector .ConnectorTableVersion ;
4953import com .facebook .presto .spi .function .StandardFunctionResolution ;
54+ import com .facebook .presto .spi .procedure .DistributedProcedure ;
55+ import com .facebook .presto .spi .procedure .IProcedureRegistry ;
56+ import com .facebook .presto .spi .procedure .Procedure ;
5057import com .facebook .presto .spi .relation .RowExpression ;
5158import com .facebook .presto .spi .relation .RowExpressionService ;
5259import com .facebook .presto .spi .statistics .ColumnStatisticMetadata ;
7077import org .apache .iceberg .FileFormat ;
7178import org .apache .iceberg .FileMetadata ;
7279import org .apache .iceberg .FileScanTask ;
80+ import org .apache .iceberg .ManifestFile ;
7381import org .apache .iceberg .PartitionField ;
7482import org .apache .iceberg .PartitionSpec ;
7583import org .apache .iceberg .PartitionSpecParser ;
@@ -160,23 +168,26 @@ public abstract class IcebergAbstractMetadata
160168 implements ConnectorMetadata
161169{
162170 protected final TypeManager typeManager ;
171+ protected final IProcedureRegistry procedureRegistry ;
163172 protected final JsonCodec <CommitTaskData > commitTaskCodec ;
164173 protected final NodeVersion nodeVersion ;
165174 protected final RowExpressionService rowExpressionService ;
166- protected Transaction transaction ;
175+ protected IcebergTransactionContext transactionContext ;
167176
168177 private final StandardFunctionResolution functionResolution ;
169178 private final ConcurrentMap <SchemaTableName , Table > icebergTables = new ConcurrentHashMap <>();
170179 private static final Logger log = Logger .get (IcebergAbstractMetadata .class );
171180
172181 public IcebergAbstractMetadata (
173182 TypeManager typeManager ,
183+ IProcedureRegistry procedureRegistry ,
174184 StandardFunctionResolution functionResolution ,
175185 RowExpressionService rowExpressionService ,
176186 JsonCodec <CommitTaskData > commitTaskCodec ,
177187 NodeVersion nodeVersion )
178188 {
179189 this .typeManager = requireNonNull (typeManager , "typeManager is null" );
190+ this .procedureRegistry = requireNonNull (procedureRegistry , "procedureRegistry is null" );
180191 this .commitTaskCodec = requireNonNull (commitTaskCodec , "commitTaskCodec is null" );
181192 this .functionResolution = requireNonNull (functionResolution , "functionResolution is null" );
182193 this .rowExpressionService = requireNonNull (rowExpressionService , "rowExpressionService is null" );
@@ -198,6 +209,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
198209
199210 public abstract void unregisterTable (ConnectorSession clientSession , SchemaTableName schemaTableName );
200211
212+ public Optional <ConnectorSplitSource > getSplitSourceInCurrentCallProcedureTransaction ()
213+ {
214+ return transactionContext == null ? Optional .empty () : transactionContext .getConnectorSplitSource ();
215+ }
216+
201217 /**
202218 * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
203219 */
@@ -413,7 +429,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
413429
414430 protected ConnectorInsertTableHandle beginIcebergTableInsert (IcebergTableHandle table , Table icebergTable )
415431 {
416- transaction = icebergTable .newTransaction ();
432+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
417433
418434 return new IcebergWritableTableHandle (
419435 table .getSchemaName (),
@@ -430,12 +446,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle
430446 public Optional <ConnectorOutputMetadata > finishInsert (ConnectorSession session , ConnectorInsertTableHandle insertHandle , Collection <Slice > fragments , Collection <ComputedStatistics > computedStatistics )
431447 {
432448 if (fragments .isEmpty ()) {
433- transaction .commitTransaction ();
449+ transactionContext .getTransaction ().commitTransaction ();
450+ transactionContext .destroy ();
434451 return Optional .empty ();
435452 }
436453
437454 IcebergWritableTableHandle table = (IcebergWritableTableHandle ) insertHandle ;
438- Table icebergTable = transaction .table ();
455+ Table icebergTable = transactionContext . getTransaction () .table ();
439456
440457 List <CommitTaskData > commitTasks = fragments .stream ()
441458 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -446,7 +463,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
446463 icebergTable .schema ().findType (field .sourceId ())))
447464 .toArray (Type []::new );
448465
449- AppendFiles appendFiles = transaction .newFastAppend ();
466+ AppendFiles appendFiles = transactionContext . getTransaction () .newFastAppend ();
450467 for (CommitTaskData task : commitTasks ) {
451468 DataFiles .Builder builder = DataFiles .builder (icebergTable .spec ())
452469 .withPath (task .getPath ())
@@ -464,7 +481,8 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
464481 }
465482
466483 appendFiles .commit ();
467- transaction .commitTransaction ();
484+ transactionContext .getTransaction ().commitTransaction ();
485+ transactionContext .destroy ();
468486
469487 return Optional .of (new HiveWrittenPartitions (commitTasks .stream ()
470488 .map (CommitTaskData ::getPath )
@@ -755,6 +773,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
755773 }
756774 }
757775
776+ @ Override
777+ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure (
778+ ConnectorSession session ,
779+ QualifiedObjectName procedureName ,
780+ ConnectorTableLayoutHandle tableLayoutHandle ,
781+ Object [] arguments )
782+ {
783+ IcebergTableHandle handle = ((IcebergTableLayoutHandle ) tableLayoutHandle ).getTable ();
784+ Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
785+
786+ if (handle .isSnapshotSpecified ()) {
787+ throw new PrestoException (NOT_SUPPORTED , "This connector do not allow table execute at specified snapshot" );
788+ }
789+
790+ transactionContext = new IcebergTransactionContext (Optional .of (icebergTable ), icebergTable .newTransaction ());
791+ Procedure procedure = procedureRegistry .resolve (
792+ new ConnectorId (procedureName .getCatalogName ()),
793+ new SchemaTableName (
794+ procedureName .getSchemaName (),
795+ procedureName .getObjectName ()));
796+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
797+ return ((DistributedProcedure ) procedure ).getBeginCallDistributedProcedure ().begin (session , transactionContext , tableLayoutHandle , arguments );
798+ }
799+
800+ @ Override
801+ public void finishCallDistributedProcedure (ConnectorSession session , ConnectorDistributedProcedureHandle tableHandle , QualifiedObjectName procedureName , Collection <Slice > fragments )
802+ {
803+ Procedure procedure = procedureRegistry .resolve (
804+ new ConnectorId (procedureName .getCatalogName ()),
805+ new SchemaTableName (
806+ procedureName .getSchemaName (),
807+ procedureName .getObjectName ()));
808+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
809+ ((DistributedProcedure ) procedure ).getFinishCallDistributedProcedure ().finish (transactionContext , tableHandle , fragments );
810+ transactionContext .getTransaction ().commitTransaction ();
811+ transactionContext .destroy ();
812+ }
813+
758814 @ Override
759815 public ConnectorTableHandle beginDelete (ConnectorSession session , ConnectorTableHandle tableHandle )
760816 {
@@ -775,7 +831,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
775831 }
776832
777833 validateTableMode (session , icebergTable );
778- transaction = icebergTable .newTransaction ();
834+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
779835
780836 return handle ;
781837 }
@@ -786,7 +842,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
786842 IcebergTableHandle handle = (IcebergTableHandle ) tableHandle ;
787843 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
788844
789- RowDelta rowDelta = transaction .newRowDelta ();
845+ RowDelta rowDelta = transactionContext . getTransaction () .newRowDelta ();
790846
791847 List <CommitTaskData > commitTasks = fragments .stream ()
792848 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -823,7 +879,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
823879 }
824880
825881 rowDelta .commit ();
826- transaction .commitTransaction ();
882+ transactionContext .getTransaction ().commitTransaction ();
883+ transactionContext .destroy ();
827884 }
828885
829886 @ Override
@@ -857,7 +914,15 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
857914 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
858915
859916 boolean supportsMetadataDelete = true ;
860- for (PartitionSpec spec : icebergTable .specs ().values ()) {
917+ // Get partition specs that really need to be checked
918+ Set <PartitionSpec > partitionSpecs = handle .getIcebergTableName ().getSnapshotId ().map (
919+ snapshot -> icebergTable .snapshot (snapshot ).allManifests (icebergTable .io ()).stream ()
920+ .map (ManifestFile ::partitionSpecId )
921+ .map (icebergTable .specs ()::get )
922+ .collect (toImmutableSet ()))
923+ .orElseGet (() -> ImmutableSet .copyOf (icebergTable .specs ().values ())); // No snapshot, so no data. This case doesn't matter.
924+
925+ for (PartitionSpec spec : partitionSpecs ) {
861926 // Currently we do not support delete when any partition columns in predicate is not transform by identity()
862927 Set <Integer > partitionColumnSourceIds = spec .fields ().stream ()
863928 .filter (field -> field .transform ().isIdentity ())
@@ -913,15 +978,16 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
913978 */
914979 private long removeScanFiles (Table icebergTable , Iterable <FileScanTask > scan )
915980 {
916- transaction = icebergTable .newTransaction ();
917- DeleteFiles deletes = transaction .newDelete ();
981+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
982+ DeleteFiles deletes = transactionContext . getTransaction () .newDelete ();
918983 AtomicLong rowsDeleted = new AtomicLong (0L );
919984 scan .forEach (t -> {
920985 deletes .deleteFile (t .file ());
921986 rowsDeleted .addAndGet (t .estimatedRowsCount ());
922987 });
923988 deletes .commit ();
924- transaction .commitTransaction ();
989+ transactionContext .getTransaction ().commitTransaction ();
990+ transactionContext .destroy ();
925991 return rowsDeleted .get ();
926992 }
927993
0 commit comments