Skip to content

Commit 711cee7

Browse files
authored
fix(schema): streamline schema change handling and enhance test coverage (#3019) (#3024)
1 parent 451566e commit 711cee7

File tree

4 files changed

+2100
-590
lines changed

4 files changed

+2100
-590
lines changed

core/src/main/java/kafka/automq/table/worker/IcebergTableManager.java

Lines changed: 55 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -152,7 +152,7 @@ protected List<SchemaChange> checkSchemaChanges(Table table, Schema currentSchem
152152
*/
153153
@VisibleForTesting
154154
protected synchronized void applySchemaChange(Table table, List<SchemaChange> changes) {
155-
LOGGER.info("Applying schema changes to table {}, changes {}", tableId, changes);
155+
LOGGER.info("Applying schema changes to table {}, changes {}", tableId, changes.stream().map(c -> c.getType() + ":" + c.getColumnFullName()).toList());
156156
Tasks.range(1)
157157
.retry(2)
158158
.run(notUsed -> applyChanges(table, changes));
@@ -221,16 +221,18 @@ private void collectRemovedField(Types.NestedField tableField, String parentName
221221
// if field doesn't exist in current schema and it's not a struct, mark it as optional (soft removal)
222222
if (currentField == null && !tableField.isOptional()) {
223223
changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName,
224-
tableField.type().asPrimitiveType(), parentName));
224+
null, parentName));
225225
return;
226226
}
227227
// if it is a nested field, recursively process subfields
228228
if (tableField.type().isStructType()) {
229-
List<Types.NestedField> tableSubFields = tableField.type().asStructType().fields();
230-
231-
for (Types.NestedField tableSubField : tableSubFields) {
232-
collectRemovedField(tableSubField, fullFieldName, currentSchema, changes);
233-
}
229+
collectRemovedStructFields(tableField.type().asStructType().fields(), fullFieldName, currentSchema, changes);
230+
} else if (isStructList(tableField.type())) {
231+
collectRemovedStructFields(tableField.type().asListType().elementType().asStructType().fields(),
232+
fullFieldName + ".element", currentSchema, changes);
233+
} else if (isStructMap(tableField.type())) {
234+
collectRemovedStructFields(tableField.type().asMapType().valueType().asStructType().fields(),
235+
fullFieldName + ".value", currentSchema, changes);
234236
}
235237
}
236238

@@ -241,38 +243,59 @@ private void collectFieldChanges(Types.NestedField currentField, String parentNa
241243
Types.NestedField tableField = tableSchema.findField(fullFieldName);
242244

243245
if (tableField == null) {
244-
// if it is a nested field, recursively process subfields
245-
if (currentField.type().isStructType()) {
246-
List<Types.NestedField> currentSubFields = currentField.type().asStructType().fields();
247-
248-
for (Types.NestedField currentSubField : currentSubFields) {
249-
collectFieldChanges(currentSubField, fullFieldName, tableSchema, changes);
246+
changes.add(new SchemaChange(SchemaChange.ChangeType.ADD_COLUMN, fieldName,
247+
currentField.type(), parentName));
248+
return;
249+
} else {
250+
Type currentType = currentField.type();
251+
Type tableType = tableField.type();
252+
if (currentType.isStructType() && tableType.isStructType()) {
253+
collectStructFieldChanges(currentType.asStructType().fields(), fullFieldName, tableSchema, changes);
254+
collectOptionalFieldChanges(currentField, parentName, changes, tableField, fieldName);
255+
} else if (isStructList(currentType) && isStructList(tableType)) {
256+
collectStructFieldChanges(currentType.asListType().elementType().asStructType().fields(),
257+
fullFieldName + ".element", tableSchema, changes);
258+
} else if (isStructMap(currentType) && isStructMap(tableType)) {
259+
collectStructFieldChanges(currentType.asMapType().valueType().asStructType().fields(),
260+
fullFieldName + ".value", tableSchema, changes);
261+
} else if (!currentType.isStructType() && !tableType.isStructType()) {
262+
collectOptionalFieldChanges(currentField, parentName, changes, tableField, fieldName);
263+
264+
if (!tableType.equals(currentType) && canPromoteType(tableType, currentType)) {
265+
changes.add(new SchemaChange(SchemaChange.ChangeType.PROMOTE_TYPE, fieldName, currentType, parentName));
250266
}
251-
} else {
252-
changes.add(new SchemaChange(SchemaChange.ChangeType.ADD_COLUMN, fieldName, currentField.type(), parentName));
253267
}
254-
} else {
255-
// if it is a nested field, recursively process subfields
256-
if (currentField.type().isStructType() && tableField.type().isStructType()) {
257-
List<Types.NestedField> currentSubFields = currentField.type().asStructType().fields();
268+
}
269+
}
258270

259-
for (Types.NestedField currentSubField : currentSubFields) {
260-
collectFieldChanges(currentSubField, fullFieldName, tableSchema, changes);
261-
}
262-
} else if (!currentField.type().isStructType() && !tableField.type().isStructType()) {
263-
// process optional fields
264-
if (!tableField.isOptional() && currentField.isOptional()) {
265-
changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName, null, parentName));
266-
}
271+
private static void collectOptionalFieldChanges(Types.NestedField currentField, String parentName, List<SchemaChange> changes, Types.NestedField tableField, String fieldName) {
272+
if (!tableField.isOptional() && currentField.isOptional()) {
273+
changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName, null, parentName));
274+
}
275+
}
267276

268-
// promote type if needed
269-
if (!tableField.type().equals(currentField.type()) && canPromoteType(tableField.type(), currentField.type())) {
270-
changes.add(new SchemaChange(SchemaChange.ChangeType.PROMOTE_TYPE, fieldName, currentField.type(), parentName));
271-
}
272-
}
277+
private void collectStructFieldChanges(List<Types.NestedField> currentSubFields, String parentFullName,
278+
Schema tableSchema, List<SchemaChange> changes) {
279+
for (Types.NestedField currentSubField : currentSubFields) {
280+
collectFieldChanges(currentSubField, parentFullName, tableSchema, changes);
281+
}
282+
}
283+
284+
private void collectRemovedStructFields(List<Types.NestedField> tableSubFields, String parentFullName,
285+
Schema currentSchema, List<SchemaChange> changes) {
286+
for (Types.NestedField tableSubField : tableSubFields) {
287+
collectRemovedField(tableSubField, parentFullName, currentSchema, changes);
273288
}
274289
}
275290

291+
private boolean isStructList(Type type) {
292+
return type.typeId() == Type.TypeID.LIST && type.asListType().elementType().isStructType();
293+
}
294+
295+
private boolean isStructMap(Type type) {
296+
return type.typeId() == Type.TypeID.MAP && type.asMapType().valueType().isStructType();
297+
}
298+
276299
private boolean canPromoteType(Type oldType, Type newType) {
277300
if (oldType.typeId() == Type.TypeID.INTEGER && newType.typeId() == Type.TypeID.LONG) {
278301
return true;

0 commit comments

Comments
 (0)