Skip to content

Commit 3e24daa

Browse files
authored
chore: Update S3 data-lake to use the latest version of the cdk (#69229)
1 parent 9aaf726 commit 3e24daa

File tree

3 files changed

+9
-6
lines changed

3 files changed

+9
-6
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
testExecutionConcurrency=-1
2-
cdkVersion=0.1.61
2+
cdkVersion=0.1.69
33
JunitMethodExecutionTimeout=10m

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ class S3DataLakeStreamLoader(
101101
// In principle, this doesn't matter, but the iceberg SDK throws an error about
102102
// stale table metadata without this.
103103
table.refresh()
104-
computeOrExecuteSchemaUpdate().pendingUpdate?.commit()
104+
// Commit all pending schema updates in order (important for two-phase commits)
105+
computeOrExecuteSchemaUpdate().pendingUpdates.forEach { it.commit() }
105106
table.manageSnapshots().replaceBranch(mainBranchName, stagingBranchName).commit()
106107

107108
if (stream.isSingleGenerationTruncate()) {

airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizerTest.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ class S3DataLakeTableSynchronizerTest {
9797
)
9898

9999
// We expect the original schema to be returned
100-
assertThat(result).isEqualTo(SchemaUpdateResult(existingSchema, pendingUpdate = null))
100+
assertThat(result)
101+
.isEqualTo(SchemaUpdateResult(existingSchema, pendingUpdates = emptyList()))
101102

102103
// Verify that no calls to updateSchema() manipulation were made
103104
verify(exactly = 0) { mockUpdateSchema.deleteColumn(any()) }
@@ -132,7 +133,8 @@ class S3DataLakeTableSynchronizerTest {
132133
// The final returned schema is the table's schema after refresh
133134
// Since we aren't actually applying changes, just assert that it's whatever the mock
134135
// returns
135-
assertThat(result).isEqualTo(SchemaUpdateResult(mockNewSchema, pendingUpdate = null))
136+
assertThat(result)
137+
.isEqualTo(SchemaUpdateResult(mockNewSchema, pendingUpdates = emptyList()))
136138
}
137139

138140
@Test
@@ -406,7 +408,7 @@ class S3DataLakeTableSynchronizerTest {
406408

407409
every { mockTable.schema() } returns existingSchema
408410

409-
val (schema, pendingUpdate) =
411+
val (schema, pendingUpdates) =
410412
synchronizer.maybeApplySchemaChanges(
411413
mockTable,
412414
incomingSchema,
@@ -422,6 +424,6 @@ class S3DataLakeTableSynchronizerTest {
422424
confirmVerified(mockUpdateSchema)
423425

424426
assertThat(schema).isSameAs(mockNewSchema)
425-
assertThat(pendingUpdate).isNotNull
427+
assertThat(pendingUpdates).hasSize(1)
426428
}
427429
}

0 commit comments

Comments
 (0)