Skip to content

Commit 22f68fe

Browse files
andmarios and claude authored and committed
fix: Add decimal logical type handling in ToAvroDataConverter
Added case for 'decimal' logical type in convertFieldValue method to properly convert java.math.BigDecimal values to ByteBuffer format expected by Avro. This fixes the failing AvroFormatWriterTest 'should write decimal data from the header' test. (cherry-picked from c6de95d) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5df23bf commit 22f68fe

File tree

6 files changed

+251
-194
lines changed

6 files changed

+251
-194
lines changed

kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskParquetSchemaOptimizationTest.scala

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ import scala.jdk.CollectionConverters.MapHasAsJava
3737
import scala.jdk.CollectionConverters.SeqHasAsJava
3838

3939
/**
40-
* Integration test for Parquet format with schema optimization enabled.
41-
*
42-
* This test verifies that the fix for ArrayIndexOutOfBoundsException works correctly
43-
* when latest.schema.optimization.enabled is true and records with different schema
44-
* versions are interleaved, requiring adaptation to the latest schema.
45-
*/
40+
* Integration test for Parquet format with schema optimization enabled.
41+
*
42+
* This test verifies that the fix for ArrayIndexOutOfBoundsException works correctly
43+
* when latest.schema.optimization.enabled is true and records with different schema
44+
* versions are interleaved, requiring adaptation to the latest schema.
45+
*/
4646
class S3SinkTaskParquetSchemaOptimizationTest
4747
extends AnyFlatSpec
4848
with Matchers
@@ -144,7 +144,10 @@ class S3SinkTaskParquetSchemaOptimizationTest
144144

145145
// Record 4: V3 schema (introduces address)
146146
val address4 = new Struct(addressSchema).put("street", "123 Main St").put("city", "Seattle").put("zipCode", "98101")
147-
val struct4 = new Struct(schemaV3).put("name", "Diana").put("age", 28).put("email", "diana@example.com").put("address", address4)
147+
val struct4 =
148+
new Struct(schemaV3).put("name", "Diana").put("age", 28).put("email", "diana@example.com").put("address",
149+
address4,
150+
)
148151

149152
// Record 5: V2 schema again (should be adapted to latest V3)
150153
val struct5 = new Struct(schemaV2).put("name", "Eve").put("age", 32).put("email", "eve@example.com")
@@ -194,7 +197,7 @@ class S3SinkTaskParquetSchemaOptimizationTest
194197
rec3.get("address") should be(null)
195198

196199
// Record 4: V3 (full schema)
197-
val rec4 = genericRecords(3)
200+
val rec4 = genericRecords(3)
198201
val address4Rec = rec4.get("address").asInstanceOf[GenericRecord]
199202
rec4.get("name").toString should be("Diana")
200203
rec4.get("age") should be(28)
@@ -273,15 +276,21 @@ class S3SinkTaskParquetSchemaOptimizationTest
273276
val struct1 = new Struct(orderSchemaV1).put("orderId", "ORD-001").put("amount", 100.50).put("metadata", meta1)
274277

275278
// Record 2: V2 schema (introduces offset and timestamp in metadata)
276-
val meta2 = new Struct(metadataSchemaV2).put("topic", TopicName).put("partition", 1).put("offset", 2L).put("timestamp", 20002L)
279+
val meta2 =
280+
new Struct(metadataSchemaV2).put("topic", TopicName).put("partition", 1).put("offset", 2L).put("timestamp",
281+
20002L,
282+
)
277283
val struct2 = new Struct(orderSchemaV2).put("orderId", "ORD-002").put("amount", 250.75).put("metadata", meta2)
278284

279285
// Record 3: V1 schema again (should be adapted to V2)
280286
val meta3 = new Struct(metadataSchemaV1).put("topic", TopicName).put("partition", 1)
281287
val struct3 = new Struct(orderSchemaV1).put("orderId", "ORD-003").put("amount", 75.00).put("metadata", meta3)
282288

283289
// Record 4: V2 schema
284-
val meta4 = new Struct(metadataSchemaV2).put("topic", TopicName).put("partition", 1).put("offset", 4L).put("timestamp", 20004L)
290+
val meta4 =
291+
new Struct(metadataSchemaV2).put("topic", TopicName).put("partition", 1).put("offset", 4L).put("timestamp",
292+
20004L,
293+
)
285294
val struct4 = new Struct(orderSchemaV2).put("orderId", "ORD-004").put("amount", 500.00).put("metadata", meta4)
286295

