Skip to content

Conversation

@CarlosGamero
Copy link
Collaborator

@CarlosGamero CarlosGamero commented Dec 11, 2025

Summary by CodeRabbit

  • Improvements
    • Enhanced Kafka batch message processing with improved internal stream handling
    • Better error management in stream operations
  • Chores
    • Released patch version 0.8.1
    • Updated Kafka library dependency to ^1.22.0

✏️ Tip: You can customize this high-level summary in your review settings.

@CarlosGamero CarlosGamero self-assigned this Dec 11, 2025
@coderabbitai
Copy link

coderabbitai bot commented Dec 11, 2025

Walkthrough

This PR refactors Kafka consumer batch stream handling from event-based listeners to async iteration, updates the package version to 0.8.1, and bumps the @platformatic/kafka dependency to ^1.22.0. A minor formatting adjustment is also made to a biome-ignore directive.

Changes

Cohort / File(s) Summary
Stream handling refactor
packages/kafka/lib/AbstractKafkaConsumer.ts
Replaces event-driven on('data') and on('error') handlers for batch streams with async iteration via new private method handleSyncStreamBatch. Non-batch path now routes through handleSyncStream. Error handling delegated to handlerError.
Comment formatting
packages/kafka/lib/utils/KafkaMessageBatchStream.ts
Biome-ignore directive adjusted for formatting consistency. No functional or type-related changes.
Dependencies and versioning
packages/kafka/package.json
Version bumped from 0.8.0 to 0.8.1. Dependency @platformatic/kafka updated from ^1.21.0 to ^1.22.0.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~15 minutes

  • AbstractKafkaConsumer.ts: Verify that async iteration with error handling via handlerError is functionally equivalent to the previous on('error') event listener pattern
  • Dependency compatibility: Confirm that @platformatic/kafka ^1.22.0 works correctly with the refactored stream consumption approach

Possibly related PRs

Suggested reviewers

  • kibertoad

Poem

🐰 Hops with glee through async streams,
No more events cluttering dreams!
Iteration flows, so smooth and clean,
The finest refactor I've seen!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly describes the main functional change: converting from event-based to async iteration for batch processing to control message flow.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch bugfix/try_to_fix_batch_process_oom

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c36b3aa and 7b2b252.

📒 Files selected for processing (3)
  • packages/kafka/lib/AbstractKafkaConsumer.ts (2 hunks)
  • packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1 hunks)
  • packages/kafka/package.json (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/kafka/lib/AbstractKafkaConsumer.ts (2)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (2)
  • KafkaMessageBatchStream (15-22)
  • KafkaMessageBatchStream (28-113)
packages/kafka/lib/types.ts (2)
  • DeserializedMessage (46-51)
  • SupportedMessageValues (42-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
  • GitHub Check: kafka (24.x) / build
  • GitHub Check: kafka (22.x) / build
🔇 Additional comments (4)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)

17-17: LGTM!

Formatting adjustment to the biome-ignore directive is correct and has no functional impact.

packages/kafka/lib/AbstractKafkaConsumer.ts (3)

209-212: Excellent refactoring for backpressure handling.

The shift from event-based listeners to async iteration with await inside the for-await loop ensures that each batch is fully processed before accepting the next one. This directly addresses the OOM concern mentioned in the PR title by preventing message accumulation during slow processing.


227-236: LGTM!

The new handleSyncStreamBatch method correctly implements async iteration over message batches, maintaining consistency with the existing handleSyncStream pattern. The type assertion is appropriate given the schema validation performed in parseMessages.


214-214: Verify if this error handler is still necessary.

With the new async iteration approach (line 211), stream errors should already be caught by the .catch() handler. The direct on('error') listener might now be redundant, or it could handle different error scenarios (e.g., stream errors vs. iteration errors).

Please verify whether this error handler is still needed, or if it could lead to duplicate error handling.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@CarlosGamero CarlosGamero marked this pull request as ready for review December 11, 2025 13:07
@CarlosGamero CarlosGamero merged commit 9c3d302 into main Dec 11, 2025
37 checks passed
@CarlosGamero CarlosGamero deleted the bugfix/try_to_fix_batch_process_oom branch December 11, 2025 13:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants