Skip to content

Commit c2cd15a

Browse files
Pluies and chenjian2664
authored and committed
Write checkpoint if needed on CREATE OR REPLACE without schema change
1 parent 2325bc6 commit c2cd15a

File tree

4 files changed

+110
-37
lines changed

4 files changed

+110
-37
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 62 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,9 +1382,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
13821382
if (replaceExistingTable) {
13831383
List<DeltaLakeColumnHandle> existingColumns = getColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry());
13841384
List<DeltaLakeColumnHandle> newColumns = getColumns(metadataEntry, protocolEntry);
1385-
if (isNewCheckpointFileRequired(existingColumns, newColumns)) {
1386-
writeCheckpoint(session, schemaTableName, location, tableHandle.toCredentialsHandle(), commitVersion);
1387-
}
1385+
writeCheckpointIfNeeded(session, schemaTableName, location, tableHandle.toCredentialsHandle(), tableHandle.getReadVersion(), checkpointInterval, commitVersion, Optional.of(existingColumns), Optional.of(newColumns));
13881386
}
13891387
}
13901388
}
@@ -1542,12 +1540,12 @@ public DeltaLakeOutputTableHandle beginCreateTable(
15421540

15431541
OptionalLong readVersion = OptionalLong.empty();
15441542
ProtocolEntry protocolEntry;
1543+
Optional<List<DeltaLakeColumnHandle>> existingColumns = Optional.empty();
15451544

1546-
boolean isNewCheckpointFileRequired = false;
15471545
if (replaceExistingTable) {
15481546
protocolEntry = protocolEntryForTable(ProtocolEntry.builder(handle.getProtocolEntry()), containsTimestampType, tableMetadata.getProperties());
15491547
readVersion = OptionalLong.of(handle.getReadVersion());
1550-
isNewCheckpointFileRequired = isNewCheckpointFileRequired(getColumns(handle.getMetadataEntry(), handle.getProtocolEntry()), columnHandles.build());
1548+
existingColumns = Optional.of(getColumns(handle.getMetadataEntry(), handle.getProtocolEntry()));
15511549
}
15521550
else {
15531551
TrinoFileSystem fileSystem = fileSystemFactory.create(session, location);
@@ -1570,7 +1568,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(
15701568
columnMappingMode,
15711569
maxFieldId,
15721570
replace,
1573-
isNewCheckpointFileRequired,
1571+
existingColumns,
15741572
readVersion,
15751573
protocolEntry);
15761574
}
@@ -1759,8 +1757,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
17591757
transactionLogWriter.flush();
17601758
writeCommitted = true;
17611759

1762-
if (handle.replace() && handle.readVersion().isPresent() && handle.isSchemaChanged()) {
1763-
writeCheckpoint(session, schemaTableName, location, handle.toCredentialsHandle(), commitVersion);
1760+
if (handle.replace() && handle.readVersion().isPresent()) {
1761+
writeCheckpointIfNeeded(session, schemaTableName, handle.location(), handle.toCredentialsHandle(), handle.readVersion().getAsLong(), handle.checkpointInterval(), commitVersion, handle.existingColumns(), Optional.of(handle.inputColumns()));
17641762
}
17651763

17661764
if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) {
@@ -2323,7 +2321,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(
23232321
long commitVersion = Failsafe.with(TRANSACTION_CONFLICT_RETRY_POLICY)
23242322
.get(context -> commitInsertOperation(session, handle, sourceTableHandles, isolationLevel, dataFileInfos, readVersion, context.getAttemptCount()));
23252323
writeCommitted = true;
2326-
writeCheckpointIfNeeded(session, handle.tableName(), handle.location(), handle.credentialsHandle(), handle.readVersion(), handle.metadataEntry().getCheckpointInterval(), commitVersion);
2324+
writeCheckpointIfNeeded(session, handle.tableName(), handle.location(), handle.credentialsHandle(), handle.readVersion(), handle.metadataEntry().getCheckpointInterval(), commitVersion, Optional.empty(), Optional.empty());
23272325
enqueueUpdateInfo(session, handle.tableName().getSchemaName(), handle.tableName().getTableName(), commitVersion, handle.metadataEntry().getSchemaString(), Optional.ofNullable(handle.metadataEntry().getDescription()));
23282326

23292327
if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty() && !dataFileInfos.isEmpty()) {
@@ -2713,7 +2711,7 @@ public void finishMerge(
27132711
handle.getMetadataEntry().getSchemaString(),
27142712
Optional.ofNullable(handle.getMetadataEntry().getDescription()));
27152713

2716-
writeCheckpointIfNeeded(session, handle.getSchemaTableName(), handle.getLocation(), handle.toCredentialsHandle(), handle.getReadVersion(), checkpointInterval, commitVersion);
2714+
writeCheckpointIfNeeded(session, handle.getSchemaTableName(), handle.getLocation(), handle.toCredentialsHandle(), handle.getReadVersion(), checkpointInterval, commitVersion, Optional.empty(), Optional.empty());
27172715
}
27182716
catch (RuntimeException e) {
27192717
if (!writeCommitted) {
@@ -2995,7 +2993,9 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
29952993
optimizeHandle.getCredentialsHandle(),
29962994
optimizeHandle.getCurrentVersion().orElseThrow(),
29972995
checkpointInterval,
2998-
commitVersion);
2996+
commitVersion,
2997+
Optional.empty(),
2998+
Optional.empty());
29992999
}
30003000
catch (Exception e) {
30013001
if (!writeCommitted) {
@@ -3183,23 +3183,25 @@ private void writeCheckpointIfNeeded(
31833183
VendedCredentialsHandle credentialsHandle,
31843184
long readVersion,
31853185
Optional<Long> checkpointInterval,
3186-
long newVersion)
3186+
long newVersion,
3187+
Optional<List<DeltaLakeColumnHandle>> existingColumns,
3188+
Optional<List<DeltaLakeColumnHandle>> newColumns)
31873189
{
31883190
try {
31893191
// We are writing checkpoint synchronously. It should not be long lasting operation for tables where transaction log is not humongous.
31903192
// Tables with really huge transaction logs would behave poorly in read flow already.
3191-
long lastCheckpointVersion = getLastCheckpointVersion(session, table, tableLocation, readVersion, credentialsHandle);
3192-
if (newVersion - lastCheckpointVersion < checkpointInterval.orElse(defaultCheckpointInterval)) {
3193-
return;
3193+
if (isCheckpointNeeded(
3194+
session,
3195+
table,
3196+
tableLocation,
3197+
credentialsHandle,
3198+
readVersion,
3199+
checkpointInterval,
3200+
newVersion,
3201+
existingColumns,
3202+
newColumns)) {
3203+
writeCheckpoint(session, table, tableLocation, credentialsHandle, newVersion);
31943204
}
3195-
3196-
// TODO: There is a race possibility here(https://github.com/trinodb/trino/issues/12004),
3197-
// which may result in us not writing checkpoints at exactly the planned frequency.
3198-
// The snapshot obtained above may already be on a version higher than `newVersion` because some other transaction could have just been committed.
3199-
// This does not pose correctness issue but may be confusing if someone looks into transaction log.
3200-
// To fix that we should allow for getting snapshot for given version.
3201-
3202-
writeCheckpoint(session, table, tableLocation, credentialsHandle, newVersion);
32033205
}
32043206
catch (Exception e) {
32053207
// We can't fail here as transaction was already committed, in case of INSERT this could result
@@ -3208,6 +3210,32 @@ private void writeCheckpointIfNeeded(
32083210
}
32093211
}
32103212

3213+
private boolean isCheckpointNeeded(
3214+
ConnectorSession session,
3215+
SchemaTableName table,
3216+
String tableLocation,
3217+
VendedCredentialsHandle credentialsHandle,
3218+
long readVersion,
3219+
Optional<Long> checkpointInterval,
3220+
long newVersion,
3221+
Optional<List<DeltaLakeColumnHandle>> existingColumns,
3222+
Optional<List<DeltaLakeColumnHandle>> newColumns)
3223+
{
3224+
// If columns have changed: we need to write a new checkpoint regardless of interval
3225+
if (columnChangeRequiresCheckpoint(existingColumns, newColumns)) {
3226+
return true;
3227+
}
3228+
3229+
// If we've reached the checkpoint writing interval: we need to write a checkpoint
3230+
// TODO: There is a race possibility here(https://github.com/trinodb/trino/issues/12004),
3231+
// which may result in us not writing checkpoints at exactly the planned frequency.
3232+
// The snapshot obtained above may already be on a version higher than `newVersion` because some other transaction could have just been committed.
3233+
// This does not pose correctness issue but may be confusing if someone looks into transaction log.
3234+
// To fix that we should allow for getting snapshot for given version.
3235+
long lastCheckpointVersion = getLastCheckpointVersion(session, table, tableLocation, readVersion, credentialsHandle);
3236+
return newVersion - lastCheckpointVersion >= checkpointInterval.orElse(defaultCheckpointInterval);
3237+
}
3238+
32113239
private void writeCheckpoint(ConnectorSession session, SchemaTableName table, String tableLocation, VendedCredentialsHandle credentialsHandle, long newVersion)
32123240
throws IOException
32133241
{
@@ -3216,13 +3244,18 @@ private void writeCheckpoint(ConnectorSession session, SchemaTableName table, St
32163244
checkpointWriterManager.writeCheckpoint(session, snapshot, credentialsHandle);
32173245
}
32183246

3219-
private static boolean isNewCheckpointFileRequired(List<DeltaLakeColumnHandle> existingColumns, List<DeltaLakeColumnHandle> newColumns)
3247+
private static boolean columnChangeRequiresCheckpoint(
3248+
Optional<List<DeltaLakeColumnHandle>> existingColumns,
3249+
Optional<List<DeltaLakeColumnHandle>> newColumns)
32203250
{
3221-
Map<String, Type> newColumnHandles = newColumns.stream()
3251+
if (existingColumns.isEmpty() || newColumns.isEmpty()) {
3252+
return false;
3253+
}
3254+
Map<String, Type> newColumnHandles = newColumns.get().stream()
32223255
.filter(column -> !isMetadataColumnHandle(column))
32233256
.collect(toImmutableMap(DeltaLakeColumnHandle::columnName, DeltaLakeColumnHandle::type));
32243257

3225-
for (DeltaLakeColumnHandle existingColumn : existingColumns) {
3258+
for (DeltaLakeColumnHandle existingColumn : existingColumns.get()) {
32263259
if (isMetadataColumnHandle(existingColumn)) {
32273260
continue;
32283261
}
@@ -4329,7 +4362,9 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
43294362
tableHandle.toCredentialsHandle(),
43304363
tableHandle.getReadVersion(),
43314364
tableHandle.getMetadataEntry().getCheckpointInterval(),
4332-
commitDeleteOperationResult.commitVersion());
4365+
commitDeleteOperationResult.commitVersion(),
4366+
Optional.empty(),
4367+
Optional.empty());
43334368
enqueueUpdateInfo(
43344369
session,
43354370
tableHandle.getSchemaName(),

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public record DeltaLakeOutputTableHandle(
4242
ColumnMappingMode columnMappingMode,
4343
OptionalInt maxColumnId,
4444
boolean replace,
45-
boolean isSchemaChanged,
45+
Optional<List<DeltaLakeColumnHandle>> existingColumns,
4646
OptionalLong readVersion,
4747
ProtocolEntry protocolEntry)
4848
implements ConnectorOutputTableHandle
@@ -59,6 +59,7 @@ public record DeltaLakeOutputTableHandle(
5959
requireNonNull(schemaString, "schemaString is null");
6060
requireNonNull(columnMappingMode, "columnMappingMode is null");
6161
requireNonNull(maxColumnId, "maxColumnId is null");
62+
requireNonNull(existingColumns, "existingColumns is null");
6263
requireNonNull(readVersion, "readVersion is null");
6364
requireNonNull(protocolEntry, "protocolEntry is null");
6465
}

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1446,7 +1446,7 @@ public void testCheckpointWriteStatsAsStructWithPartiallyUnsupportedColumnStats(
14461446
}
14471447

14481448
@Test
1449-
public void testCreateOrReplaceCheckpointing()
1449+
public void testCreateOrReplaceWithSameSchemaWritesCheckpoint()
14501450
{
14511451
String tableName = "test_create_or_replace_checkpointing_" + randomNameSuffix();
14521452
assertUpdate(
@@ -1463,27 +1463,64 @@ public void testCreateOrReplaceCheckpointing()
14631463
assertThat(listCheckpointFiles(transactionLogDirectory)).isEmpty();
14641464
assertQuery("SELECT * FROM " + tableName, "VALUES (1,'ala'), (2,'kota'), (3, 'psa')");
14651465

1466-
// replace table
1466+
// replace table with same schema, expect a checkpoint because the checkpoint interval threshold has been reached
14671467
assertUpdate(
1468-
format("CREATE OR REPLACE TABLE %s (a_number integer) " +
1468+
format("CREATE OR REPLACE TABLE %s (a_number integer, a_string varchar) " +
14691469
" WITH (checkpoint_interval = 2)",
14701470
tableName));
14711471
assertThat(listCheckpointFiles(transactionLogDirectory)).hasSize(1);
14721472
assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult();
14731473

1474-
assertUpdate(format("INSERT INTO " + tableName + " VALUES 1", tableName), 1);
1474+
assertUpdate(format("INSERT INTO " + tableName + " VALUES (1, 'bobra')", tableName), 1);
14751475
assertThat(listCheckpointFiles(transactionLogDirectory)).hasSize(1);
1476-
assertQuery("SELECT * FROM " + tableName, "VALUES 1");
1476+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'bobra')");
14771477

14781478
// replace table with selection
14791479
assertUpdate(
1480-
format("CREATE OR REPLACE TABLE %s (a_string) " +
1480+
format("CREATE OR REPLACE TABLE %s (a_number, a_string) " +
14811481
" WITH (checkpoint_interval = 2) " +
1482-
" AS VALUES 'bobra', 'kreta'",
1482+
" AS VALUES (1, 'bobra'), (2, 'kreta')",
14831483
tableName),
14841484
2);
14851485
assertThat(listCheckpointFiles(transactionLogDirectory)).hasSize(2);
1486-
assertQuery("SELECT * FROM " + tableName, "VALUES 'bobra', 'kreta'");
1486+
assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'bobra'), (2, 'kreta')");
1487+
1488+
assertUpdate("DROP TABLE " + tableName);
1489+
}
1490+
1491+
@Test
1492+
public void testCreateOrReplaceWithDifferentSchemaWritesCheckpoint()
1493+
{
1494+
String tableName = "test_create_or_replace_checkpointing_" + randomNameSuffix();
1495+
assertUpdate(
1496+
format("CREATE OR REPLACE TABLE %s (a_number, a_string) " +
1497+
" WITH (location = '%s', " +
1498+
" partitioned_by = ARRAY['a_number']) " +
1499+
" AS VALUES (1, 'ala')",
1500+
tableName,
1501+
getLocationForTable(bucketName, tableName)),
1502+
1);
1503+
String transactionLogDirectory = format("%s/_delta_log", tableName);
1504+
1505+
assertUpdate(format("INSERT INTO %s VALUES (2, 'kota'), (3, 'psa')", tableName), 2);
1506+
assertThat(listCheckpointFiles(transactionLogDirectory)).isEmpty();
1507+
assertQuery("SELECT * FROM " + tableName, "VALUES (1,'ala'), (2,'kota'), (3, 'psa')");
1508+
1509+
// replace table with same schema and a large checkpoint_interval, expect no new checkpoint
1510+
assertUpdate(
1511+
format("CREATE OR REPLACE TABLE %s (a_number integer, a_string varchar) " +
1512+
" WITH (checkpoint_interval = 100)",
1513+
tableName));
1514+
assertThat(listCheckpointFiles(transactionLogDirectory)).hasSize(0);
1515+
assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult();
1516+
1517+
// replace table with a different schema and a large checkpoint_interval, expect a new checkpoint because columns have changed
1518+
assertUpdate(
1519+
format("CREATE OR REPLACE TABLE %s (a_number integer, a_string integer) " +
1520+
" WITH (checkpoint_interval = 100)",
1521+
tableName));
1522+
assertThat(listCheckpointFiles(transactionLogDirectory)).hasSize(1);
1523+
assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult();
14871524

14881525
assertUpdate("DROP TABLE " + tableName);
14891526
}

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ private static ConnectorPageSink createPageSink(String outputPath, DeltaLakeWrit
181181
NONE,
182182
OptionalInt.empty(),
183183
false,
184-
false,
184+
Optional.of(getColumnHandles()),
185185
OptionalLong.empty(),
186186
new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, Optional.empty(), Optional.empty()));
187187

0 commit comments

Comments
 (0)