-
Notifications
You must be signed in to change notification settings - Fork 341
Open
Description
We encountered an issue when exporting Kafka messages to MinIO using Parquet or Avro formats. The connector fails during the write process.
Steps to Reproduce:
- Set up Kafka + Kafka Connect with MinIO as S3-compatible storage.
- Use the Confluent S3 Sink Connector (https://www.confluent.io/hub/confluentinc/kafka-connect-s3) with `format.class` set to either `io.confluent.connect.s3.format.parquet.ParquetFormat` or `io.confluent.connect.s3.format.avro.AvroFormat`.
- Send messages from a Kafka topic to MinIO.
- Observe connector errors or failure to write output files.
Expected Behavior:
Kafka messages should be successfully written to MinIO in Avro format without errors.
Actual Behavior:
Connector throws an error and no files are written, or partial/corrupt files are generated in MinIO.
Kafka compose file:
# Docker Compose file (Compose Specification) for the local Kafka stack used
# to reproduce the S3 sink failure. There is no top-level `version` key: the
# Compose Specification treats it as obsolete.
name: kafka

services:
  zookeeper:
    image: docker.io/confluentinc/cp-zookeeper:7.2.1
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"

  kafka-engine:
    image: docker.io/confluentinc/cp-kafka:7.2.1
    container_name: kafka-engine
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      TZ: ${TIMEZONE}
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Only the in-network name is advertised. Clients on the host can use the
      # published 9092 port only if `kafka-engine` resolves there.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-engine:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # This file defines a single broker, so internal topics use RF/ISR 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # 21600000 ms = 6 hours.
      KAFKA_LOG_RETENTION_MS: "21600000"

  debezium:
    image: docker.io/debezium/connect:2.5
    container_name: debezium
    # The Avro converters below talk to schema-registry, so start it first too.
    depends_on:
      - kafka-engine
      - schema-registry
    ports:
      - "8085:8083"
    environment:
      TZ: ${TIMEZONE}
      BOOTSTRAP_SERVERS: kafka-engine:9092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      # In-network address (container port 8081), not the host-published 8087.
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # Must be a name reachable on the compose network; this service is
      # `debezium` (there is no `connect` host in this file).
      CONNECT_REST_ADVERTISED_HOST_NAME: debezium
      CONNECT_PLUGIN_PATH: /kafka/connect
      AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY}
      AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY}
    volumes:
      # Each directory mounted under /kafka/connect (the plugin path) becomes
      # a separate Connect plugin.
      # NOTE(review): Connect isolates plugins in their own classloaders, so
      # the S3 connector presumably cannot rely on jars from the separate
      # avro-converter directory. The reported NoClassDefFoundError for
      # io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap
      # (thrown from AvroData inside AvroFormat) suggests mismatched Confluent
      # jar versions inside plugins/confluent-kafka-connect-s3/lib. Verify that
      # its avro-data and schema-registry-client jars come from one release.
      - ${KAFKA_PROJECT_DIR}/plugins/confluent-connect-jdbc/lib:/kafka/connect/jdbc:z
      - ${KAFKA_PROJECT_DIR}/plugins/confluent-connect-json-schema-converter/lib:/kafka/connect/json-schema:z
      - ${KAFKA_PROJECT_DIR}/plugins/confluent-connect-avro-converter/lib:/kafka/connect/avro:z
      - ${KAFKA_PROJECT_DIR}/plugins/confluent-kafka-connect-s3/lib:/kafka/connect/s3:z
      - ${KAFKA_PROJECT_DIR}/connector:/kafka/connector:z
      - ${KAFKA_PROJECT_DIR}/bash:/kafka/bash:z

  kafka-ui:
    image: docker.io/provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    ports:
      - "8090:8090"
    depends_on:
      - kafka-engine
      - zookeeper
    environment:
      TZ: ${TIMEZONE}
      SERVER_PORT: "8090"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-engine:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      AUTH_TYPE: "LOGIN_FORM"
      SPRING_SECURITY_USER_NAME: ${KAFKA_UI_USER}
      SPRING_SECURITY_USER_PASSWORD: ${KAFKA_UI_PASSWORD}

  schema-registry:
    image: docker.io/confluentinc/cp-schema-registry:7.9.2
    container_name: schema-registry
    depends_on:
      - kafka-engine
    # Host port 8087 -> container port 8081. In-network clients use
    # schema-registry:8081.
    ports:
      - "8087:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-engine:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Connector Configuration Snippet:
{
"name": "minio_sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "timit_source_live.public.instansi",
"s3.bucket.name": "warehouse",
"s3.part.size": 5242880,
"s3.region": "us-east-1",
"s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"aws.access.key.id": "admin",
"aws.secret.access.key": "admin123",
"s3.endpoint": "http://192.168.0.112:9000",
"store.url": "http://192.168.0.112:9000",
"flush.size": 3,
"rotate.interval.ms": 10000,
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"enhanced.avro.schema.support": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"schema.compatibility": "NONE",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"topics.dir": "topics"
}
}
Environment:
- Kafka: Confluent Platform 7.2.1 (`cp-kafka` / `cp-zookeeper` images)
- Kafka Connect runtime: `debezium/connect:2.5` image
- Confluent Kafka Connect S3 connector: 10.6.4
- Confluent Kafka Connect Avro Converter: 7.9.2
- Schema Registry: 7.9.2
- MinIO: RELEASE.2025-07-23T15-54-02
- Format: Avro
Error Logs:
org.apache.kafka.connect.errors.ConnectException: Reflection exception:
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at io.confluent.connect.s3.S3SinkTask.newFormat(S3SinkTask.java:170)
at io.confluent.connect.s3.S3SinkTask.newRecordWriterProvider(S3SinkTask.java:179)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:121)
... 9 more
Caused by: java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/utils/BoundedConcurrentHashMap
at io.confluent.connect.avro.AvroData.<init>(AvroData.java:347)
at io.confluent.connect.s3.format.avro.AvroFormat.<init>(AvroFormat.java:34)
... 16 more
Additional Context:
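Writing messages as JSON works fine; the error occurs only when switching to the Avro format. We first suspected a format-writer incompatibility with MinIO. However, the stack trace shows the task failing inside the `AvroFormat` constructor (`AvroData.<init>`) during `S3SinkTask.start`, before anything is written to MinIO. The `NoClassDefFoundError` for `io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap` therefore suggests a classpath/version mismatch among the Confluent jars visible to the S3 connector plugin, rather than a problem with MinIO itself.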
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels