1515
1616import com .facebook .airlift .json .JsonCodec ;
1717import com .facebook .airlift .log .Logger ;
18+ import com .facebook .presto .common .QualifiedObjectName ;
1819import com .facebook .presto .common .Subfield ;
1920import com .facebook .presto .common .predicate .TupleDomain ;
2021import com .facebook .presto .common .type .BigintType ;
2728import com .facebook .presto .iceberg .changelog .ChangelogUtil ;
2829import com .facebook .presto .spi .ColumnHandle ;
2930import com .facebook .presto .spi .ColumnMetadata ;
31+ import com .facebook .presto .spi .ConnectorDistributedProcedureHandle ;
32+ import com .facebook .presto .spi .ConnectorId ;
3033import com .facebook .presto .spi .ConnectorInsertTableHandle ;
3134import com .facebook .presto .spi .ConnectorNewTableLayout ;
3235import com .facebook .presto .spi .ConnectorOutputTableHandle ;
3336import com .facebook .presto .spi .ConnectorSession ;
37+ import com .facebook .presto .spi .ConnectorSplitSource ;
3438import com .facebook .presto .spi .ConnectorTableHandle ;
3539import com .facebook .presto .spi .ConnectorTableLayout ;
3640import com .facebook .presto .spi .ConnectorTableLayoutHandle ;
4751import com .facebook .presto .spi .connector .ConnectorOutputMetadata ;
4852import com .facebook .presto .spi .connector .ConnectorTableVersion ;
4953import com .facebook .presto .spi .function .StandardFunctionResolution ;
54+ import com .facebook .presto .spi .procedure .DistributedProcedure ;
55+ import com .facebook .presto .spi .procedure .IProcedureRegistry ;
56+ import com .facebook .presto .spi .procedure .Procedure ;
5057import com .facebook .presto .spi .relation .RowExpression ;
5158import com .facebook .presto .spi .relation .RowExpressionService ;
5259import com .facebook .presto .spi .statistics .ColumnStatisticMetadata ;
7077import org .apache .iceberg .FileFormat ;
7178import org .apache .iceberg .FileMetadata ;
7279import org .apache .iceberg .FileScanTask ;
80+ import org .apache .iceberg .ManifestFile ;
7381import org .apache .iceberg .PartitionField ;
7482import org .apache .iceberg .PartitionSpec ;
7583import org .apache .iceberg .PartitionSpecParser ;
@@ -161,23 +169,26 @@ public abstract class IcebergAbstractMetadata
161169 implements ConnectorMetadata
162170{
163171 protected final TypeManager typeManager ;
172+ protected final IProcedureRegistry procedureRegistry ;
164173 protected final JsonCodec <CommitTaskData > commitTaskCodec ;
165174 protected final NodeVersion nodeVersion ;
166175 protected final RowExpressionService rowExpressionService ;
167- protected Transaction transaction ;
176+ protected IcebergTransactionContext transactionContext ;
168177
169178 private final StandardFunctionResolution functionResolution ;
170179 private final ConcurrentMap <SchemaTableName , Table > icebergTables = new ConcurrentHashMap <>();
171180 private static final Logger log = Logger .get (IcebergAbstractMetadata .class );
172181
173182 public IcebergAbstractMetadata (
174183 TypeManager typeManager ,
184+ IProcedureRegistry procedureRegistry ,
175185 StandardFunctionResolution functionResolution ,
176186 RowExpressionService rowExpressionService ,
177187 JsonCodec <CommitTaskData > commitTaskCodec ,
178188 NodeVersion nodeVersion )
179189 {
180190 this .typeManager = requireNonNull (typeManager , "typeManager is null" );
191+ this .procedureRegistry = requireNonNull (procedureRegistry , "procedureRegistry is null" );
181192 this .commitTaskCodec = requireNonNull (commitTaskCodec , "commitTaskCodec is null" );
182193 this .functionResolution = requireNonNull (functionResolution , "functionResolution is null" );
183194 this .rowExpressionService = requireNonNull (rowExpressionService , "rowExpressionService is null" );
@@ -199,6 +210,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName
199210
200211 public abstract void unregisterTable (ConnectorSession clientSession , SchemaTableName schemaTableName );
201212
213+ public Optional <ConnectorSplitSource > getSplitSourceInCurrentCallProcedureTransaction ()
214+ {
215+ return transactionContext == null ? Optional .empty () : transactionContext .getConnectorSplitSource ();
216+ }
217+
202218 /**
203219 * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
204220 */
@@ -428,7 +444,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
428444
429445 protected ConnectorInsertTableHandle beginIcebergTableInsert (IcebergTableHandle table , Table icebergTable )
430446 {
431- transaction = icebergTable .newTransaction ();
447+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
432448
433449 return new IcebergWritableTableHandle (
434450 table .getSchemaName (),
@@ -445,12 +461,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle
445461 public Optional <ConnectorOutputMetadata > finishInsert (ConnectorSession session , ConnectorInsertTableHandle insertHandle , Collection <Slice > fragments , Collection <ComputedStatistics > computedStatistics )
446462 {
447463 if (fragments .isEmpty ()) {
448- transaction .commitTransaction ();
464+ transactionContext .getTransaction ().commitTransaction ();
465+ transactionContext .destroy ();
449466 return Optional .empty ();
450467 }
451468
452469 IcebergWritableTableHandle table = (IcebergWritableTableHandle ) insertHandle ;
453- Table icebergTable = transaction .table ();
470+ Table icebergTable = transactionContext . getTransaction () .table ();
454471
455472 List <CommitTaskData > commitTasks = fragments .stream ()
456473 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -461,7 +478,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
461478 icebergTable .schema ().findType (field .sourceId ())))
462479 .toArray (Type []::new );
463480
464- AppendFiles appendFiles = transaction .newFastAppend ();
481+ AppendFiles appendFiles = transactionContext . getTransaction () .newFastAppend ();
465482 for (CommitTaskData task : commitTasks ) {
466483 DataFiles .Builder builder = DataFiles .builder (icebergTable .spec ())
467484 .withPath (task .getPath ())
@@ -479,7 +496,8 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
479496 }
480497
481498 appendFiles .commit ();
482- transaction .commitTransaction ();
499+ transactionContext .getTransaction ().commitTransaction ();
500+ transactionContext .destroy ();
483501
484502 return Optional .of (new HiveWrittenPartitions (commitTasks .stream ()
485503 .map (CommitTaskData ::getPath )
@@ -770,6 +788,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
770788 }
771789 }
772790
791+ @ Override
792+ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure (
793+ ConnectorSession session ,
794+ QualifiedObjectName procedureName ,
795+ ConnectorTableLayoutHandle tableLayoutHandle ,
796+ Object [] arguments )
797+ {
798+ IcebergTableHandle handle = ((IcebergTableLayoutHandle ) tableLayoutHandle ).getTable ();
799+ Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
800+
801+ if (handle .isSnapshotSpecified ()) {
802+ throw new PrestoException (NOT_SUPPORTED , "This connector do not allow table execute at specified snapshot" );
803+ }
804+
805+ transactionContext = new IcebergTransactionContext (Optional .of (icebergTable ), icebergTable .newTransaction ());
806+ Procedure procedure = procedureRegistry .resolve (
807+ new ConnectorId (procedureName .getCatalogName ()),
808+ new SchemaTableName (
809+ procedureName .getSchemaName (),
810+ procedureName .getObjectName ()));
811+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
812+ return ((DistributedProcedure ) procedure ).getBeginCallDistributedProcedure ().begin (session , transactionContext , tableLayoutHandle , arguments );
813+ }
814+
815+ @ Override
816+ public void finishCallDistributedProcedure (ConnectorSession session , ConnectorDistributedProcedureHandle tableHandle , QualifiedObjectName procedureName , Collection <Slice > fragments )
817+ {
818+ Procedure procedure = procedureRegistry .resolve (
819+ new ConnectorId (procedureName .getCatalogName ()),
820+ new SchemaTableName (
821+ procedureName .getSchemaName (),
822+ procedureName .getObjectName ()));
823+ verify (procedure instanceof DistributedProcedure , "procedure must be DistributedProcedure" );
824+ ((DistributedProcedure ) procedure ).getFinishCallDistributedProcedure ().finish (transactionContext , tableHandle , fragments );
825+ transactionContext .getTransaction ().commitTransaction ();
826+ transactionContext .destroy ();
827+ }
828+
773829 @ Override
774830 public ConnectorTableHandle beginDelete (ConnectorSession session , ConnectorTableHandle tableHandle )
775831 {
@@ -790,7 +846,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
790846 }
791847
792848 validateTableMode (session , icebergTable );
793- transaction = icebergTable .newTransaction ();
849+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
794850
795851 return handle ;
796852 }
@@ -801,7 +857,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
801857 IcebergTableHandle handle = (IcebergTableHandle ) tableHandle ;
802858 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
803859
804- RowDelta rowDelta = transaction .newRowDelta ();
860+ RowDelta rowDelta = transactionContext . getTransaction () .newRowDelta ();
805861
806862 List <CommitTaskData > commitTasks = fragments .stream ()
807863 .map (slice -> commitTaskCodec .fromJson (slice .getBytes ()))
@@ -838,7 +894,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
838894 }
839895
840896 rowDelta .commit ();
841- transaction .commitTransaction ();
897+ transactionContext .getTransaction ().commitTransaction ();
898+ transactionContext .destroy ();
842899 }
843900
844901 @ Override
@@ -872,7 +929,15 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
872929 Table icebergTable = getIcebergTable (session , handle .getSchemaTableName ());
873930
874931 boolean supportsMetadataDelete = true ;
875- for (PartitionSpec spec : icebergTable .specs ().values ()) {
932+ // Get partition specs that really need to be checked
933+ Set <PartitionSpec > partitionSpecs = handle .getIcebergTableName ().getSnapshotId ().map (
934+ snapshot -> icebergTable .snapshot (snapshot ).allManifests (icebergTable .io ()).stream ()
935+ .map (ManifestFile ::partitionSpecId )
936+ .map (icebergTable .specs ()::get )
937+ .collect (toImmutableSet ()))
938+ .orElseGet (() -> ImmutableSet .copyOf (icebergTable .specs ().values ())); // No snapshot, so no data. This case doesn't matter.
939+
940+ for (PartitionSpec spec : partitionSpecs ) {
876941 // Currently we do not support delete when any partition columns in predicate is not transform by identity()
877942 Set <Integer > partitionColumnSourceIds = spec .fields ().stream ()
878943 .filter (field -> field .transform ().isIdentity ())
@@ -928,15 +993,16 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
928993 */
929994 private long removeScanFiles (Table icebergTable , Iterable <FileScanTask > scan )
930995 {
931- transaction = icebergTable .newTransaction ();
932- DeleteFiles deletes = transaction .newDelete ();
996+ transactionContext = new IcebergTransactionContext ( Optional . of ( icebergTable ), icebergTable .newTransaction () );
997+ DeleteFiles deletes = transactionContext . getTransaction () .newDelete ();
933998 AtomicLong rowsDeleted = new AtomicLong (0L );
934999 scan .forEach (t -> {
9351000 deletes .deleteFile (t .file ());
9361001 rowsDeleted .addAndGet (t .estimatedRowsCount ());
9371002 });
9381003 deletes .commit ();
939- transaction .commitTransaction ();
1004+ transactionContext .getTransaction ().commitTransaction ();
1005+ transactionContext .destroy ();
9401006 return rowsDeleted .get ();
9411007 }
9421008
0 commit comments