Skip to content

Commit 23b9517

Browse files
committed
[Iceberg]Accept and maintain isolation level on starting transaction
And set Iceberg connector supports isolation level to REPEATABLE_READ(SNAPSHOT)
1 parent 40742dc commit 23b9517

File tree

7 files changed

+27
-16
lines changed

7 files changed

+27
-16
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import com.facebook.presto.spi.statistics.TableStatisticType;
6565
import com.facebook.presto.spi.statistics.TableStatistics;
6666
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
67+
import com.facebook.presto.spi.transaction.IsolationLevel;
6768
import com.google.common.base.Functions;
6869
import com.google.common.base.Predicates;
6970
import com.google.common.base.VerifyException;
@@ -80,7 +81,6 @@
8081
import org.apache.iceberg.DeleteFiles;
8182
import org.apache.iceberg.FileFormat;
8283
import org.apache.iceberg.FileMetadata;
83-
import org.apache.iceberg.IsolationLevel;
8484
import org.apache.iceberg.MetricsConfig;
8585
import org.apache.iceberg.MetricsModes.None;
8686
import org.apache.iceberg.PartitionField;
@@ -192,6 +192,7 @@
192192
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateStatisticsConsideringLayout;
193193
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
194194
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
195+
import static com.facebook.presto.spi.transaction.IsolationLevel.SERIALIZABLE;
195196
import static com.google.common.base.Strings.isNullOrEmpty;
196197
import static com.google.common.base.Verify.verify;
197198
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -205,8 +206,6 @@
205206
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
206207
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
207208
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
208-
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
209-
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
210209
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
211210

212211
public abstract class IcebergAbstractMetadata
@@ -223,6 +222,7 @@ public abstract class IcebergAbstractMetadata
223222
protected Transaction transaction;
224223
protected final StatisticsFileCache statisticsFileCache;
225224
protected final IcebergTableProperties tableProperties;
225+
protected final IsolationLevel isolationLevel;
226226
protected final boolean autoCommitContext;
227227

228228
private final StandardFunctionResolution functionResolution;
@@ -237,6 +237,7 @@ public IcebergAbstractMetadata(
237237
FilterStatsCalculatorService filterStatsCalculatorService,
238238
StatisticsFileCache statisticsFileCache,
239239
IcebergTableProperties tableProperties,
240+
com.facebook.presto.spi.transaction.IsolationLevel isolationLevel,
240241
boolean autoCommitContext)
241242
{
242243
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -247,6 +248,7 @@ public IcebergAbstractMetadata(
247248
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
248249
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
249250
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
251+
this.isolationLevel = requireNonNull(isolationLevel, "isolationLevel is null");
250252
this.autoCommitContext = autoCommitContext;
251253
}
252254

@@ -602,14 +604,13 @@ private Optional<ConnectorOutputMetadata> finishWrite(ConnectorSession session,
602604

603605
RowDelta rowDelta = transaction.newRowDelta();
604606
writableTableHandle.getTableName().getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
605-
IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
606607

607608
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
608609
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
609610
commitTasks.forEach(task -> handleTask(task, icebergTable, rowDelta, writtenFiles, referencedDataFiles));
610611

611612
rowDelta.validateDataFilesExist(referencedDataFiles.build());
612-
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
613+
if (isolationLevel == SERIALIZABLE) {
613614
rowDelta.validateNoConflictingDataFiles();
614615
}
615616

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConnector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
import static com.facebook.presto.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
4242
import static com.facebook.presto.spi.connector.EmptyConnectorCommitHandle.INSTANCE;
43-
import static com.facebook.presto.spi.transaction.IsolationLevel.SERIALIZABLE;
43+
import static com.facebook.presto.spi.transaction.IsolationLevel.REPEATABLE_READ;
4444
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
4545
import static com.google.common.collect.Sets.immutableEnumSet;
4646
import static java.util.Objects.requireNonNull;
@@ -186,10 +186,10 @@ public ConnectorAccessControl getAccessControl()
186186
@Override
187187
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean autoCommitContext, boolean readOnly)
188188
{
189-
checkConnectorSupports(SERIALIZABLE, isolationLevel);
189+
checkConnectorSupports(REPEATABLE_READ, isolationLevel);
190190
ConnectorTransactionHandle transaction = new HiveTransactionHandle();
191191
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
192-
transactionManager.put(transaction, metadataFactory.create(autoCommitContext));
192+
transactionManager.put(transaction, metadataFactory.create(isolationLevel, autoCommitContext));
193193
}
194194
return transaction;
195195
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import com.facebook.presto.spi.statistics.TableStatisticType;
6262
import com.facebook.presto.spi.statistics.TableStatistics;
6363
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
64+
import com.facebook.presto.spi.transaction.IsolationLevel;
6465
import com.google.common.annotations.VisibleForTesting;
6566
import com.google.common.cache.Cache;
6667
import com.google.common.cache.CacheBuilder;
@@ -181,10 +182,11 @@ public IcebergHiveMetadata(
181182
ManifestFileCache manifestFileCache,
182183
IcebergTableProperties tableProperties,
183184
ConnectorSystemConfig connectorSystemConfig,
185+
IsolationLevel isolationLevel,
184186
boolean autoCommitContext)
185187
{
186188
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion,
187-
filterStatsCalculatorService, statisticsFileCache, tableProperties, autoCommitContext);
189+
filterStatsCalculatorService, statisticsFileCache, tableProperties, isolationLevel, autoCommitContext);
188190
this.catalogName = requireNonNull(catalogName, "catalogName is null");
189191
this.metastore = requireNonNull(metastore, "metastore is null");
190192
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import com.facebook.presto.spi.function.StandardFunctionResolution;
2525
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
2626
import com.facebook.presto.spi.relation.RowExpressionService;
27+
import com.facebook.presto.spi.transaction.IsolationLevel;
2728