287296
val record1 = toSinkRecord(struct1, TopicName, 1, 1L, 20001L)
@@ -301,7 +310,7 @@ class S3SinkTaskParquetSchemaOptimizationTest
301310
genericRecords.size should be(4)
302311

303312
// Record 1: V1 -> V2 (metadata.offset and metadata.timestamp should be null)
304-
val rec1 = genericRecords.head
313+
val rec1 = genericRecords.head
305314
val metaRec1 = rec1.get("metadata").asInstanceOf[GenericRecord]
306315
rec1.get("orderId").toString should be("ORD-001")
307316
rec1.get("amount") should be(100.50)
@@ -311,7 +320,7 @@ class S3SinkTaskParquetSchemaOptimizationTest
311320
metaRec1.get("timestamp") should be(null)
312321

313322
// Record 2: V2 (full metadata)
314-
val rec2 = genericRecords(1)
323+
val rec2 = genericRecords(1)
315324
val metaRec2 = rec2.get("metadata").asInstanceOf[GenericRecord]
316325
rec2.get("orderId").toString should be("ORD-002")
317326
rec2.get("amount") should be(250.75)
@@ -321,7 +330,7 @@ class S3SinkTaskParquetSchemaOptimizationTest
321330
metaRec2.get("timestamp") should be(20002L)
322331

323332
// Record 3: V1 -> V2
324-
val rec3 = genericRecords(2)
333+
val rec3 = genericRecords(2)
325334
val metaRec3 = rec3.get("metadata").asInstanceOf[GenericRecord]
326335
rec3.get("orderId").toString should be("ORD-003")
327336
rec3.get("amount") should be(75.00)
@@ -331,7 +340,7 @@ class S3SinkTaskParquetSchemaOptimizationTest
331340
metaRec3.get("timestamp") should be(null)
332341

333342
// Record 4: V2
334-
val rec4 = genericRecords(3)
343+
val rec4 = genericRecords(3)
335344
val metaRec4 = rec4.get("metadata").asInstanceOf[GenericRecord]
336345
rec4.get("orderId").toString should be("ORD-004")
337346
rec4.get("amount") should be(500.00)
@@ -342,4 +351,3 @@ class S3SinkTaskParquetSchemaOptimizationTest
342351
}
343352

344353
}
345-

kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskParquetV3ThenV2SchemaOptimizationTest.scala

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,22 @@ import scala.jdk.CollectionConverters.MapHasAsJava
3737
import scala.jdk.CollectionConverters.SeqHasAsJava
3838

3939
/**
40-
* Integration test that reproduces the VersionSchemaChangeDetector bug.
41-
*
42-
* This test sends records with schema v3 first, then v2.
43-
* When processed with schema.change.detector=version (without the fix),
44-
* it would fail because:
45-
* - v3 written first, file initialized with v3 schema
46-
* - v2 arrives, detector checks: 2 > 3 = False (no schema change detected)
47-
* - v2 record written to v3 file → ArrayIndexOutOfBoundsException
48-
*
49-
* With latest.schema.optimization.enabled=true, records should be adapted
50-
* to the latest schema seen (v3) and written correctly.
51-
*
52-
* Based on the Python test producer script that reproduces the bug with:
53-
* - DslSDPEvent schema v2 (without businessUnit field)
54-
* - DslSDPEvent schema v3 (with businessUnit field)
55-
*/
40+
* Integration test that reproduces the VersionSchemaChangeDetector bug.
41+
*
42+
* This test sends records with schema v3 first, then v2.
43+
* When processed with schema.change.detector=version (without the fix),
44+
* it would fail because:
45+
* - v3 written first, file initialized with v3 schema
46+
* - v2 arrives, detector checks: 2 > 3 = False (no schema change detected)
47+
* - v2 record written to v3 file → ArrayIndexOutOfBoundsException
48+
*
49+
* With latest.schema.optimization.enabled=true, records should be adapted
50+
* to the latest schema seen (v3) and written correctly.
51+
*
52+
* Based on the Python test producer script that reproduces the bug with:
53+
* - DslSDPEvent schema v2 (without businessUnit field)
54+
* - DslSDPEvent schema v3 (with businessUnit field)
55+
*/
5656
class S3SinkTaskParquetV3ThenV2SchemaOptimizationTest
5757
extends AnyFlatSpec
5858
with Matchers
@@ -209,23 +209,23 @@ class S3SinkTaskParquetV3ThenV2SchemaOptimizationTest
209209
}
210210

