
@matus-tomlein commented Jan 8, 2026

Summary

Fixes #643

This PR addresses a race condition where DynamoDB lease table entries for closed Kinesis shards are not cleaned up when Bento pods are terminated during Kubernetes scaling events.

Changes

1. Fix Shutdown Logic (input_kinesis.go:486-507)

Modified the awsKinesisConsumerClosing case to detect finished shards:

case awsKinesisConsumerClosing:
    // If the iterator is empty, the shard is finished and should be deleted
    // rather than checkpointed. This prevents orphaned DynamoDB entries when
    // pods are terminated after Kinesis closes shards but before consumption completes.
    //
    // Note: There's an edge case where if a shard naturally finishes (state becomes
    // awsKinesisConsumerFinished) but the pod is terminated before this defer runs,
    // the state will be changed to awsKinesisConsumerClosing. This means Delete could
    // be called from both code paths, but this is safe because Delete is idempotent.
    if iter == "" {
        // Use context.Background() to ensure the delete completes even during shutdown
        k.checkpointer.Delete(context.Background(), ...)
    } else {
        k.checkpointer.Checkpoint(..., true /* final */)
    }

What this fixes: When a pod is terminated and the shard iterator is empty (shard closed and fully consumed), the DynamoDB entry is now deleted instead of leaving an orphaned checkpoint.
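
The snippets in this description elide some arguments. Where the sketches further down fill them in, they assume checkpointer method shapes along the following lines; these signatures are assumptions for illustration, not the actual ones in input_kinesis.go:

// Assumed shapes, used only by the sketches in this description (needs "context").
type leaseCheckpointer interface {
    // Delete removes the lease table entry for a shard. It is idempotent:
    // deleting an entry that no longer exists succeeds.
    Delete(ctx context.Context, streamID, shardID string) error
    // Checkpoint persists the latest processed sequence number; final marks
    // the shard as fully consumed.
    Checkpoint(ctx context.Context, streamID, shardID, sequence string, final bool) error
}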

2. Add Periodic Cleanup (input_kinesis.go:700-746)

Added proactive cleanup in the rebalancing loop (runs every 30s by default) with two phases (a sketch of how finishedShardIDs could be derived follows the Phase 2 snippet):

Phase 1: Clean up entries with expired leases

for clientID, claims := range clientClaims {
    for _, claim := range claims {
        if finishedShardIDs[claim.ShardID] {
            isExpired := time.Since(claim.LeaseTimeout) > k.leasePeriod*2
            if isExpired {
                k.checkpointer.Delete(...)
            } else {
                k.log.Debugf("Skipping cleanup of finished shard '%v' from client '%v' (lease still active)", ...)
            }
        }
    }
}

Phase 2: Clean up orphaned entries (no ClientID)

orphanedShards, err := k.checkpointer.OrphanedShards(k.ctx, info.id)
for _, shardID := range orphanedShards {
    if finishedShardIDs[shardID] {
        k.checkpointer.Delete(...)
    }
}
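
The construction of finishedShardIDs is not shown in this description. As a point of reference, the sketch below shows one way it could be derived with the AWS SDK for Go v2: a shard is closed once Kinesis has set an ending sequence number on its sequence number range. The helper name listFinishedShards and its exact placement are hypothetical.

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
)

// listFinishedShards (hypothetical helper) returns the IDs of shards that
// Kinesis has already closed, i.e. shards whose sequence number range has an
// ending sequence number.
func listFinishedShards(ctx context.Context, svc *kinesis.Client, stream string) (map[string]bool, error) {
    finished := map[string]bool{}
    input := &kinesis.ListShardsInput{StreamName: aws.String(stream)}
    for {
        out, err := svc.ListShards(ctx, input)
        if err != nil {
            return nil, err
        }
        for _, s := range out.Shards {
            if s.SequenceNumberRange != nil && s.SequenceNumberRange.EndingSequenceNumber != nil {
                finished[*s.ShardId] = true
            }
        }
        if out.NextToken == nil {
            break
        }
        // Follow-up pages are requested with NextToken only (no StreamName).
        input = &kinesis.ListShardsInput{NextToken: out.NextToken}
    }
    return finished, nil
}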

3. New OrphanedShards Method (input_kinesis_checkpointer.go:183-208)

Added method to query DynamoDB entries without ClientID:

func (k *awsKinesisCheckpointer) OrphanedShards(ctx context.Context, streamID string) ([]string, error) {
    scanRes, err := k.svc.Scan(ctx, &dynamodb.ScanInput{
        TableName:        aws.String(k.conf.Table),
        FilterExpression: aws.String("StreamID = :stream_id AND attribute_not_exists(ClientID)"),
        ...
    })
    ...
}

Why needed: The existing AllClaims() function skips entries without ClientID, so a separate query is required to find truly orphaned entries created by Checkpoint(final=true).
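
For completeness, a fuller version of this scan might look like the sketch below (AWS SDK for Go v2). The attribute names StreamID, ShardID, and ClientID come from the snippet above; the pagination handling and exact unmarshalling are assumptions:

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// OrphanedShards returns shard IDs for this stream whose lease entries have
// no ClientID attribute, i.e. entries left behind by Checkpoint(final=true)
// with no owning consumer.
func (k *awsKinesisCheckpointer) OrphanedShards(ctx context.Context, streamID string) ([]string, error) {
    var orphaned []string
    input := &dynamodb.ScanInput{
        TableName:        aws.String(k.conf.Table),
        FilterExpression: aws.String("StreamID = :stream_id AND attribute_not_exists(ClientID)"),
        ExpressionAttributeValues: map[string]types.AttributeValue{
            ":stream_id": &types.AttributeValueMemberS{Value: streamID},
        },
    }
    for {
        scanRes, err := k.svc.Scan(ctx, input)
        if err != nil {
            return nil, err
        }
        for _, item := range scanRes.Items {
            if id, ok := item["ShardID"].(*types.AttributeValueMemberS); ok {
                orphaned = append(orphaned, id.Value)
            }
        }
        if len(scanRes.LastEvaluatedKey) == 0 {
            break
        }
        // Continue the scan from where the previous page stopped.
        input.ExclusiveStartKey = scanRes.LastEvaluatedKey
    }
    return orphaned, nil
}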

Safety

  • k.checkpointer.Delete() is idempotent (safe to call multiple times)
  • DynamoDB DeleteItem succeeds even if the item doesn't exist
  • Lease expiry check prevents interfering with active consumers
  • Detailed debug logging for all cleanup scenarios
  • No data corruption or duplicate processing risk
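
To illustrate the idempotency claim: DynamoDB's DeleteItem reports success even when no item matches the key, so a Delete along these lines can safely run from both the shutdown path and the periodic cleanup. This is a sketch assuming the table key is StreamID plus ShardID (same SDK imports as the scan sketch above); the real Delete in input_kinesis_checkpointer.go may take different arguments or use a conditional write.

// Sketch only: DeleteItem is a no-op success when the key does not exist,
// which is what makes repeated Delete calls safe.
func (k *awsKinesisCheckpointer) Delete(ctx context.Context, streamID, shardID string) error {
    _, err := k.svc.DeleteItem(ctx, &dynamodb.DeleteItemInput{
        TableName: aws.String(k.conf.Table),
        Key: map[string]types.AttributeValue{
            "StreamID": &types.AttributeValueMemberS{Value: streamID},
            "ShardID":  &types.AttributeValueMemberS{Value: shardID},
        },
    })
    return err
}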

Testing

  • Existing tests pass
  • Code compiles successfully
  • Changes are backward compatible

Impact

  • Eliminates false positive latency alerts
  • Removes need for manual DynamoDB cleanup
  • Resolves recurring production incidents
  • Enhanced observability with detailed logging

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>

…ng pod termination

When Kinesis scales down (merges shards) and Bento pods scale down
simultaneously in Kubernetes, DynamoDB lease table entries for closed
shards were not being cleaned up, causing false positive latency alerts.

This fix addresses a race condition where pods could be terminated after
Kinesis closed shards but before consumers finished processing them.

Changes:
- Modified shutdown logic to detect finished shards (empty iterator) and
  delete their DynamoDB entries instead of leaving orphaned checkpoints
- Added periodic cleanup in rebalancing loop to proactively remove
  DynamoDB entries for closed Kinesis shards
- Both Delete operations are idempotent and safe to call multiple times

The rebalancing cleanup (every 30s by default) serves as a safety net
for edge cases and cleans up any existing orphaned entries.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings January 8, 2026 11:03

Copilot AI left a comment


Pull request overview

This pull request fixes a race condition where DynamoDB lease table entries for closed Kinesis shards are not cleaned up when Bento pods are terminated during Kubernetes scaling events, leading to orphaned entries and false positive latency alerts.

  • Adds conditional cleanup logic during pod shutdown to delete checkpoints for finished shards instead of saving them
  • Implements periodic background cleanup that removes DynamoDB entries for shards that have been closed by Kinesis


- Add lease expiry check in periodic cleanup to avoid interfering with
  active consumers still processing final records of closed shards
- Use context.Background() in shutdown Delete to ensure cleanup
  completes even when k.ctx is cancelled during shutdown

Co-authored-by: GitHub Copilot

Copilot AI left a comment


Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 4 comments.



Changes based on PR review comments:

1. Added OrphanedShards() method to query DynamoDB entries without ClientID
   - AllClaims() skips entries without ClientID, so the previous check would
     never find truly orphaned entries
   - New method uses DynamoDB filter expression to find orphaned entries

2. Improved cleanup logic and logging
   - Split cleanup into two phases: expired leases and orphaned entries
   - Added debug logging when skipping cleanup due to active lease
   - Better error messages distinguishing between different cleanup scenarios

3. Documented double-deletion edge case
   - Added comment explaining why Delete can be called from multiple paths
   - Clarified that this is safe due to idempotency

These changes ensure that:
- Orphaned entries (from Checkpoint(final=true)) are properly cleaned up
- Active consumers are not interfered with (lease expiry check)
- Debugging is easier with detailed logging


Linked issue: aws_kinesis: DynamoDB entries for closed shards not cleaned up during pod termination