2829
import javax.inject.Inject;
2930

31+
import static com.facebook.presto.spi.transaction.IsolationLevel.REPEATABLE_READ;
3032
import static java.util.Objects.requireNonNull;
3133

3234
public class IcebergHiveMetadataFactory
@@ -82,10 +84,10 @@ public IcebergHiveMetadataFactory(
8284

8385
public ConnectorMetadata create()
8486
{
85-
return create(true);
87+
return create(REPEATABLE_READ, true);
8688
}
8789

88-
public ConnectorMetadata create(boolean autoCommitContext)
90+
public ConnectorMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
8991
{
9092
return new IcebergHiveMetadata(
9193
catalogName,
@@ -102,6 +104,7 @@ public ConnectorMetadata create(boolean autoCommitContext)
102104
manifestFileCache,
103105
tableProperties,
104106
connectorSystemConfig,
107+
isolationLevel,
105108
autoCommitContext);
106109
}
107110
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
package com.facebook.presto.iceberg;
1515

1616
import com.facebook.presto.spi.connector.ConnectorMetadata;
17+
import com.facebook.presto.spi.transaction.IsolationLevel;
1718

1819
public interface IcebergMetadataFactory
1920
{
2021
ConnectorMetadata create();
2122

22-
ConnectorMetadata create(boolean autoCommitContext);
23+
ConnectorMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext);
2324
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.facebook.presto.spi.function.StandardFunctionResolution;
3232
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
3333
import com.facebook.presto.spi.relation.RowExpressionService;
34+
import com.facebook.presto.spi.transaction.IsolationLevel;
3435
import com.google.common.collect.ImmutableList;
3536
import com.google.common.collect.ImmutableMap;
3637
import org.apache.hadoop.fs.Path;
@@ -109,10 +110,11 @@ public IcebergNativeMetadata(
109110
FilterStatsCalculatorService filterStatsCalculatorService,
110111
StatisticsFileCache statisticsFileCache,
111112
IcebergTableProperties tableProperties,
113+
IsolationLevel isolationLevel,
112114
boolean autoCommitContext)
113115
{
114116
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion,
115-
filterStatsCalculatorService, statisticsFileCache, tableProperties, autoCommitContext);
117+
filterStatsCalculatorService, statisticsFileCache, tableProperties, isolationLevel, autoCommitContext);
116118
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
117119
this.catalogType = requireNonNull(catalogType, "catalogType is null");
118120
this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir());

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import com.facebook.presto.spi.function.StandardFunctionResolution;
2222
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
2323
import com.facebook.presto.spi.relation.RowExpressionService;
24+
import com.facebook.presto.spi.transaction.IsolationLevel;
2425

2526
import javax.inject.Inject;
2627

28+
import static com.facebook.presto.spi.transaction.IsolationLevel.REPEATABLE_READ;
2729
import static java.util.Objects.requireNonNull;
2830

2931
public class IcebergNativeMetadataFactory
@@ -68,13 +70,13 @@ public IcebergNativeMetadataFactory(
6870

6971
public ConnectorMetadata create()
7072
{
71-
return create(true);
73+
return create(REPEATABLE_READ, true);
7274
}
7375

74-
public ConnectorMetadata create(boolean autoCommitContext)
76+
public ConnectorMetadata create(IsolationLevel isolationLevel, boolean autoCommitContext)
7577
{
7678
return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution,
7779
rowExpressionService, commitTaskCodec, catalogType, nodeVersion,
78-
filterStatsCalculatorService, statisticsFileCache, tableProperties, autoCommitContext);
80+
filterStatsCalculatorService, statisticsFileCache, tableProperties, isolationLevel, autoCommitContext);
7981
}
8082
}

0 commit comments

Comments
 (0)