Skip to content

Commit a4fc379

Browse files
committed
[Iceberg]Support single-table multi-statement writes transaction
1 parent 23b9517 commit a4fc379

30 files changed

+880
-108
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 59 additions & 56 deletions
Large diffs are not rendered by default.

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
5858
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
5959
import com.facebook.presto.iceberg.statistics.StatisticsFileCacheKey;
60+
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
6061
import com.facebook.presto.orc.CachingStripeMetadataSource;
6162
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
6263
import com.facebook.presto.orc.EncryptionLibrary;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConnector.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import com.facebook.airlift.bootstrap.LifeCycleManager;
1717
import com.facebook.presto.hive.HiveTransactionHandle;
18+
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
19+
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
1820
import com.facebook.presto.spi.SystemTable;
1921
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
2022
import com.facebook.presto.spi.connector.Connector;
@@ -101,7 +103,7 @@ public IcebergConnector(
101103
@Override
102104
public boolean isSingleStatementWritesOnly()
103105
{
104-
return true;
106+
return false;
105107
}
106108

107109
@Override
@@ -197,14 +199,22 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel
197199
@Override
198200
public ConnectorCommitHandle commit(ConnectorTransactionHandle transaction)
199201
{
200-
transactionManager.remove(transaction);
202+
IcebergTransactionMetadata icebergTransactionMetadata = transactionManager.get(transaction);
203+
if (icebergTransactionMetadata != null) {
204+
icebergTransactionMetadata.commit();
205+
transactionManager.remove(transaction);
206+
}
201207
return INSTANCE;
202208
}
203209

204210
@Override
205211
public void rollback(ConnectorTransactionHandle transaction)
206212
{
207-
transactionManager.remove(transaction);
213+
IcebergTransactionMetadata icebergTransactionMetadata = transactionManager.get(transaction);
214+
if (icebergTransactionMetadata != null) {
215+
icebergTransactionMetadata.rollback();
216+
transactionManager.remove(transaction);
217+
}
208218
}
209219

210220
@Override

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public enum IcebergErrorCode
4040
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR),
4141
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL),
4242
ICEBERG_COMMIT_ERROR(16, EXTERNAL),
43-
ICEBERG_MISSING_COLUMN(17, EXTERNAL);
43+
ICEBERG_MISSING_COLUMN(17, EXTERNAL),
44+
ICEBERG_TRANSACTION_CONFLICT_ERROR(18, INTERNAL_ERROR);
4445