211211
/**
212-
* Test that reproduces the exact bug scenario:
213-
* - Send 5 v3 records first (with businessUnit field)
214-
* - Then send 5 v2 records (without businessUnit field)
215-
*
216-
* Without the fix, this would cause ArrayIndexOutOfBoundsException because:
217-
* - VersionSchemaChangeDetector only checks newVersion > oldVersion
218-
* - 2 > 3 is False, so no schema change is detected
219-
* - v2 record gets written to v3-initialized Parquet file
220-
*
221-
* With latest.schema.optimization.enabled=true, all records should be
222-
* adapted to v3 schema and written correctly.
223-
*/
212+
* Test that reproduces the exact bug scenario:
213+
* - Send 5 v3 records first (with businessUnit field)
214+
* - Then send 5 v2 records (without businessUnit field)
215+
*
216+
* Without the fix, this would cause ArrayIndexOutOfBoundsException because:
217+
* - VersionSchemaChangeDetector only checks newVersion > oldVersion
218+
* - 2 > 3 is False, so no schema change is detected
219+
* - v2 record gets written to v3-initialized Parquet file
220+
*
221+
* With latest.schema.optimization.enabled=true, all records should be
222+
* adapted to v3 schema and written correctly.
223+
*/
224224
"S3SinkTask" should "handle v3 then v2 schema sequence with parquet and schema optimization" in {
225225
val props = (
226226
defaultProps ++
227227
Map(
228-
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS PARQUET PROPERTIES('padding.length.partition'='12','padding.length.offset'='12','${FlushCount.entryName}'=10)",
228+
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS PARQUET PROPERTIES('padding.length.partition'='12','padding.length.offset'='12','${FlushCount.entryName}'=10)",
229229
"connect.s3.latest.schema.optimization.enabled" -> "true",
230230
)
231231
).asJava
@@ -292,14 +292,14 @@ class S3SinkTaskParquetV3ThenV2SchemaOptimizationTest
292292
}
293293

294294
/**
295-
* Test with interleaved v3 and v2 records to ensure consistent handling.
296-
* Pattern: v3, v2, v3, v2, v3, v2, v3, v2
297-
*/
295+
* Test with interleaved v3 and v2 records to ensure consistent handling.
296+
* Pattern: v3, v2, v3, v2, v3, v2, v3, v2
297+
*/
298298
"S3SinkTask" should "handle interleaved v3 and v2 records with parquet and schema optimization" in {
299299
val props = (
300300
defaultProps ++
301301
Map(
302-
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS PARQUET PROPERTIES('padding.length.partition'='12','padding.length.offset'='12','${FlushCount.entryName}'=8)",
302+
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS PARQUET PROPERTIES('padding.length.partition'='12','padding.length.offset'='12','${FlushCount.entryName}'=8)",
303303
"connect.s3.latest.schema.optimization.enabled" -> "true",
304304
)
305305
).asJava
@@ -357,14 +357,14 @@ class S3SinkTaskParquetV3ThenV2SchemaOptimizationTest
357357
}
358358

