Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/core/load/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The Load CDK provides functionality for destination connectors including stream-

| Version | Date | Pull Request | Subject |
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
| 1.0.7 | 2026-03-27 | | Fix: update Iceberg sort order before schema evolution to prevent ValidationException when deleting columns referenced by the sort order. Handles Dedupe-to-Append mode switches and PK changes. |
| 1.0.6 | 2026-03-12 | [#74715](https://github.com/airbytehq/airbyte/pull/74715) | Fix: drop temp table after successful upsert to prevent duplicate records across syncs. |
| 1.0.5 | 2026-03-10 | [#74723](https://github.com/airbytehq/airbyte/pull/74723) | Fix schema evolution: defer identifier field update when replacing columns to avoid Iceberg conflict. |
| 1.0.4 | 2026-03-05 | [#74328](https://github.com/airbytehq/airbyte/pull/74328) | Fix iceberg dedup: map PK NumberType to StringType instead of DecimalType for identifier field compatibility. |
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/bulk/core/load/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.0.6
version=1.0.7
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ package io.airbyte.cdk.load.toolkits.iceberg.parquet
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.PARENT_CHILD_SEPARATOR
import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.splitIntoParentAndLeaf
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.SortDirection
import org.apache.iceberg.Table
import org.apache.iceberg.UpdateSchema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.PrimitiveType

private val logger = KotlinLogging.logger {}

/** Describes how the [IcebergTableSynchronizer] handles column type changes. */
enum class ColumnTypeChangeBehavior {
/**
Expand Down Expand Up @@ -86,6 +90,27 @@ class IcebergTableSynchronizer(
return SchemaUpdateResult(existingSchema, pendingUpdates = emptyList())
}

// Update the sort order before creating the UpdateSchema, because:
// 1. Deleting a column referenced by the sort order will cause
// SortOrder.checkCompatibility to throw ValidationException on commit.
// 2. UpdateSchema captures the table's metadata version at creation time.
// If we replace the sort order after creating it, the commit would fail
// with a stale metadata error.
val columnsBeingDeleted = buildList {
addAll(diff.removedColumns)
if (columnTypeChangeBehavior == ColumnTypeChangeBehavior.OVERWRITE) {
// In OVERWRITE mode, type-changed columns are deleted and re-added
// with new field IDs. The old sort field references become invalid.
addAll(diff.updatedDataTypes)
}
}
replaceSortOrderIfNeeded(
table = table,
columnsBeingDeleted = columnsBeingDeleted,
identifierFieldsChanged = diff.identifierFieldsChanged,
incomingIdentifierFieldNames = incomingSchema.identifierFieldNames(),
)

val update: UpdateSchema = table.updateSchema().allowIncompatibleChanges()

// 1) Remove columns that no longer exist in the incoming schema
Expand Down Expand Up @@ -267,6 +292,90 @@ class IcebergTableSynchronizer(
return SchemaUpdateResult(newSchema, pendingUpdates = listOf(update))
}
}

/**
* Update the table's sort order if it would conflict with pending schema changes.
*
* Sort orders are set at table creation from identifier fields (PKs) and never updated. This
* causes [org.apache.iceberg.exceptions.ValidationException] when schema evolution deletes a
* column referenced by the sort order.
*
* This method handles three cases:
* 1. Identifier fields changed → rebuild sort order from new identifiers (covers
* ```
* Dedupe→Append, PK changes within Dedupe)
* ```
* 2. Columns being deleted conflict with sort order → remove those fields
* 3. Neither → no-op
*
* Must be called BEFORE creating the [UpdateSchema], since this commits a metadata change and
* the subsequent UpdateSchema needs the refreshed metadata version.
*/
private fun replaceSortOrderIfNeeded(
table: Table,
columnsBeingDeleted: List<String>,
identifierFieldsChanged: Boolean,
incomingIdentifierFieldNames: Set<String>,
) {
val currentSortOrder = table.sortOrder()

// If the table has no sort order, there's nothing to conflict and nothing to update.
// (Append→Dedupe would need a sort order added, but that case requires a reset.)
if (currentSortOrder.isUnsorted) {
return
}

if (identifierFieldsChanged) {
// Rebuild sort order from the new identifier fields.
// For Dedupe→Append: incoming identifiers are empty → unsorted.
// For PK changes within Dedupe: new identifiers → new sort order.
val builder = table.replaceSortOrder()
for (fieldName in incomingIdentifierFieldNames) {
// Only include fields that exist in the current schema. Fields being
// added in the same schema change can't be referenced yet.
if (table.schema().findField(fieldName) != null) {
builder.asc(fieldName)
}
}
logger.info {
"Replacing sort order due to identifier field change. " +
"New sort fields: ${incomingIdentifierFieldNames.ifEmpty { setOf("(unsorted)") }}"
}
builder.commit()
table.refresh()
return
}

// No identifier change — check if any deleted columns conflict with the sort order.
if (columnsBeingDeleted.isEmpty()) {
return
}

val schema = table.schema()
val fieldIdsBeingDeleted =
columnsBeingDeleted.mapNotNull { schema.findField(it)?.fieldId() }.toSet()

val hasConflict = currentSortOrder.fields().any { it.sourceId() in fieldIdsBeingDeleted }
if (!hasConflict) {
return
}

// Rebuild the sort order, keeping only fields that aren't being deleted.
val builder = table.replaceSortOrder()
for (sortField in currentSortOrder.fields()) {
if (sortField.sourceId() !in fieldIdsBeingDeleted) {
val fieldName = schema.findColumnName(sortField.sourceId())
when (sortField.direction()) {
SortDirection.ASC -> builder.asc(fieldName, sortField.nullOrder())
SortDirection.DESC -> builder.desc(fieldName, sortField.nullOrder())
else -> builder.asc(fieldName, sortField.nullOrder())
}
}
}
logger.info { "Replacing sort order to remove fields being deleted: $columnsBeingDeleted" }
builder.commit()
table.refresh()
}
}

data class SchemaUpdateResult(val schema: Schema, val pendingUpdates: List<UpdateSchema>)
Loading
Loading