Skip to content

Commit d90c514

Browse files
committed
Preserve table features on CREATE OR REPLACE TABLE
According to https://github.com/delta-io/delta/blob/master/PROTOCOL.md#table-features-for-new-and-existing-tables, retain existing read and write features when executing `CREATE OR REPLACE TABLE`, as previous table data may still depend on them.
1 parent e7ba40b commit d90c514

File tree

4 files changed

+48
-9
lines changed

4 files changed

+48
-9
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,12 +1355,12 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
13551355
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
13561356
}
13571357
}
1358-
protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry().minReaderVersion(), tableHandle.getProtocolEntry().minWriterVersion(), containsTimestampType, tableMetadata.getProperties());
1358+
protocolEntry = protocolEntryForTable(ProtocolEntry.builder(tableHandle.getProtocolEntry()), containsTimestampType, tableMetadata.getProperties());
13591359
statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location, tableHandle.toCredentialsHandle());
13601360
}
13611361
else {
13621362
setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
1363-
protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
1363+
protocolEntry = protocolEntryForTable(ProtocolEntry.builder(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION), containsTimestampType, tableMetadata.getProperties());
13641364
}
13651365

13661366
MetadataEntry metadataEntry = MetadataEntry.builder()
@@ -1545,15 +1545,15 @@ public DeltaLakeOutputTableHandle beginCreateTable(
15451545

15461546
boolean isNewCheckpointFileRequired = false;
15471547
if (replaceExistingTable) {
1548-
protocolEntry = protocolEntryForTable(handle.getProtocolEntry().minReaderVersion(), handle.getProtocolEntry().minWriterVersion(), containsTimestampType, tableMetadata.getProperties());
1548+
protocolEntry = protocolEntryForTable(ProtocolEntry.builder(handle.getProtocolEntry()), containsTimestampType, tableMetadata.getProperties());
15491549
readVersion = OptionalLong.of(handle.getReadVersion());
15501550
isNewCheckpointFileRequired = isNewCheckpointFileRequired(getColumns(handle.getMetadataEntry(), handle.getProtocolEntry()), columnHandles.build());
15511551
}
15521552
else {
15531553
TrinoFileSystem fileSystem = fileSystemFactory.create(session, location);
15541554
checkPathContainsNoFiles(fileSystem, finalLocation);
15551555
setRollback(() -> deleteRecursivelyIfExists(fileSystem, finalLocation));
1556-
protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
1556+
protocolEntry = protocolEntryForTable(ProtocolEntry.builder(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION), containsTimestampType, tableMetadata.getProperties());
15571557
}
15581558

15591559
return new DeltaLakeOutputTableHandle(
@@ -3133,10 +3133,10 @@ private void checkSupportedWriterVersion(DeltaLakeTableHandle handle)
31333133
}
31343134
}
31353135