359359
/**
360-
* Test multiple batches: first batch v3, second batch v2, simulating
361-
* the real-world scenario where consumers process in batches.
362-
*/
360+
* Test multiple batches: first batch v3, second batch v2, simulating
361+
* the real-world scenario where consumers process in batches.
362+
*/
363363
"S3SinkTask" should "handle multiple batches with v3 first then v2 batch" in {
364364
val props = (
365365
defaultProps ++
366366
Map(
367-
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS PARQUET PROPERTIES('padding.length.partition'='12','padding.length.offset'='12','${FlushCount.entryName}'=6)",
367+
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS PARQUET PROPERTIES('padding.length.partition'='12','padding.length.offset'='12','${FlushCount.entryName}'=6)",
368368
"connect.s3.latest.schema.optimization.enabled" -> "true",
369369
)
370370
).asJava

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverter.scala

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2025 Lenses.io Ltd
2+
* Copyright 2017-2026 Lenses.io Ltd
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,26 +56,26 @@ object ToAvroDataConverter {
5656
}
5757

5858
/**
59-
* Converts SinkData to an Avro GenericRecord using a target Avro schema.
60-
* This ensures the GenericRecord uses the exact schema object provided,
61-
* which is critical for Parquet/Avro writers that rely on schema identity
62-
* for field position lookups.
63-
*
64-
* @param sinkData The SinkData to convert
65-
* @param targetSchema The target Avro schema to use for the GenericRecord
66-
* @return The converted value (GenericRecord for structs, primitive values otherwise)
67-
*/
59+
* Converts SinkData to an Avro GenericRecord using a target Avro schema.
60+
* This ensures the GenericRecord uses the exact schema object provided,
61+
* which is critical for Parquet/Avro writers that rely on schema identity
62+
* for field position lookups.
63+
*
64+
* @param sinkData The SinkData to convert
65+
* @param targetSchema The target Avro schema to use for the GenericRecord
66+
* @return The converted value (GenericRecord for structs, primitive values otherwise)
67+
*/
6868
def convertToGenericRecordWithSchema(sinkData: SinkData, targetSchema: Schema): Any =
6969
sinkData match {
7070
case StructSinkData(structVal) => convertStructToGenericRecord(structVal, targetSchema)
7171
case other => convertNonStructSinkData(other)
7272
}
7373

7474
/**
75-
* Common handler for non-struct SinkData types.
76-
* This consolidates the conversion logic for all SinkData variants except StructSinkData,
77-
* which requires different handling depending on whether a target schema is provided.
78-
*/
75+
* Common handler for non-struct SinkData types.
76+
* This consolidates the conversion logic for all SinkData variants except StructSinkData,
77+
* which requires different handling depending on whether a target schema is provided.
78+
*/
7979
private def convertNonStructSinkData(sinkData: SinkData): Any =
8080
sinkData match {
8181
case MapSinkData(map, _) => convert(map)
@@ -90,9 +90,9 @@ object ToAvroDataConverter {
9090
}
9191

9292
/**
93-
* Converts a Connect Struct to an Avro GenericRecord using the specified target schema.
94-
* This handles nested structures recursively.
95-
*/
93+
* Converts a Connect Struct to an Avro GenericRecord using the specified target schema.
94+
* This handles nested structures recursively.
95+
*/
9696
private def convertStructToGenericRecord(struct: Struct, targetSchema: Schema): GenericRecord = {
9797
val record = new GenericData.Record(targetSchema)
9898
targetSchema.getFields.asScala.foreach { avroField =>
@@ -111,9 +111,9 @@ object ToAvroDataConverter {
111111
}
112112

113113
/**
114-
* Converts a Connect value to an Avro value using the target Avro schema.
115-
* Handles logical types (Date, Time, Timestamp) and nested structures.
116-
*/
114+
* Converts a Connect value to an Avro value using the target Avro schema.
115+
* Handles logical types (Date, Time, Timestamp) and nested structures.
116+
*/
117117
private def convertFieldValue(value: Any, targetSchema: Schema): Any =
118118
if (value == null) {
119119
null
@@ -145,15 +145,21 @@ object ToAvroDataConverter {
145145
case d: Date => d.getTime * 1000L
146146
case other => other
147147
}
148+
case Some("decimal") =>
149+
value match {
150+
case bd: java.math.BigDecimal =>
151+
ByteBuffer.wrap(bd.unscaledValue().toByteArray)
152+
case other => other
153+
}
148154
case _ =>
149155
// No logical type or unhandled logical type - convert based on physical schema type
150156
convertBySchemaType(value, targetSchema)
151157
}
152158
}
153159

154160
/**
155-
* Converts a value based on the physical Avro schema type.
156-
*/
161+
* Converts a value based on the physical Avro schema type.
162+
*/
157163
private def convertBySchemaType(value: Any, targetSchema: Schema): Any =
158164
targetSchema.getType match {
159165
case Schema.Type.RECORD =>

0 commit comments

Comments (0)