Skip to content

Commit 297bb11

Browse files
authored
Preserve required/optional field property when converting from source schema to Iceberg (memiiso#575)
* Add configuration option to preserve the `required` field property — `debezium.sink.iceberg.preserve-required-property`
* Implement `preserve-required-property` for `JsonSchemaConverter`
1 parent b4d050f commit 297bb11

File tree

6 files changed

+132
-8
lines changed

6 files changed

+132
-8
lines changed

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ public interface IcebergConfig {
8787
@WithDefault("org.apache.iceberg.io.ResolvingFileIO")
8888
String ioImpl();
8989

90+
@WithName("debezium.sink.iceberg.preserve-required-property")
91+
@WithDefault("false")
92+
boolean preserveRequiredProperty();
93+
9094
}

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/converter/JsonSchemaConverter.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ private static String getFieldName(JsonNode fieldSchema) {
4747
return nameNode.textValue();
4848
}
4949

50+
private static boolean getFieldIsOptional(JsonNode fieldSchema) {
51+
JsonNode nameNode = fieldSchema.get("optional");
52+
if (nameNode == null || nameNode.isNull()) {
53+
return true;
54+
}
55+
56+
return nameNode.booleanValue();
57+
}
5058

5159
/***
5260
* converts given debezium filed to iceberg field equivalent. does recursion in case of complex/nested types.
@@ -59,12 +67,14 @@ private static String getFieldName(JsonNode fieldSchema) {
5967
private IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergSchemaInfo schemaData, JsonNode keySchemaNode) {
6068
String fieldType = fieldSchema.get("type").textValue();
6169
String fieldTypeName = getFieldName(fieldSchema);
70+
boolean fieldIsOptional = getFieldIsOptional(fieldSchema);
6271

6372
if (fieldType == null || fieldType.isBlank()) {
6473
throw new DebeziumException("Unexpected schema field, field type is null or empty, fieldSchema:" + fieldSchema + " fieldName:" + fieldName);
6574
}
6675

6776
boolean isPkField = !(keySchemaNode == null || keySchemaNode.isNull());
77+
boolean isOptional = config.iceberg().preserveRequiredProperty() ? fieldIsOptional : !isPkField;
6878
switch (fieldType) {
6979
case "struct":
7080
int rootStructId = schemaData.nextFieldId().getAndIncrement();
@@ -76,7 +86,7 @@ private IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, Stri
7686
}
7787
// create it as struct, nested type
7888
final Types.StructType structType = Types.StructType.of(subSchemaData.fields());
79-
final Types.NestedField structField = Types.NestedField.of(rootStructId, !isPkField, fieldName, structType);
89+
final Types.NestedField structField = Types.NestedField.of(rootStructId, isOptional, fieldName, structType);
8090
schemaData.fields().add(structField);
8191
return schemaData;
8292
case "map":
@@ -92,7 +102,7 @@ private IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, Stri
92102
final IcebergSchemaInfo valSchemaData = schemaData.copyPreservingMetadata();
93103
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null);
94104
final Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
95-
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
105+
schemaData.fields().add(Types.NestedField.of(rootMapId, isOptional, fieldName, mapField));
96106
return schemaData;
97107

98108
case "array":
@@ -103,11 +113,11 @@ private IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, Stri
103113
final IcebergSchemaInfo arraySchemaData = schemaData.copyPreservingMetadata();
104114
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null);
105115
final Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
106-
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
116+
schemaData.fields().add(Types.NestedField.of(rootArrayId, isOptional, fieldName, listField));
107117
return schemaData;
108118
default:
109119
// its primitive field
110-
final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType, fieldTypeName, fieldSchema));
120+
final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), isOptional, fieldName, icebergPrimitiveField(fieldName, fieldType, fieldTypeName, fieldSchema));
111121
schemaData.fields().add(field);
112122
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
113123
return schemaData;

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/converter/StructSchemaConverter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ private void debeziumFieldToIcebergField(Field connectField, IcebergSchemaInfo s
7777
}
7878

7979
boolean isPkField = connectKeyField != null;
80+
boolean isOptional = config.iceberg().preserveRequiredProperty() ? connectField.schema().isOptional() : !isPkField;
8081
LOGGER.trace("Converting field: '{}', Type: '{}', LogicalType: '{}', PK: {}", fieldName, fieldType, fieldTypeName, isPkField);
8182

8283
switch (fieldType) {
@@ -88,7 +89,7 @@ private void debeziumFieldToIcebergField(Field connectField, IcebergSchemaInfo s
8889
}
8990
// Create it as struct, nested type
9091
final Types.StructType structType = Types.StructType.of(subSchemaData.fields());
91-
final Types.NestedField structField = Types.NestedField.of(rootStructId, !isPkField, fieldName, structType, connectSchema.doc());
92+
final Types.NestedField structField = Types.NestedField.of(rootStructId, isOptional, fieldName, structType, connectSchema.doc());
9293
schemaData.fields().add(structField);
9394
// Add struct field ID to PK list if the struct itself is a PK (though generally disallowed)
9495
if (isPkField) {
@@ -125,7 +126,7 @@ private void debeziumFieldToIcebergField(Field connectField, IcebergSchemaInfo s
125126
Type valueType = valSchemaData.fields().get(0).type(); // Assuming the recursive call adds one field
126127

127128
final Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keyType, valueType);
128-
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField, connectSchema.doc()));
129+
schemaData.fields().add(Types.NestedField.of(rootMapId, isOptional, fieldName, mapField, connectSchema.doc()));
129130
break; // Added break
130131

131132
case ARRAY:
@@ -143,14 +144,14 @@ private void debeziumFieldToIcebergField(Field connectField, IcebergSchemaInfo s
143144
Type elementType = arraySchemaData.fields().get(0).type(); // Assuming the recursive call adds one field
144145

145146
final Types.ListType listField = Types.ListType.ofOptional(elementFieldId, elementType);
146-
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField, connectSchema.doc()));
147+
schemaData.fields().add(Types.NestedField.of(rootArrayId, isOptional, fieldName, listField, connectSchema.doc()));
147148
break; // Added break
148149

149150
default:
150151
// It's a primitive field
151152
int primitiveFieldId = schemaData.nextFieldId().getAndIncrement();
152153
final Type.PrimitiveType primitiveType = icebergPrimitiveField(fieldName, connectSchema);
153-
final Types.NestedField field = Types.NestedField.of(primitiveFieldId, !isPkField, fieldName, primitiveType, connectSchema.doc());
154+
final Types.NestedField field = Types.NestedField.of(primitiveFieldId, isOptional, fieldName, primitiveType, connectSchema.doc());
154155
schemaData.fields().add(field);
155156
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
156157
break; // Added break

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/converter/JsonEventConverterTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.apache.iceberg.types.Types;
1919
import org.apache.kafka.common.serialization.Serde;
2020
import org.junit.jupiter.api.BeforeAll;
21+
import org.junit.jupiter.api.BeforeEach;
2122
import org.junit.jupiter.api.Test;
2223
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
2324

@@ -32,6 +33,7 @@
3233
import static org.junit.jupiter.api.Assertions.assertFalse;
3334
import static org.junit.jupiter.api.Assertions.assertNull;
3435
import static org.junit.jupiter.api.Assertions.assertTrue;
36+
import static org.mockito.Mockito.when;
3537

3638
@QuarkusTest
3739
@DisabledIfEnvironmentVariable(named = "DEBEZIUM_FORMAT_VALUE", matches = "connect")
@@ -52,6 +54,11 @@ static void setup() {
5254
JsonEventConverter.initializeJsonSerde();
5355
}
5456

57+
@BeforeEach
58+
void setUpBeforeEach() {
59+
when(icebergConfig.preserveRequiredProperty()).thenReturn(false);
60+
}
61+
5562
@Test
5663
public void testNestedJsonRecord() {
5764
JsonEventConverter e = new JsonEventConverter("test", serdeWithSchema, null, config);
@@ -92,6 +99,34 @@ public void testUnwrapJsonRecord() {
9299
assertEquals(schema.identifierFieldIds(), Set.of());
93100
}
94101

102+
@Test
103+
public void testPreserveRequiredProperty() {
104+
// Preserve required fields as required in Iceberg schema
105+
when(icebergConfig.preserveRequiredProperty()).thenReturn(true);
106+
107+
JsonEventConverter e = new JsonEventConverter("test", unwrapWithSchema, null, config);
108+
Schema schema = e.icebergSchema();
109+
RecordWrapper record = e.convert(schema);
110+
assertEquals("orders", record.getField("__table").toString());
111+
assertEquals(LocalDate.parse("2016-02-19"), record.getField("order_date"));
112+
assertEquals("""
113+
table {
114+
1: id: required int
115+
2: order_date: required date
116+
3: purchaser: required int
117+
4: quantity: required int
118+
5: product_id: required int
119+
6: __op: optional string
120+
7: __table: optional string
121+
8: __lsn: optional long
122+
9: __source_ts_ms: optional timestamptz
123+
10: __deleted: optional string
124+
}""",
125+
schema.toString());
126+
127+
assertEquals(schema.identifierFieldIds(), Set.of());
128+
}
129+
95130
@Test
96131
public void testNestedArrayJsonRecord() {
97132
JsonEventConverter e = new JsonEventConverter("test", unwrapWithArraySchema, null, config);

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/converter/StructSchemaConverterTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ void setUpBeforeEach() {
5050
when(config.debezium()).thenReturn(debeziumConfig);
5151
when(icebergConfig.createIdentifierFields()).thenReturn(true);
5252
when(debeziumConfig.isEventFlatteningEnabled()).thenReturn(true);
53+
when(icebergConfig.preserveRequiredProperty()).thenReturn(false);
5354
}
5455

5556
@Test
@@ -259,6 +260,78 @@ void testComplexSchemaConversion() {
259260
System.out.println(icebergSchema.toString());
260261
}
261262

263+
@Test
264+
void testPreserveRequiredProperty() {
265+
// Preserve required fields as required in Iceberg schema
266+
when(icebergConfig.preserveRequiredProperty()).thenReturn(true);
267+
268+
org.apache.iceberg.Schema icebergSchema = setupRequiredPropertyTest();
269+
LOGGER.error("{}", icebergSchema);
270+
271+
assertTrue(icebergSchema.findField("pk_id").isRequired());
272+
assertTrue(icebergSchema.findField("f_required").isRequired());
273+
assertTrue(icebergSchema.findField("f_optional").isOptional());
274+
assertTrue(icebergSchema.findField("f_array").isRequired());
275+
assertTrue(icebergSchema.findField("f_array_opt").isOptional());
276+
assertTrue(icebergSchema.findField("f_map").isRequired());
277+
assertTrue(icebergSchema.findField("f_map_opt").isOptional());
278+
assertTrue(icebergSchema.findField("f_struct").isRequired());
279+
assertTrue(icebergSchema.findField("f_struct_opt").isOptional());
280+
}
281+
282+
@Test
283+
void testDefaultRequiredPropertyConversion() {
284+
org.apache.iceberg.Schema icebergSchema = setupRequiredPropertyTest();
285+
LOGGER.error("{}", icebergSchema);
286+
287+
assertTrue(icebergSchema.findField("pk_id").isRequired());
288+
assertTrue(icebergSchema.findField("f_required").isOptional());
289+
assertTrue(icebergSchema.findField("f_optional").isOptional());
290+
assertTrue(icebergSchema.findField("f_array").isOptional());
291+
assertTrue(icebergSchema.findField("f_array_opt").isOptional());
292+
assertTrue(icebergSchema.findField("f_map").isOptional());
293+
assertTrue(icebergSchema.findField("f_map_opt").isOptional());
294+
assertTrue(icebergSchema.findField("f_struct").isOptional());
295+
assertTrue(icebergSchema.findField("f_struct_opt").isOptional());
296+
}
297+
298+
private org.apache.iceberg.Schema setupRequiredPropertyTest() {
299+
// Define nested schema
300+
SchemaBuilder structBuilder = SchemaBuilder.struct()
301+
.field("nested_id", Schema.INT32_SCHEMA)
302+
.field("nested_data", Schema.OPTIONAL_STRING_SCHEMA);
303+
SchemaBuilder mapBuilder = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA);
304+
305+
// Define value schema
306+
org.apache.kafka.connect.data.Schema valueSchema = SchemaBuilder.struct()
307+
.name("SimpleRecord")
308+
.field("pk_id", Schema.INT32_SCHEMA) // PK field, required by definition in Connect schema
309+
.field("f_required", Schema.INT64_SCHEMA)
310+
.field("f_optional", Schema.OPTIONAL_INT64_SCHEMA)
311+
.field("f_array", SchemaBuilder.array(Schema.INT64_SCHEMA))
312+
.field("f_array_opt", SchemaBuilder.array(Schema.INT64_SCHEMA).optional())
313+
.field("f_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA))
314+
.field("f_map_opt", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).optional())
315+
.field("f_struct", SchemaBuilder.struct()
316+
.field("nested_id", Schema.INT32_SCHEMA)
317+
.field("nested_data", Schema.OPTIONAL_STRING_SCHEMA))
318+
.field("f_struct_opt", SchemaBuilder.struct()
319+
.field("nested_id", Schema.INT32_SCHEMA)
320+
.field("nested_data", Schema.OPTIONAL_STRING_SCHEMA)
321+
.optional())
322+
.build();
323+
// Define key schema
324+
org.apache.kafka.connect.data.Schema keySchema = SchemaBuilder.struct()
325+
.name("SimpleRecordKey")
326+
.field("pk_id", Schema.INT32_SCHEMA) // The PK field
327+
.build();
328+
329+
// Convert to Iceberg schema
330+
StructSchemaConverter converter = new StructSchemaConverter(valueSchema, keySchema, config);
331+
return converter.icebergSchema();
332+
333+
}
334+
262335
// Helper assertion method
263336
private Types.NestedField assertField(org.apache.iceberg.Schema schema, String name, int expectedId, boolean expectedOptional, org.apache.iceberg.types.Type expectedType) {
264337
Types.NestedField field = schema.findField(name);

docs/docs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ When event and key schema information is enabled (`debezium.format.value.schemas
2929
| `debezium.sink.iceberg.destination-regexp-replace` | `` | Replace part of the regexp for modifying destination Iceberg table names |
3030
| `debezium.sink.iceberg.destination-uppercase-table-names` | `false` | Creates uppercase Iceberg table names |
3131
| `debezium.sink.iceberg.destination-lowercase-table-names` | `false` | Creates lowercase Iceberg table names |
32+
| `debezium.sink.iceberg.preserve-required-property` | `false` | When translating schema from source to Iceberg, by default, only primary key columns are defined as required; the rest of the columns become optional. When this is set to `true`, the columns preserve their original required/optional property. |
3233
| `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy: Controls the size of data files and the frequency of uploads. Explained below. |
3334
| `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) These settings are passed to Iceberg without the `debezium.sink.iceberg.` prefix. |
3435
| `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming). Set to `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` to use Iceberg table |

0 commit comments

Comments
 (0)