4546
private final ErrorCode errorCode;
4647

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
298298
@Override
299299
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties)
300300
{
301+
shouldRunInAutoCommitTransaction("CREATE SCHEMA");
301302
Optional<String> location = getLocation(properties).map(uri -> {
302303
try {
303304
hdfsEnvironment.getFileSystem(new HdfsContext(session, schemaName), new Path(uri));
@@ -322,6 +323,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
322323
@Override
323324
public void dropSchema(ConnectorSession session, String schemaName)
324325
{
326+
shouldRunInAutoCommitTransaction("DROP SCHEMA");
325327
// basic sanity check to provide a better error message
326328
if (!listTables(session, Optional.of(schemaName)).isEmpty() ||
327329
!listViews(session, Optional.of(schemaName)).isEmpty()) {
@@ -334,6 +336,7 @@ public void dropSchema(ConnectorSession session, String schemaName)
334336
@Override
335337
public void renameSchema(ConnectorSession session, String source, String target)
336338
{
339+
shouldRunInAutoCommitTransaction("RENAME SCHEMA");
337340
MetastoreContext metastoreContext = getMetastoreContext(session);
338341
metastore.renameDatabase(metastoreContext, source, target);
339342
}
@@ -383,7 +386,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
383386
SortOrder sortOrder = parseSortFields(schema, tableProperties.getSortOrder(tableMetadata.getProperties()));
384387
FileFormat fileFormat = tableProperties.getFileFormat(session, tableMetadata.getProperties());
385388
TableMetadata metadata = newTableMetadata(schema, partitionSpec, sortOrder, targetPath, populateTableProperties(this, tableMetadata, tableProperties, fileFormat, session));
386-
transaction = createTableTransaction(tableName, operations, metadata);
389+
openCreateTableTransaction(schemaTableName, createTableTransaction(tableName, operations, metadata));
387390

388391
return new IcebergOutputTableHandle(
389392
schemaName,
@@ -401,6 +404,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
401404
@Override
402405
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
403406
{
407+
shouldRunInAutoCommitTransaction("DROP TABLE");
404408
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
405409
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
406410
// TODO: support path override in Iceberg table creation
@@ -420,6 +424,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
420424
@Override
421425
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable)
422426
{
427+
shouldRunInAutoCommitTransaction("RENAME TABLE");
423428
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
424429
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
425430
metastore.renameTable(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName(), newTable.getSchemaName(), newTable.getTableName());
@@ -428,6 +433,7 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
428433
@Override
429434
public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace)
430435
{
436+
shouldRunInAutoCommitTransaction("CREATE VIEW");
431437
MetastoreContext metastoreContext = getMetastoreContext(session);
432438
SchemaTableName viewName = viewMetadata.getTable();
433439
Table table = createTableObjectForViewCreation(
@@ -494,13 +500,15 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
494500
@Override
495501
public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
496502
{
503+
shouldRunInAutoCommitTransaction("RENAME VIEW");
497504
// Not checking if source view exists as this is already done in RenameViewTask
498505
metastore.renameTable(getMetastoreContext(session), source.getSchemaName(), source.getTableName(), target.getSchemaName(), target.getTableName());
499506
}
500507

501508
@Override
502509
public void dropView(ConnectorSession session, SchemaTableName viewName)
503510
{
511+
shouldRunInAutoCommitTransaction("DROP VIEW");
504512
ConnectorViewDefinition view = getViews(session, viewName.toSchemaTablePrefix()).get(viewName);
505513
checkIfNullView(view, viewName);
506514

@@ -587,12 +595,6 @@ private Set<ColumnStatisticMetadata> getHiveSupportedColumnStatistics(ConnectorS
587595
.collect(toImmutableSet());
588596
}
589597

590-
@Override
591-
public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle)
592-
{
593-
return tableHandle;
594-
}
595-
596598
@Override
597599
public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
598600
{

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.facebook.presto.hive.NodeVersion;
2020
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
2121
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
22+
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
2223
import com.facebook.presto.spi.ConnectorSystemConfig;
23-
import com.facebook.presto.spi.connector.ConnectorMetadata;
2424
import com.facebook.presto.spi.function.StandardFunctionResolution;
2525
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
2626
import com.facebook.presto.spi.relation.RowExpressionService;
@@ -82,12 +82,12 @@ public IcebergHiveMetadataFactory(
8282
this.connectorSystemConfig = requireNonNull(connectorSystemConfig, "connectorSystemConfig is null");
8383
}
8484

85-
public ConnectorMetadata create()
85+
public IcebergTransactionMetadata create()
8686
{
8787
return create(REPEATABLE_READ, true);
8888
}
8989

90-
public ConnectorMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
90+
public IcebergTransactionMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
9191
{
9292
return new IcebergHiveMetadata(
9393
catalogName,

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
*/
1414
package com.facebook.presto.iceberg;
1515

16-
import com.facebook.presto.spi.connector.ConnectorMetadata;
16+
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
1717
import com.facebook.presto.spi.transaction.IsolationLevel;
1818

1919
public interface IcebergMetadataFactory
2020
{
21-
ConnectorMetadata create();
21+
IcebergTransactionMetadata create();
2222

23-
ConnectorMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext);
23+
IcebergTransactionMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext);
2424
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
195195
@Override
196196
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties)
197197
{
198+
shouldRunInAutoCommitTransaction("CREATE SCHEMA");
198199
catalogFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName), catalogFactory.isNestedNamespaceEnabled()),
199200
properties.entrySet().stream()
200201
.collect(toMap(Map.Entry::getKey, e -> e.getValue().toString())));
@@ -203,6 +204,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
203204
@Override
204205
public void dropSchema(ConnectorSession session, String schemaName)
205206
{
207+
shouldRunInAutoCommitTransaction("DROP SCHEMA");
206208
try {
207209
catalogFactory.getNamespaces(session).dropNamespace(toIcebergNamespace(Optional.of(schemaName), catalogFactory.isNestedNamespaceEnabled()));
208210
}
@@ -214,12 +216,14 @@ public void dropSchema(ConnectorSession session, String schemaName)
214216
@Override
215217
public void renameSchema(ConnectorSession session, String source, String target)
216218
{
219+
shouldRunInAutoCommitTransaction("RENAME SCHEMA");
217220
throw new PrestoException(NOT_SUPPORTED, format("Iceberg %s catalog does not support rename namespace", catalogType.name()));
218221
}
219222

220223
@Override
221224
public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace)
222225
{
226+
shouldRunInAutoCommitTransaction("CREATE VIEW");
223227
Catalog catalog = catalogFactory.getCatalog(session);
224228
if (!(catalog instanceof ViewCatalog)) {
225229
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views");
@@ -300,6 +304,7 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
300304
@Override
301305
public void dropView(ConnectorSession session, SchemaTableName viewName)
302306
{
307+
shouldRunInAutoCommitTransaction("DROP VIEW");
303308
Catalog catalog = catalogFactory.getCatalog(session);
304309
if (!(catalog instanceof ViewCatalog)) {
305310
throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views");
@@ -310,6 +315,7 @@ public void dropView(ConnectorSession session, SchemaTableName viewName)
310315
@Override
311316
public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
312317
{
318+
shouldRunInAutoCommitTransaction("RENAME VIEW");
313319
Catalog catalog = catalogFactory.getCatalog(session);
314320
if (!(catalog instanceof ViewCatalog)) {
315321
throw new PrestoException(NOT_SUPPORTED, "This connector does not support renaming views");
@@ -343,27 +349,27 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
343349
TableIdentifier tableIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
344350
String targetPath = tableProperties.getTableLocation(tableMetadata.getProperties());
345351
if (!isNullOrEmpty(targetPath)) {
346-
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
352+
openCreateTableTransaction(schemaTableName, catalogFactory.getCatalog(session).newCreateTableTransaction(
347353
tableIdentifier,
348354
schema,
349355
partitionSpec,
350356
targetPath,
351-
populateTableProperties(this, tableMetadata, tableProperties, fileFormat, session));
357+
populateTableProperties(this, tableMetadata, tableProperties, fileFormat, session)));
352358
}
353359
else {
354-
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
360+
openCreateTableTransaction(schemaTableName, catalogFactory.getCatalog(session).newCreateTableTransaction(
355361
tableIdentifier,
356362
schema,
357363
partitionSpec,
358-
populateTableProperties(this, tableMetadata, tableProperties, fileFormat, session));
364+
populateTableProperties(this, tableMetadata, tableProperties, fileFormat, session)));
359365
}
360366
}
361367
catch (AlreadyExistsException e) {
362368
throw new TableAlreadyExistsException(schemaTableName);
363369
}
364370

365-
Table icebergTable = transaction.table();
366-
ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
371+
Table icebergTable = getIcebergTable(session, schemaTableName);
372+
ReplaceSortOrder replaceSortOrder = icebergTable.replaceSortOrder();
367373
SortOrder sortOrder = parseSortFields(schema, tableProperties.getSortOrder(tableMetadata.getProperties()));
368374
List<SortField> sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder);
369375
for (SortField sortField : sortFields) {
@@ -398,6 +404,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
398404
@Override
399405
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
400406
{
407+
shouldRunInAutoCommitTransaction("DROP TABLE");
401408
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
402409
verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
403410
TableIdentifier tableIdentifier = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName(), catalogFactory.isNestedNamespaceEnabled());
@@ -407,6 +414,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
407414
@Override
408415
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable)
409416
{
417+
shouldRunInAutoCommitTransaction("RENAME TABLE");
410418
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
411419
verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
412420
TableIdentifier from = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName(), catalogFactory.isNestedNamespaceEnabled());

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import com.facebook.presto.common.type.TypeManager;
1818
import com.facebook.presto.hive.NodeVersion;
1919
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
20-
import com.facebook.presto.spi.connector.ConnectorMetadata;
20+
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
2121
import com.facebook.presto.spi.function.StandardFunctionResolution;
2222
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
2323
import com.facebook.presto.spi.relation.RowExpressionService;
@@ -68,12 +68,12 @@ public IcebergNativeMetadataFactory(
6868
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
6969
}
7070

71-
public ConnectorMetadata create()
71+
public IcebergTransactionMetadata create()
7272
{
7373
return create(REPEATABLE_READ, true);
7474
}
7575

76-
public ConnectorMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
76+
public IcebergTransactionMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
7777
{
7878
return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution,
7979
rowExpressionService, commitTaskCodec, catalogType, nodeVersion,

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.common.type.TypeManager;
1919
import com.facebook.presto.iceberg.changelog.ChangelogSplitSource;
2020
import com.facebook.presto.iceberg.equalitydeletes.EqualityDeletesSplitSource;
21+
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
2122
import com.facebook.presto.spi.ConnectorSession;
2223
import com.facebook.presto.spi.ConnectorSplitSource;
2324
import com.facebook.presto.spi.ConnectorTableLayoutHandle;

0 commit comments

Comments
 (0)