
Commit b727841

Merge pull request #301 from lensesio-dev/fix/fix_from_8.1.35
Fix Connect Union struct handling for enhanced schema support
2 parents bfdf5f5 + 10e8bc5 commit b727841

File tree

20 files changed: +341 / -68 lines changed

kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterfaceTest.scala

Lines changed: 16 additions & 10 deletions
@@ -43,9 +43,11 @@ class AwsS3StorageInterfaceTest
     val s3Client = mock[S3Client]
     val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

-    when(s3Client.headObject(any[HeadObjectRequest])).thenAnswer(HeadObjectResponse.builder().build())
-    when(s3Client.copyObject(any[CopyObjectRequest])).thenAnswer(CopyObjectResponse.builder().build())
-    when(s3Client.deleteObject(any[DeleteObjectRequest])).thenAnswer(DeleteObjectResponse.builder().build())
+    org.mockito.Mockito.doReturn(HeadObjectResponse.builder().build()).when(s3Client).headObject(any[HeadObjectRequest])
+    org.mockito.Mockito.doReturn(CopyObjectResponse.builder().build()).when(s3Client).copyObject(any[CopyObjectRequest])
+    org.mockito.Mockito.doReturn(DeleteObjectResponse.builder().build()).when(s3Client).deleteObject(
+      any[DeleteObjectRequest],
+    )

     val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath", none)

@@ -58,7 +60,7 @@ class AwsS3StorageInterfaceTest
     val s3Client = mock[S3Client]
     val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

-    when(s3Client.copyObject(any[CopyObjectRequest])).thenThrow(new RuntimeException("Copy failed"))
+    org.mockito.Mockito.doThrow(new RuntimeException("Copy failed")).when(s3Client).copyObject(any[CopyObjectRequest])

     val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath", none)

@@ -72,9 +74,11 @@ class AwsS3StorageInterfaceTest
     val s3Client = mock[S3Client]
     val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

-    when(s3Client.headObject(any[HeadObjectRequest])).thenAnswer(HeadObjectResponse.builder().build())
-    when(s3Client.copyObject(any[CopyObjectRequest])).thenAnswer(CopyObjectResponse.builder().build())
-    when(s3Client.deleteObject(any[DeleteObjectRequest])).thenThrow(new RuntimeException("Delete failed"))
+    org.mockito.Mockito.doReturn(HeadObjectResponse.builder().build()).when(s3Client).headObject(any[HeadObjectRequest])
+    org.mockito.Mockito.doReturn(CopyObjectResponse.builder().build()).when(s3Client).copyObject(any[CopyObjectRequest])
+    org.mockito.Mockito.doThrow(new RuntimeException("Delete failed")).when(s3Client).deleteObject(
+      any[DeleteObjectRequest],
+    )

     val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath", none)

@@ -88,9 +92,11 @@ class AwsS3StorageInterfaceTest
     val s3Client = mock[S3Client]
     val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

-    when(s3Client.headObject(any[HeadObjectRequest])).thenThrow(NoSuchKeyException.builder().build())
-    when(s3Client.copyObject(any[CopyObjectRequest])).thenAnswer(CopyObjectResponse.builder().build())
-    when(s3Client.deleteObject(any[DeleteObjectRequest])).thenThrow(new RuntimeException("Delete failed"))
+    org.mockito.Mockito.doThrow(NoSuchKeyException.builder().build()).when(s3Client).headObject(any[HeadObjectRequest])
+    org.mockito.Mockito.doReturn(CopyObjectResponse.builder().build()).when(s3Client).copyObject(any[CopyObjectRequest])
+    org.mockito.Mockito.doThrow(new RuntimeException("Delete failed")).when(s3Client).deleteObject(
+      any[DeleteObjectRequest],
+    )

     val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath", none)
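Note on the stubbing style above: the hunks replace Mockito's when(...).thenAnswer(...) with doReturn(...)/doThrow(...).when(...). In the do-style, the answer is registered before the mocked call is matched, so the stubbed method is never actually invoked during setup and overload resolution is unambiguous. A minimal sketch of the two styles against a hypothetical FileStore trait (not part of this codebase):

    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.{ doReturn, doThrow, mock, when }

    // Hypothetical trait, used only for this illustration.
    trait FileStore {
      def head(key: String): String
    }

    val store: FileStore = mock(classOf[FileStore])

    // when-style: the mocked call is evaluated first, then the stub is attached.
    when(store.head(any[String])).thenReturn("ok")

    // do-style: the answer comes first, so nothing is evaluated while stubbing;
    // later stubbings on the same call simply override earlier ones.
    doReturn("ok").when(store).head(any[String])
    doThrow(new RuntimeException("head failed")).when(store).head(any[String])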

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/AvroStreamReader.scala

Lines changed: 2 additions & 2 deletions
@@ -15,7 +15,7 @@
  */
 package io.lenses.streamreactor.connect.cloud.common.formats.reader

-import io.confluent.connect.avro.AvroData
+import io.lenses.streamreactor.connect.avro.AvroDataFactory
 import org.apache.avro.file.DataFileStream
 import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.generic.GenericRecord
@@ -25,7 +25,7 @@ import java.io.InputStream
 import scala.util.Try

 class AvroStreamReader(input: InputStream) extends CloudDataIterator[SchemaAndValue] {
-  private val avroDataConverter = new AvroData(100)
+  private val avroDataConverter = AvroDataFactory.create()

   private val datumReader = new GenericDatumReader[GenericRecord]()
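AvroDataFactory itself is not shown in this diff. Judging from the AvroDataConfig block this commit deletes from ToAvroDataConverter below, a factory along these lines would centralise the enhanced-schema-support settings; the following is a speculative sketch, not the project's actual implementation:

    import io.confluent.connect.avro.{ AvroData, AvroDataConfig }
    import io.confluent.connect.schema.AbstractDataConfig

    import scala.jdk.CollectionConverters.MapHasAsJava

    // Speculative reconstruction of AvroDataFactory, based only on the
    // configuration this commit removes from ToAvroDataConverter.
    object AvroDataFactory {
      def create(cacheSize: Int = 100): AvroData =
        new AvroData(
          new AvroDataConfig(
            Map(
              AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG -> "true",
              AbstractDataConfig.SCHEMAS_CACHE_SIZE_CONFIG       -> cacheSize.toString,
            ).asJava,
          ),
        )
    }

A single factory keeps AvroStreamReader, ParquetStreamReader and ToAvroDataConverter from drifting apart on these settings.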

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/ParquetStreamReader.scala

Lines changed: 2 additions & 2 deletions
@@ -16,7 +16,7 @@
 package io.lenses.streamreactor.connect.cloud.common.formats.reader

 import cats.implicits.catsSyntaxEitherId
-import io.confluent.connect.avro.AvroData
+import io.lenses.streamreactor.connect.avro.AvroDataFactory
 import io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream
 import io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetStreamingInputFile
 import org.apache.avro.generic.GenericRecord
@@ -34,7 +34,7 @@ class ParquetStreamReader(
 ) extends CloudDataIterator[SchemaAndValue]
     with Using {
   private val parquetReaderIteratorAdaptor = new ParquetReaderIteratorAdaptor(reader)
-  private val avroDataConverter = new AvroData(100)
+  private val avroDataConverter = AvroDataFactory.create()

   override def close(): Unit = {
     val _ = Try(reader.close())

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverter.scala

Lines changed: 33 additions & 14 deletions
@@ -15,9 +15,7 @@
  */
 package io.lenses.streamreactor.connect.cloud.common.sink.conversion

-import io.confluent.connect.avro.AvroData
-import io.confluent.connect.avro.AvroDataConfig
-import io.confluent.connect.schema.AbstractDataConfig
+import io.lenses.streamreactor.connect.avro.AvroDataFactory
 import org.apache.avro.Schema
 import org.apache.kafka.connect.data.Struct
 import org.apache.kafka.connect.data.{ Schema => ConnectSchema }
@@ -39,13 +37,33 @@ import scala.jdk.CollectionConverters.SeqHasAsJava

 object ToAvroDataConverter {

-  private val avroDataConfig = new AvroDataConfig(
-    Map(
-      AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG -> "true",
-      AbstractDataConfig.SCHEMAS_CACHE_SIZE_CONFIG -> "100",
-    ).asJava,
+  private val avroDataConverter = AvroDataFactory.create(100)
+
+  /** Schema name used by Confluent's AvroConverter for union types */
+  private val ConfluentAvroUnionSchemaName = "io.confluent.connect.avro.Union"
+
+  /**
+   * Mapping from Kafka Connect Schema.Type to Avro Schema.Type.
+   * Connect and Avro use different naming conventions for their types:
+   *   Connect: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, STRUCT
+   *   Avro:    INT,  INT,   INT,   LONG,  FLOAT,   DOUBLE,  RECORD
+   * This map is used in the fallback branch of union type matching
+   * where the primary name-based match has already failed.
+   */
+  private val connectToAvroType: Map[ConnectSchema.Type, Schema.Type] = Map(
+    ConnectSchema.Type.INT8    -> Schema.Type.INT,
+    ConnectSchema.Type.INT16   -> Schema.Type.INT,
+    ConnectSchema.Type.INT32   -> Schema.Type.INT,
+    ConnectSchema.Type.INT64   -> Schema.Type.LONG,
+    ConnectSchema.Type.FLOAT32 -> Schema.Type.FLOAT,
+    ConnectSchema.Type.FLOAT64 -> Schema.Type.DOUBLE,
+    ConnectSchema.Type.BOOLEAN -> Schema.Type.BOOLEAN,
+    ConnectSchema.Type.STRING  -> Schema.Type.STRING,
+    ConnectSchema.Type.BYTES   -> Schema.Type.BYTES,
+    ConnectSchema.Type.ARRAY   -> Schema.Type.ARRAY,
+    ConnectSchema.Type.MAP     -> Schema.Type.MAP,
+    ConnectSchema.Type.STRUCT  -> Schema.Type.RECORD,
   )
-  private val avroDataConverter = new AvroData(avroDataConfig)

   def convertSchema(connectSchema: ConnectSchema): Schema = avroDataConverter.fromConnectSchema(connectSchema)

@@ -246,9 +264,13 @@
           // Convert the value using the matched schema
           convertFieldValue(fieldValue, avroSchema)
         case None =>
-          // Fallback: try to find by type match
+          // Fallback: try to find by type match using the Connect-to-Avro type mapping.
+          // Connect and Avro use different type names (e.g. INT32 vs INT, INT64 vs LONG,
+          // FLOAT32 vs FLOAT, FLOAT64 vs DOUBLE, STRUCT vs RECORD), so a direct string
+          // comparison would fail for these types.
           val typeMatchSchema = targetUnionSchema.getTypes.asScala.find { avroType =>
-            avroType.getType != Schema.Type.NULL && fieldSchema.`type`().getName.toUpperCase == avroType.getType.name()
+            avroType.getType != Schema.Type.NULL &&
+              connectToAvroType.get(fieldSchema.`type`()).contains(avroType.getType)
           }
           typeMatchSchema.map(convertFieldValue(fieldValue, _)).getOrElse(fieldValue)
       }
@@ -259,9 +281,6 @@
     }
   }

-  /** Schema name used by Confluent's AvroConverter for union types */
-  private val ConfluentAvroUnionSchemaName = "io.confluent.connect.avro.Union"
-
   private def convertDateToDaysFromEpoch[A <: Any](value: Date) =
     ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), LocalDate.ofInstant(value.toInstant, ZoneId.systemDefault()))
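To see why the fallback needs the explicit mapping, take a Connect INT64 field matched against the Avro union ["null", "long"]: Connect's type name upper-cases to "INT64" while the Avro branch is named "LONG", so the old string comparison can never match. A self-contained sketch (the demo object and its inline mapping are illustrative, not part of the codebase):

    import org.apache.avro.{ Schema, SchemaBuilder }
    import org.apache.kafka.connect.data.{ Schema => ConnectSchema }

    import scala.jdk.CollectionConverters.ListHasAsScala

    object UnionTypeMatchDemo extends App {
      // An Avro union ["null", "long"], as emitted for an optional int64 field.
      val union: Schema = SchemaBuilder.unionOf().nullType().and().longType().endUnion()

      val connectType = ConnectSchema.Type.INT64

      // Old comparison: "INT64" == "LONG" never holds, so no branch is found.
      val byName = union.getTypes.asScala
        .find(t => t.getType != Schema.Type.NULL && connectType.getName.toUpperCase == t.getType.name())

      // New comparison through the explicit mapping selects the LONG branch.
      val connectToAvroType = Map(ConnectSchema.Type.INT64 -> Schema.Type.LONG)
      val byMapping = union.getTypes.asScala
        .find(t => t.getType != Schema.Type.NULL && connectToAvroType.get(connectType).contains(t.getType))

      println(byName)    // None
      println(byMapping) // Some("long")
    }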

kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/ParquetFormatWriterSchemaEvolutionTest.scala

Lines changed: 3 additions & 11 deletions
@@ -15,8 +15,7 @@
  */
 package io.lenses.streamreactor.connect.cloud.common.formats.writer

-import io.confluent.connect.avro.AvroData
-import io.confluent.connect.avro.AvroDataConfig
+import io.lenses.streamreactor.connect.avro.AvroDataFactory
 import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
 import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
 import io.lenses.streamreactor.connect.cloud.common.model.Offset
@@ -358,15 +357,8 @@ class ParquetFormatWriterSchemaEvolutionTest extends AnyFlatSpec with Matchers w
     val avroSchemaV2 = parser.parse(avroSchemaV2Json)
     val avroSchemaV3 = new Parser().parse(avroSchemaV3Json) // Use a new parser to avoid the cache

-    // Create AvroData converters (like Schema Registry does)
-    // Each deserialization in production creates a new AvroData instance or uses a cached one
-    val avroDataConfig = new AvroDataConfig(Map(
-      "enhanced.avro.schema.support" -> "true",
-      "schemas.cache.config" -> "100",
-    ).asJava)
-
-    val avroDataV3 = new AvroData(avroDataConfig)
-    val avroDataV2 = new AvroData(avroDataConfig) // Separate instance to simulate different deserializations
+    val avroDataV3 = AvroDataFactory.create()
+    val avroDataV2 = AvroDataFactory.create() // Separate instance to simulate different deserializations

     // Convert Avro schemas to Connect schemas (this is what Schema Registry does)
     val connectSchemaV3 = avroDataV3.toConnectSchema(avroSchemaV3)
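The "new parser to avoid the cache" comment is worth spelling out: Avro's Schema.Parser remembers every named type it parses, so feeding one parser a second version of a record with the same full name fails with SchemaParseException ("Can't redefine: ..."). A minimal illustration with invented schema JSON:

    import org.apache.avro.Schema.Parser

    val v1Json = """{"type":"record","name":"Rec","fields":[{"name":"a","type":"int"}]}"""
    val v2Json = """{"type":"record","name":"Rec","fields":[{"name":"a","type":"int"},{"name":"b","type":["null","string"],"default":null}]}"""

    val parser = new Parser()
    val v1 = parser.parse(v1Json)
    // parser.parse(v2Json)             // throws SchemaParseException: Can't redefine: Rec
    val v2 = new Parser().parse(v2Json) // a fresh parser has no cached "Rec"

The two separate AvroDataFactory.create() calls play the same role on the Connect side: each converter keeps its own schema cache, as two independent deserializations would in production.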
