
Unordered kafka reader: data loss on cooperative rebalance (MarkCommitRecords after revoke) #4010

@TayPark

Description


We found a data loss bug in the unordered kafka reader during cooperative-sticky rebalancing. This write-up covers the background, the exact mechanism, and a fix (PR #4009).

Background: Why we're here

We run ~20 Kafka-to-S3 pipelines. We were using Vector, but kept hitting two problems:

                      Vector (current)
                      ┌──────────────────────────────────────┐
  Kafka ──────────►   │  adaptive_concurrency                │
  (multi-partition)   │  ┌─────────┐     ┌──────────────┐    │
                      │  │ consume ├────►│ S3 sink      │    │   Problems:
                      │  └─────────┘     │              │    │   1. IRSA token expiry → batch drop
                      │                  │ (token=IRSA) │    │   2. adaptive concurrency → OOM
                      │                  └──────────────┘    │
                      └──────────────────────────────────────┘
  1. S3 token expiry: Vector doesn't refresh IRSA tokens well. When the token expires mid-upload, the batch is dropped silently.
  2. OOM: The adaptive concurrency controller sometimes allocates more memory than expected under bursty load, crashing the pod.

So we started a PoC with Redpanda Connect:

                      Redpanda Connect (PoC)
                      ┌─────────────────────────────────────────────┐
  Kafka ──────────►   │  consumer_group + unordered_processing      │
  (multi-partition)   │  ┌─────────────────┐    ┌──────────────┐    │
                      │  │ per-partition   │    │ S3 sink      │    │
                      │  │ input batcher   ├───►│              │    │
                      │  │ (50MB or 1min)  │    │ deterministic│    │
                      │  └─────────────────┘    │ path naming  │    │
                      │                         └──────────────┘    │
                      └─────────────────────────────────────────────┘

Deterministic sink: the key design choice

We configured input batching per partition with deterministic S3 paths so that reprocessing the same offset range always writes to the same file:

input:
  redpanda:
    consumer_group: my-group
    unordered_processing:
      enabled: true
      checkpoint_limit: 1
      batching:
        byte_size: 52428800  # 50 MiB per partition
        period: "1m"

output:
  aws_s3:
    path: '${!meta("kafka_topic")}+${!meta("kafka_partition")}+${!meta("kafka_offset")}.jsonl.zst'
    max_in_flight: 10

This produces files like:

S3 path structure (one file = one batch = one partition):

  my-topic+0+00000042.jsonl.zst    ← partition 0, batch starting at offset 42
  my-topic+0+00001387.jsonl.zst    ← partition 0, next batch at offset 1387
  my-topic+1+00000099.jsonl.zst    ← partition 1, batch starting at offset 99
  ...

  If the same batch is reprocessed, it overwrites the same file → idempotent.

The goal: at-least-once delivery with idempotent writes. A rebalance should either commit the offset after the sink confirms, or not commit it at all (so the new owner reprocesses and overwrites the same file).
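The naming scheme is simple enough to sketch. This illustrative helper (our code, not part of the pipeline config above) shows the key construction and why a retried batch lands on the same object:

```go
package main

import "fmt"

// s3Key builds the deterministic object key: the same topic/partition/
// first-offset triple always maps to the same file, so a reprocessed
// batch overwrites its previous upload instead of duplicating it.
func s3Key(topic string, partition int32, firstOffset int64) string {
	return fmt.Sprintf("%s+%d+%08d.jsonl.zst", topic, partition, firstOffset)
}

func main() {
	fmt.Println(s3Key("my-topic", 0, 42)) // my-topic+0+00000042.jsonl.zst
}
```

Because the key depends only on the batch's identity and not on wall-clock time or a UUID, at-least-once delivery degrades to a harmless overwrite instead of a duplicate.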

The problem: rebalance drops a whole batch

During rebalance testing (HPA scale-up triggering cooperative-sticky rebalance), we observed:

Partition 31 — S3 files and committed offsets:

  ...
  my-topic+31+00045291.jsonl.zst   ← last file before rebalance
                                      (contains offsets 45291..46603)

  ┌──────────────────────────────────────────────────────────────┐
  │  OFFSET GAP: 46604 .. 47891  (1,288 messages)                │
  │                                                              │
  │  committed offset = 47892                                    │
  │  no S3 file exists for this range                            │
  │  these messages are permanently lost                         │
  └──────────────────────────────────────────────────────────────┘

  my-topic+31+00047892.jsonl.zst   ← first file after rebalance
                                      (new consumer starts here)

All 7 revoked partitions showed the same pattern. Per-partition loss was 1,300–1,600 contiguous messages (= one batcher window). Total: 10,433 messages.
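A gap like this is mechanical to detect from the deterministic file names alone. A hypothetical audit helper (our names, not part of any PR) that checks per-partition offset contiguity:

```go
package main

import "fmt"

// batchFile describes one S3 object for a partition: the first offset
// encoded in its name and the number of records it contains.
type batchFile struct {
	first int64 // first offset, parsed from the object key
	count int64 // records in the file
}

// findGaps returns the [from, to] offset ranges missing between
// consecutive files, assuming files are sorted by first offset.
func findGaps(files []batchFile) [][2]int64 {
	var gaps [][2]int64
	for i := 1; i < len(files); i++ {
		expected := files[i-1].first + files[i-1].count
		if files[i].first > expected {
			gaps = append(gaps, [2]int64{expected, files[i].first - 1})
		}
	}
	return gaps
}

func main() {
	// Partition 31 from the incident: 45291..46603 written, next file at 47892.
	files := []batchFile{{45291, 1313}, {47892, 1500}}
	fmt.Println(findGaps(files)) // [[46604 47891]]
}
```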

Root cause

File: internal/impl/kafka/franz_reader_unordered.go

The race happens because OnPartitionsRevoked returns before in-flight batches complete, and nothing prevents MarkCommitRecords from being called on revoked partitions afterward.

 Timeline         Consumer A                         Kafka broker
 ────────────────────────────────────────────────────────────────────

 T0  normal        sendBatch()
                     Track(r)               ← checkpointer tracks batch
                     batchChan ← batch      ← batch enters pipeline
                     pipeline processing... ← S3 upload in progress

 T1  rebalance     OnPartitionsRevoked(rctx, revoked=[partition 0]):
                     CommitMarkedOffsets()   ← commits what's marked so far
                                               (in-flight batch NOT marked yet)
                     removeTopicPartitions()
                       close(rctx)          ← SoftStop loop, batcher.Close()
                       delete from map      ← partitionTracker gone
                     return                 ─────► broker proceeds with rebalance
                                                    assigns partition 0 to Consumer B

 T2  pipeline      ...still running...
     completes     onAck()
                     releaseFn()            ← checkpointer releases
                     commitFn(record)
                       MarkCommitRecords(r) ← franz-go: NO ownership check!
                                               re-inserts into g.uncommitted

 T3  auto-commit   AutoCommitMarks tick    ─────► broker accepts commit
                     commits offset 47892           (no ownership validation
                     for partition 0                 within session epoch)

 T4  Consumer B    starts at offset 47892  ─────► offsets 46604..47891 = LOST
                   (never sees the gap)

The core issue is at T2→T3. franz-go's MarkCommitRecords doesn't validate partition ownership. It unconditionally writes to g.uncommitted. After revoke() deletes the partition from g.uncommitted, a late MarkCommitRecords call re-inserts it. The next loopCommit tick commits it.
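The T1→T3 interaction fits in a few lines. This is our simplified model of the race, not franz-go code: `uncommitted` stands in for `g.uncommitted`, keyed by partition:

```go
package main

import "fmt"

// group is a minimal model of the consumer-group commit state.
type group struct {
	uncommitted map[int32]int64 // partition → highest marked offset
}

// mark mimics MarkCommitRecords: no ownership check, unconditional insert.
func (g *group) mark(partition int32, offset int64) {
	if offset > g.uncommitted[partition] {
		g.uncommitted[partition] = offset
	}
}

// revoke mimics the revoke path: the partition's entry is simply deleted.
func (g *group) revoke(partition int32) {
	delete(g.uncommitted, partition)
}

func main() {
	g := &group{uncommitted: map[int32]int64{}}
	g.mark(0, 46603) // T0: in-flight batch tracked
	g.revoke(0)      // T1: partition revoked, entry deleted
	g.mark(0, 47891) // T2: late ack re-inserts the revoked partition
	fmt.Println(g.uncommitted) // map[0:47891] → next commit tick ships it
}
```

Nothing in the mark path knows the partition was ever revoked, which is exactly what the fix adds.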

Secondary issue: batcher silent discard

 loop() receives SoftStop signal:

   Before fix                          After fix
   ─────────────────────────           ─────────────────────────
   case <-SoftStopChan():              case <-SoftStopChan():
     return  ← batcher has               batcher.Flush()  ← drain
              3 buffered msgs             sendBatch()      ← deliver
              they are gone               return

When the batcher hasn't hit its count/byte threshold yet (only the period timer would flush it), SoftStop discards whatever is buffered. This is the other half of the data loss.

Fix (PR #4009)

Two changes:

1. Block MarkCommitRecords for revoked partitions

// checkpointTracker gains a revoked set:

type checkpointTracker struct {
    mut     sync.Mutex
    topics  map[string]map[int32]*partitionTracker
    revoked map[string]map[int32]struct{}       // ← NEW
    ...
}

// removeTopicPartitions marks before deleting:

    revoked[topic][partition] = struct{}{}       // mark
    delete(trackedTopic, partition)              // then delete

// commitFn skips revoked partitions:

    commitFn = func(r *kgo.Record) {
        if checkpoints.isRevoked(r.Topic, r.Partition) {
            return                                // ← skip
        }
        cl.MarkCommitRecords(r)
    }

// OnPartitionsAssigned clears the revoked set:

    kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
        checkpoints.clearRevoked(m)              // ← reset for re-assigned partitions
    })

2. Flush batcher on SoftStop

// In loop(), before returning on SoftStop:

    case <-p.shutSig.SoftStopChan():
        if p.batcher != nil {
            p.batcherLock.Lock()
            if batch, _ := p.batcher.Flush(context.Background()); len(batch) > 0 {
                record := p.topBatchRecord
                p.topBatchRecord = nil
                p.batcherLock.Unlock()
                _ = p.sendBatch(context.Background(), batch, record)
            } else {
                p.batcherLock.Unlock()
            }
        }
        return
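The shape of that change, reduced to a runnable model (names are illustrative, not the PR's): drain whatever is buffered and hand it to the send path before returning, instead of dropping it:

```go
package main

import "fmt"

// batcher is a stand-in for the input batcher's buffer.
type batcher struct {
	buf []string
}

func (b *batcher) add(msg string) { b.buf = append(b.buf, msg) }

// flush drains the buffer even below the size/period threshold.
func (b *batcher) flush() []string {
	out := b.buf
	b.buf = nil
	return out
}

// drainOnSoftStop is the "after fix" shape: deliver the partial
// batch through send before the loop returns.
func drainOnSoftStop(b *batcher, send func([]string)) {
	if batch := b.flush(); len(batch) > 0 {
		send(batch)
	}
}

func main() {
	b := &batcher{}
	b.add("offset 46604")
	b.add("offset 46605")
	var delivered []string
	drainOnSoftStop(b, func(batch []string) { delivered = batch })
	fmt.Println(len(delivered)) // 2: buffered messages survive shutdown
}
```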

How they work together

 With the fix applied:

 T0  sendBatch → Track(r) → pipeline processing...

 T1  OnPartitionsRevoked:
       CommitMarkedOffsets()
       removeTopicPartitions()
         SoftStop → batcher.Flush() → sendBatch()  ← buffered msgs delivered
         revoked[partition] = marked                ← partition flagged
         delete from map

 T2  pipeline completes → onAck() → commitFn(r)
       isRevoked(r.Topic, r.Partition) == true
       return (skip MarkCommitRecords)              ← offset NOT advanced

 T3  Consumer B starts at last committed offset
       reprocesses the same range
       writes to same S3 path (deterministic)       ← idempotent, no loss

Version / Environment

  • Redpanda Connect: verified against main at v4.81.0
  • Consumer group protocol: cooperative-sticky
  • Batching: input batching (byte_size or period)
  • Sink: S3 with non-trivial latency

Suggested labels

bug, kafka, inputs
