
NPE when switching to Cooperative Sticky protocol in Kafka connect #872

@tandonraghav

Description

Hi Guys,

We are trying to switch the Connect framework over to the cooperative sticky rebalancing protocol to see its advantages on K8s. But while experimenting, when multiple pod disruptions happen, a few of the pods go into an exception state and do not recover.

We started seeing this after adding two configs to the Connect worker config:

connect.protocol: compatible
consumer.partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Using confluentinc-kafka-connect-s3-11.0.0.

I feel this might be related to #480, but I need confirmation on whether this will be fixed.
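
For context on what that switch changes (my understanding of the consumer docs, not verified against the connector): with an eager assignor every rebalance revokes the full partition set and reassigns it, while with CooperativeStickyAssignor the rebalance callbacks only ever see the incremental delta. A minimal plain-consumer sketch (broker address and group id are placeholders; the topic is the one from the connector config below):

    import java.time.Duration;
    import java.util.Collection;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class CooperativeRebalanceDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    CooperativeStickyAssignor.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("logs.vectorc3"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Cooperative: only the partitions actually moving away
                        // show up here, never the full assignment.
                        System.out.println("revoked (incremental): " + partitions);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Cooperative: only newly added partitions; retained
                        // partitions are not re-announced.
                        System.out.println("added (incremental): " + partitions);
                    }
                });
                consumer.poll(Duration.ofSeconds(5));
            }
        }
    }

A sink task written against eager semantics can miss this distinction, which is what the stack trace below seems to show.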

Error logs

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot invoke "io.confluent.connect.s3.TopicPartitionWriter.buffer(org.apache.kafka.connect.sink.SinkRecord)" because the return value of "java.util.Map.get(Object)" is null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-kafka-s3-connector-logs.vectorc3-v1-1]
java.lang.NullPointerException: Cannot invoke "io.confluent.connect.s3.TopicPartitionWriter.buffer(org.apache.kafka.connect.sink.SinkRecord)" because the return value of "java.util.Map.get(Object)" is null
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:243)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
2025-09-10 09:38:31,448 ERROR [kafka-s3-connector-logs.vectorc3-v1|task-1] WorkerSinkTask{id=kafka-s3-connector-logs.vectorc3-v1-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will
not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-kafka-s3-connector-logs.vectorc3-v1-1]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "io.confluent.connect.s3.TopicPartitionWriter.buffer(org.apache.kafka.connect.sink.SinkRecord)" because the return value of "java.util.Map.get(Object)" is null
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:243)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
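
A hedged reading of the stack trace (the class below is illustrative, not the actual S3SinkTask source): put() fetches a per-partition writer from a Map and calls buffer() on it without a null check, so the NPE means a record arrived for a partition that has no writer. That can happen if close() tears down all writers (an eager-protocol assumption) while open() only recreates writers for the partitions newly added by a cooperative rebalance:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Illustrative failure mode only; names do not match the real connector.
    public abstract class EagerAssumingSinkTask extends SinkTask {
        private final Map<TopicPartition, Writer> writers = new HashMap<>();

        @Override
        public void open(Collection<TopicPartition> partitions) {
            // Under cooperative rebalancing this receives ONLY the newly
            // added partitions, not the full assignment.
            for (TopicPartition tp : partitions) {
                writers.put(tp, createWriter(tp));
            }
        }

        @Override
        public void close(Collection<TopicPartition> partitions) {
            // Eager assumption: tear down everything. Under the cooperative
            // protocol this also drops writers for partitions the task keeps,
            // and open() will not recreate them.
            writers.values().forEach(Writer::close);
            writers.clear();
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
                // NPE here: tp is still assigned, but its writer was dropped
                // in close() and never recreated in open().
                writers.get(tp).buffer(record);
            }
        }

        protected abstract Writer createWriter(TopicPartition tp);

        interface Writer {
            void buffer(SinkRecord record);
            void close();
        }
    }

If that is the root cause, a defensive fix on the connector side would be to look writers up with computeIfAbsent, or to close only the writers for the partitions actually passed to close(); I have not verified this against the 11.0.0 sources.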

Connect configs

    config.storage.topic: connect.s3.core-vector.configs.v3
    connect.protocol: compatible
    consumer.auto.offset.reset: latest
    consumer.partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    group.id: connect-s3-core-vector-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    offset.flush.interval.ms: 10000
    offset.storage.partitions: 3
    offset.storage.replication.factor: 3
    offset.storage.topic: connect.s3.core-vector.offset.v3
    producer.batch.size: 262144
    producer.compression.type: zstd
    producer.linger.ms: 500
    producer.max.request.size: 2097152
    scheduled.rebalance.max.delay.ms: 600000
    status.storage.partitions: 3
    status.storage.replication.factor: 3
    status.storage.topic: connect.s3.core-vector.status.v3
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
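
One sanity check worth running against these worker configs (a suggestion, not part of the original report): confirm that the sink tasks' consumer group actually negotiated the cooperative assignor. Connect names that group connect-<connector name> by default, so here it should be connect-kafka-s3-connector-logs.vectorc3-v1; the bootstrap address is a placeholder:

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;

    public class CheckSinkAssignor {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // Sink tasks consume under "connect-<connector name>" by default.
            String group = "connect-kafka-s3-connector-logs.vectorc3-v1";
            try (Admin admin = Admin.create(props)) {
                ConsumerGroupDescription desc = admin
                        .describeConsumerGroups(List.of(group))
                        .describedGroups()
                        .get(group)
                        .get();
                // Expect "cooperative-sticky" once the strategy is in effect.
                System.out.println(group + " -> assignor: " + desc.partitionAssignor());
            }
        }
    }

If this still reports range or another eager assignor, the strategy has not reached the tasks' consumers.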

Connector configuration

    connector.class: io.confluent.connect.s3.S3SinkConnector
    errors.deadletterqueue.context.headers.enable: true
    errors.deadletterqueue.topic.name: deadletterqueue.connect.s3
    errors.log.enable: true
    errors.log.include.messages: true
    errors.tolerance: all
    file.delim: '-'
    flush.size: 100000
    format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    locale: en-US
    name: kafka-s3-connector-logs.vectorc3-v1
    parquet.codec: gzip
    partition.duration.ms: 600000
    partition.fields: ns
    partitioner.class: >-
      com.dataplatform.connect.partitioner.ParquetFieldTimeBasedPartitioner
    path.format: '''day''=YYYY-MM-dd/''hour''=HH'
    rotate.schedule.interval.ms: 600000
    s3.bucket.name: xxxx
    s3.elastic.buffer.enable: true
    s3.part.size: 25242880
    s3.region: ap-south-1
    storage.class: io.confluent.connect.s3.storage.S3Storage
    timestamp.extractor: Wallclock
    timezone: UTC
    topics: logs.vectorc3
    topics.dir: ephemeral/kafka_backup/kubernetes-logs
    transforms: AddTS,AddPartition,AddOffset,Cast,ExtractK8sNamespace,ParquetConverter
    transforms.AddOffset.offset.field: kafka_offset
    transforms.AddOffset.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.AddPartition.partition.field: kafka_partition
    transforms.AddPartition.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.AddTS.timestamp.field: kafka_timestamp
    transforms.AddTS.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.Cast.spec: kafka_timestamp:string,kafka_partition:string,kafka_offset:string
    transforms.Cast.type: org.apache.kafka.connect.transforms.Cast$Value
    transforms.ExtractK8sNamespace.nested.fields.name: ns:kubernetes.pod_namespace
    transforms.ExtractK8sNamespace.type: com.dataplatform.connect.transform.ExtractNestedField$Value
    transforms.ParquetConverter.partitions.field.name: ns,kafka_timestamp,kafka_partition,kafka_offset
    transforms.ParquetConverter.type: com.dataplatform.connect.transform.ParqueConverter$Value
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false

Please let me know if any further info is required on this.
