-
Notifications
You must be signed in to change notification settings - Fork 7
Stop stream accepting messages when consuming in process #376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes implement backpressure management for Kafka consumer batch processing by pausing the consumer stream before processing each batch and resuming it afterward. A unit test is added to verify the pause/resume behavior and correct invocation of the consume method. Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/kafka/lib/AbstractKafkaConsumer.ts (1)
227-238: Critical: Stream will remain paused if consume throws an error.If
consumethrows an exception (line 232-235), theresume()call on line 236 will never execute, leaving the consumer stream permanently paused. This will halt all message processing until the application restarts.Apply this diff to ensure the stream is always resumed:
private async handleSyncStreamBatch( stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>, ): Promise<void> { for await (const messageBatch of stream) { this.consumerStream?.pause() - await this.consume( - messageBatch.topic, - messageBatch.messages as DeserializedMessage<SupportedMessageValues<TopicsConfig>>, - ) - this.consumerStream?.resume() + try { + await this.consume( + messageBatch.topic, + messageBatch.messages as DeserializedMessage<SupportedMessageValues<TopicsConfig>>, + ) + } finally { + this.consumerStream?.resume() + } } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/kafka/lib/AbstractKafkaConsumer.spec.ts(1 hunks)packages/kafka/lib/AbstractKafkaConsumer.ts(1 hunks)packages/kafka/package.json(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/kafka/lib/AbstractKafkaConsumer.ts (1)
packages/kafka/lib/types.ts (2)
DeserializedMessage(46-51)SupportedMessageValues(42-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: general (22.x, @message-queue-toolkit/schemas) / build
- GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (24.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: kafka (22.x) / build
- GitHub Check: kafka (24.x) / build
🔇 Additional comments (1)
packages/kafka/package.json (1)
3-3: LGTM! Appropriate version bump.The patch version increment is suitable for the backpressure management feature being introduced.
Summary by CodeRabbit
Release Notes
Bug Fixes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.