3136-
private ProtocolEntry protocolEntryForTable(int readerVersion, int writerVersion, boolean containsTimestampType, Map<String, Object> properties)
3136+
private ProtocolEntry protocolEntryForTable(ProtocolEntry.Builder builder, boolean containsTimestampType, Map<String, Object> properties)
31373137
{
31383138
return protocolEntry(
3139-
ProtocolEntry.builder(readerVersion, writerVersion),
3139+
builder,
31403140
containsTimestampType,
31413141
getChangeDataFeedEnabled(properties),
31423142
getColumnMappingMode(properties),

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public record ProtocolEntry(
6161
// Spark done the similar logic here:
6262
// https://github.com/delta-io/delta/blob/f5ebfbb10bda87b6b2b7c9032e86edaf40d446bc/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L155-L157
6363
if (minReaderVersion >= MIN_VERSION_SUPPORTS_READER_FEATURES && readerFeatures.isEmpty()) {
64-
log.warn("readerFeatures must exist when minReaderVersion is greater than or equal to " + MIN_VERSION_SUPPORTS_READER_FEATURES);
64+
log.debug("readerFeatures must exist when minReaderVersion is greater than or equal to %s", MIN_VERSION_SUPPORTS_READER_FEATURES);
6565
}
6666
if (minWriterVersion < MIN_VERSION_SUPPORTS_WRITER_FEATURES && writerFeatures.isPresent()) {
6767
throw new IllegalArgumentException("writerFeatures must not exist when minWriterVersion is less than " + MIN_VERSION_SUPPORTS_WRITER_FEATURES);
@@ -72,13 +72,13 @@ public record ProtocolEntry(
7272
// Spark done the similar logic here:
7373
// https://github.com/delta-io/delta/blob/f5ebfbb10bda87b6b2b7c9032e86edaf40d446bc/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L158-L160
7474
if (minWriterVersion >= MIN_VERSION_SUPPORTS_WRITER_FEATURES && writerFeatures.isEmpty()) {
75-
log.warn("writerFeatures must exist when minWriterVersion is greater than or equal to " + MIN_VERSION_SUPPORTS_WRITER_FEATURES);
75+
log.debug("writerFeatures must exist when minWriterVersion is greater than or equal to %s", MIN_VERSION_SUPPORTS_WRITER_FEATURES);
7676
}
7777

7878
// Spark done the similar logic here:
7979
// https://github.com/delta-io/delta/blob/f5ebfbb10bda87b6b2b7c9032e86edaf40d446bc/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L162-L166
8080
if (minReaderVersion >= MIN_VERSION_SUPPORTS_READER_FEATURES && minWriterVersion < MIN_VERSION_SUPPORTS_WRITER_FEATURES) {
81-
log.warn("When reader is on table features, writer must be on table features too");
81+
log.debug("When reader is on table features, writer must be on table features too");
8282
}
8383

8484
readerFeatures = requireNonNull(readerFeatures, "readerFeatures is null").map(ImmutableSet::copyOf);

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,27 @@ void testWriteCheckpointOnSchemaChangeCTAS()
494494
}
495495
}
496496

497+
@Test
498+
void testCreateOrReplacePreservesFeatures()
499+
throws Exception
500+
{
501+
try (TestTable table = newTrinoTable("test_create_or_replace_preserves_features_", "(x int) WITH (deletion_vectors_enabled = true)")) {
502+
assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName()))
503+
.contains("deletion_vectors_enabled = true");
504+
505+
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (x int)");
506+
// verify deletion_vector_enabled is preserved after CREATE OR REPLACE TABLE
507+
Path tableLocation = Path.of(getTableLocation(table.getName()).replace("file://", ""));
508+
ProtocolEntry protocolEntry = loadProtocolEntry(1, tableLocation);
509+
assertThat(protocolEntry.readerFeaturesContains("deletionVectors")).isTrue();
510+
assertThat(protocolEntry.writerFeaturesContains("deletionVectors")).isTrue();
511+
512+
// deletion_vectors_enabled is not enabled, since we created the table without it
513+
assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName()))
514+
.doesNotContain("deletion_vectors_enabled = true");
515+
}
516+
}
517+
497518
@Test // regression test for https://github.com/trinodb/trino/issues/24121
498519
void testPartitionValuesParsedCheckpoint()
499520
throws Exception

testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCreateOrReplaceTableAsSelectCompatibility.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,24 @@ public void testCreateOrReplaceTableWithSchemaChangeOnCheckpoint()
140140
}
141141
}
142142

143+
@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
144+
public void testCreateOrReplaceTableWithOnlyFeaturesChangedOnTrino()
145+
{
146+
String tableName = "test_create_or_replace_table_discard_features_" + randomNameSuffix();
147+
148+
onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (x INT) " +
149+
"WITH (location = 's3://" + bucketName + "/delta-lake-test-" + tableName + "', deletion_vectors_enabled = true)");
150+
try {
151+
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 1, 2, 3");
152+
153+
onTrino().executeQuery("CREATE OR REPLACE TABLE delta.default." + tableName + " (x INT)");
154+
assertThat(onDelta().executeQuery("SELECT * FROM " + tableName)).hasNoRows();
155+
}
156+
finally {
157+
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
158+
}
159+
}
160+
143161
private static List<Row> performInsert(QueryExecutor queryExecutor, String tableName, int numberOfRows)
144162
{
145163
ImmutableList.Builder<Row> expectedRowBuilder = ImmutableList.builder();

0 commit comments

Comments
 (0)