Skip to content
Open

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.statistics.StatisticsFileCacheKey;
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
import com.facebook.presto.orc.CachingStripeMetadataSource;
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
import com.facebook.presto.orc.EncryptionLibrary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.iceberg.function.IcebergBucketFunction;
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.Connector;
Expand Down Expand Up @@ -45,7 +47,7 @@

import static com.facebook.presto.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
import static com.facebook.presto.spi.connector.EmptyConnectorCommitHandle.INSTANCE;
import static com.facebook.presto.spi.transaction.IsolationLevel.SERIALIZABLE;
import static com.facebook.presto.spi.transaction.IsolationLevel.REPEATABLE_READ;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static com.google.common.collect.Sets.immutableEnumSet;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -109,7 +111,7 @@ public IcebergConnector(
@Override
public boolean isSingleStatementWritesOnly()
{
return true;
return false;
}

@Override
Expand Down Expand Up @@ -204,27 +206,35 @@ public ConnectorAccessControl getAccessControl()
}

@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean autoCommitContext, boolean readOnly)
{
checkConnectorSupports(SERIALIZABLE, isolationLevel);
checkConnectorSupports(REPEATABLE_READ, isolationLevel);
ConnectorTransactionHandle transaction = new HiveTransactionHandle();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
transactionManager.put(transaction, metadataFactory.create());
transactionManager.put(transaction, metadataFactory.create(isolationLevel, autoCommitContext));
}
return transaction;
}

@Override
public ConnectorCommitHandle commit(ConnectorTransactionHandle transaction)
{
transactionManager.remove(transaction);
IcebergTransactionMetadata icebergTransactionMetadata = transactionManager.get(transaction);
if (icebergTransactionMetadata != null) {
icebergTransactionMetadata.commit();
transactionManager.remove(transaction);
}
return INSTANCE;
}

@Override
public void rollback(ConnectorTransactionHandle transaction)
{
transactionManager.remove(transaction);
IcebergTransactionMetadata icebergTransactionMetadata = transactionManager.get(transaction);
if (icebergTransactionMetadata != null) {
icebergTransactionMetadata.rollback();
transactionManager.remove(transaction);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum IcebergErrorCode
ICEBERG_MISSING_COLUMN(17, EXTERNAL),
ICEBERG_INVALID_MATERIALIZED_VIEW(18, EXTERNAL),
ICEBERG_INVALID_SPEC_ID(19, EXTERNAL),
ICEBERG_TRANSACTION_CONFLICT_ERROR(20, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.facebook.presto.spi.statistics.TableStatisticType;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -192,10 +193,12 @@ public IcebergHiveMetadata(
StatisticsFileCache statisticsFileCache,
ManifestFileCache manifestFileCache,
IcebergTableProperties tableProperties,
ConnectorSystemConfig connectorSystemConfig)
ConnectorSystemConfig connectorSystemConfig,
IsolationLevel isolationLevel,
boolean autoCommitContext)
{
super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, schemaTableNamesCodec,
nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties, isolationLevel, autoCommitContext);
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand Down Expand Up @@ -338,6 +341,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties)
{
shouldRunInAutoCommitTransaction("CREATE SCHEMA");
Optional<String> location = getLocation(properties).map(uri -> {
try {
hdfsEnvironment.getFileSystem(new HdfsContext(session, schemaName), new Path(uri));
Expand All @@ -362,6 +366,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
@Override
public void dropSchema(ConnectorSession session, String schemaName)
{
shouldRunInAutoCommitTransaction("DROP SCHEMA");
// basic sanity check to provide a better error message
if (!listTables(session, Optional.of(schemaName)).isEmpty() ||
!listViews(session, Optional.of(schemaName)).isEmpty()) {
Expand All @@ -374,6 +379,7 @@ public void dropSchema(ConnectorSession session, String schemaName)
@Override
public void renameSchema(ConnectorSession session, String source, String target)
{
shouldRunInAutoCommitTransaction("RENAME SCHEMA");
MetastoreContext metastoreContext = getMetastoreContext(session);
metastore.renameDatabase(metastoreContext, source, target);
}
Expand Down Expand Up @@ -423,7 +429,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties()));
FileFormat fileFormat = tableProperties.getFileFormat(session, tableMetadata.getProperties());
TableMetadata metadata = newTableMetadata(schema, partitionSpec, sortOrder, targetPath, populateTableProperties(this, tableMetadata, tableProperties, fileFormat, session));
transaction = createTableTransaction(tableName, operations, metadata);
openCreateTableTransaction(schemaTableName, createTableTransaction(tableName, operations, metadata));

return new IcebergOutputTableHandle(
schemaName,
Expand All @@ -441,6 +447,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
shouldRunInAutoCommitTransaction("DROP TABLE");
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
// TODO: support path override in Iceberg table creation
Expand All @@ -460,6 +467,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable)
{
shouldRunInAutoCommitTransaction("RENAME TABLE");
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
metastore.renameTable(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName(), newTable.getSchemaName(), newTable.getTableName());
Expand All @@ -468,6 +476,7 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
@Override
public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace)
{
shouldRunInAutoCommitTransaction("CREATE VIEW");
MetastoreContext metastoreContext = getMetastoreContext(session);
SchemaTableName viewName = viewMetadata.getTable();
Table table = createTableObjectForViewCreation(
Expand Down Expand Up @@ -559,13 +568,15 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
@Override
public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
{
shouldRunInAutoCommitTransaction("RENAME VIEW");
// Not checking if source view exists as this is already done in RenameViewTask
metastore.renameTable(getMetastoreContext(session), source.getSchemaName(), source.getTableName(), target.getSchemaName(), target.getTableName());
}

@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
shouldRunInAutoCommitTransaction("DROP VIEW");
ConnectorViewDefinition view = getViews(session, viewName.toSchemaTablePrefix()).get(viewName);
checkIfNullView(view, viewName);

Expand Down Expand Up @@ -651,12 +662,6 @@ private Set<ColumnStatisticMetadata> getHiveSupportedColumnStatistics(ConnectorS
.collect(toImmutableSet());
}

@Override
public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return tableHandle;
}

@Override
public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
import com.facebook.presto.spi.ConnectorSystemConfig;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.procedure.ProcedureRegistry;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.transaction.IsolationLevel;
import jakarta.inject.Inject;

import java.util.List;

import static com.facebook.presto.spi.MaterializedViewDefinition.ColumnMapping;
import static com.facebook.presto.spi.transaction.IsolationLevel.REPEATABLE_READ;
import static java.util.Objects.requireNonNull;

public class IcebergHiveMetadataFactory
Expand Down Expand Up @@ -93,7 +95,12 @@ public IcebergHiveMetadataFactory(
this.connectorSystemConfig = requireNonNull(connectorSystemConfig, "connectorSystemConfig is null");
}

public ConnectorMetadata create()
public IcebergTransactionMetadata create()
{
return create(REPEATABLE_READ, true);
}

public IcebergTransactionMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
{
return new IcebergHiveMetadata(
catalogName,
Expand All @@ -112,6 +119,8 @@ public ConnectorMetadata create()
statisticsFileCache,
manifestFileCache,
tableProperties,
connectorSystemConfig);
connectorSystemConfig,
isolationLevel,
autoCommitContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;

public interface IcebergMetadataFactory
{
ConnectorMetadata create();
IcebergTransactionMetadata create();

IcebergTransactionMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext);
}
Loading
Loading