feat: Kafka improving memory handling #393
Conversation
📝 Walkthrough
Replaces the previous batch-iteration path in AbstractKafkaConsumer with a Transform-based KafkaMessageBatchStream that delivers batches via a callback; stream initialization now routes consumerStream errors to handlerError, and non-batch mode uses handleSyncStream explicitly.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Consumer as AbstractKafkaConsumer
    participant Stream as consumerStream
    participant Batch as KafkaMessageBatchStream
    participant Handler as consume()/handlerError
    Consumer->>Stream: initialize and attach error listener
    Stream->>Batch: pipe messages (object stream)
    alt batch mode configured
        Batch->>Consumer: invoke onBatch(batch)
        Consumer->>Handler: consume(batch)
        Handler-->>Consumer: success / throws
        Consumer->>Handler: on error -> handlerError
    else non-batch mode
        Stream->>Consumer: handleSyncStream(async iterator)
        Consumer->>Handler: consume(message)
        Handler-->>Consumer: success / throws
        Consumer->>Handler: on error -> handlerError
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
```ts
this.batchTimeoutPerTopicPartition[key] = setTimeout(
  () => this.flushCurrentBatchMessages(message.topic, message.partition),
  this.timeout,
)
```
We could still encounter backpressure issues due to timeout handling, but I am not really sure how to address this properly. In most cases, I expect we’ll hit the message limit before the timeout triggers, though I’m not completely certain. Let’s monitor how it behaves in practice.
Actually, I just had an idea to try to improve/fix this. Will try to implement it
I believe I’ve fixed the issue, and the timeout flow now respects backpressure as well, preventing potential memory issues. Please review it carefully 🙏
So right now, when the timeout passes, instead of flushing the messages we add them to a dictionary, and they are processed on the next message. Is that correct, or am I missing something? I'm wondering if this can cause lag when there is a longer gap between messages.
It is not correct. Let me explain.
This is the piece of code we are referring to:
```ts
this.batchTimeoutPerTopicPartition[key] = setTimeout(
  () =>
    this.timeoutProcessingPromises.set(
      key,
      this.flushCurrentBatchMessages(message.topic, message.partition),
    ),
  this.timeout,
)
```
When a timeout occurs, we call flushCurrentBatchMessages without awaiting it, so it returns a promise. This promise is stored in the timeoutProcessingPromises map. When the next message arrives, we check whether there are any pending promises and await them before allowing the message to continue processing.
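A rough sketch of that flow in isolation (the batching and timer setup are elided; only the await-pending-promises step is shown):

```ts
import { Transform, type TransformCallback } from 'node:stream'

// Simplified sketch of the pattern described above: timeout-triggered
// flushes are stored as promises and awaited before the next message is
// processed, so a timer flush and message processing never overlap.
class BatchStreamSketch extends Transform {
  private readonly timeoutProcessingPromises = new Map<string, Promise<void>>()

  constructor() {
    super({ objectMode: true })
  }

  override async _transform(
    message: { topic: string; partition: number },
    _encoding: BufferEncoding,
    callback: TransformCallback,
  ): Promise<void> {
    // Await any flushes started by expired timers before continuing.
    if (this.timeoutProcessingPromises.size > 0) {
      await Promise.all(this.timeoutProcessingPromises.values())
      this.timeoutProcessingPromises.clear()
    }
    // ...accumulate `message` into the current batch, flush on size...
    callback()
  }
}
```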
Ah, true, I missed the fact that we are already triggering the flush, just not awaiting it. All good.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @packages/kafka/lib/utils/KafkaMessageBatchStream.ts:
- Around lines 101-108: splitTopicPartitionKey breaks when topic names contain ':' because it simply splits on every colon; change it to find the last ':' (use lastIndexOf) and split into topic = key.slice(0, idx) and partitionStr = key.slice(idx + 1), then parse partitionStr with parseInt base 10 and validate (throw if idx < 0 or the parsed partition is NaN). getTopicPartitionKey stays the same, but ensure splitTopicPartitionKey robustly handles topic names with colons and produces clear error messages referencing the keys generated by getTopicPartitionKey.
- Around lines 93-97: The flushCurrentBatchMessages path in KafkaMessageBatchStream can call this.onBatch with an empty messages array when a partition was already flushed. Update flushCurrentBatchMessages to read const messages = this.currentBatchPerTopicPartition[key] ?? [] and only call await this.onBatch({ topic, partition, messages }) if messages.length > 0, then still clear the slot (set this.currentBatchPerTopicPartition[key] = [] or delete the key) to avoid duplicate deliveries and retain the existing semantics.
- Around lines 62-65: The setTimeout callback stores an async fire-and-forget call to flushCurrentBatchMessages, which can produce unhandled rejections and allow concurrent flushes against the same topic/partition. Update the setTimeout handler to call an async wrapper that catches and logs errors from flushCurrentBatchMessages/onBatch, and serialize/track in-flight flushes for a given key (e.g., maintain an inFlightFlushes map keyed by `${topic}:${partition}` that awaits the existing promise before running the next flush, or chains the promises) so _transform and timeout-triggered flushes do not run concurrently for the same partition.
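For illustration, a minimal sketch of that serialization idea, using the hypothetical inFlightFlushes map named above:

```ts
// Hypothetical helper: chain flushes per `${topic}:${partition}` key so a
// timeout-triggered flush and a size-triggered flush never run concurrently,
// and rejections are always caught instead of becoming unhandled.
const inFlightFlushes = new Map<string, Promise<void>>()

function scheduleFlush(
  key: string,
  flush: () => Promise<void>,
  onError: (err: unknown) => void,
): Promise<void> {
  const previous = inFlightFlushes.get(key) ?? Promise.resolve()
  // Run only after the previous flush for this partition settles; the
  // trailing catch keeps the chain free of unhandled rejections.
  const next = previous.then(flush).catch(onError)
  inFlightFlushes.set(key, next)
  return next
}
```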
🧹 Nitpick comments (2)
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (2)
77-80: Timing-dependent test may be flaky. The test waits 150ms for a 100ms timeout to trigger. While this provides a 50% buffer, under CI load it could occasionally fail. Consider increasing the margin or using a more deterministic approach, like calling end() on the stream and awaiting its finish.
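A deterministic variant along those lines could look like this (a sketch; it assumes 'finish' fires only after the stream's _flush completes, which is standard Transform behavior):

```ts
import { once } from 'node:events'

it('flushes the pending batch when the stream ends', async () => {
  const batches: unknown[] = []
  const stream = new KafkaMessageBatchStream<any>(
    async (batch) => {
      batches.push(batch)
    },
    { batchSize: 1000, timeoutMilliseconds: 10_000 },
  )

  stream.write({ id: 1, topic: 'test-topic', partition: 0 })
  stream.end()
  // 'finish' is emitted only after _flush completes, so no sleep is needed.
  await once(stream, 'finish')

  expect(batches).toHaveLength(1)
})
```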
1-228: Consider adding test coverage for edge cases. The current tests cover the happy path well. Consider adding tests for:
- Calling end() on the stream and verifying _flush delivers remaining messages
- Error handling when onBatch throws
- Empty message scenarios

These would increase confidence in the new implementation's robustness; see the sketch after this list for the error-handling case.
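For the onBatch error case, one possible shape (a sketch; whether the rejection surfaces as a stream 'error' event depends on how _transform forwards it):

```ts
import { once } from 'node:events'

it('surfaces onBatch failures as stream errors', async () => {
  const stream = new KafkaMessageBatchStream<any>(
    async () => {
      throw new Error('boom')
    },
    { batchSize: 1, timeoutMilliseconds: 10_000 },
  )

  // A batchSize of 1 makes the first write trigger onBatch immediately.
  stream.write({ id: 1, topic: 'test-topic', partition: 0 })

  const [err] = await once(stream, 'error')
  expect((err as Error).message).toBe('boom')
})
```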
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- packages/kafka/lib/AbstractKafkaConsumer.ts
- packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
- packages/kafka/lib/utils/KafkaMessageBatchStream.ts
🧰 Additional context used
🧬 Code graph analysis (2)
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (1)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)
KafkaMessageBatchStream (23-99)
packages/kafka/lib/AbstractKafkaConsumer.ts (2)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)
KafkaMessageBatchStream (23-99)
packages/kafka/lib/types.ts (2)
DeserializedMessage (46-51), SupportedMessageValues (42-44)
🔇 Additional comments (5)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)
23-43: Good refactor to Transform stream with callback-based API. The switch from Duplex to Transform with an explicit onBatch callback properly leverages Node's built-in backpressure. The objectMode: true and async _transform pattern look correct.
packages/kafka/lib/AbstractKafkaConsumer.ts (3)
193-206: Clean integration with proper error routing. The new batch stream initialization correctly:
- Attaches an error listener to the consumer stream
- Routes batch processing errors through handlerError via .catch()
- Falls back to handleSyncStream for non-batch mode with error handling

This maintains the existing error handling patterns while enabling the new callback-based batch delivery.
198-202: Consider whether onBatch should propagate errors to pause the stream. Currently, errors from this.consume() are caught and passed to handlerError, but the batch stream continues accepting messages. This matches the previous behavior, but given the PR's goal of respecting backpressure, you may want errors to signal the stream to pause/destroy rather than silently continuing. If the intent is to continue processing despite errors (relying on retry logic), this is fine. Otherwise, consider propagating the error to stop the stream.
152-157: v8 ignore comments are correctly placed. The catch blocks handle edge cases that shouldn't occur in normal operation, and marking them for coverage exclusion is appropriate.
Also applies to: 169-174
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (1)
17-34: Tests correctly updated to the new callback-based API. The pattern of using a closure-captured resolvePromise to signal completion when expected batches are received is clean and appropriate for this async testing scenario.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)
92-106: Consider skipping empty batch flushes. In edge cases (e.g., when _flush is called after a partition's batch was already flushed), this method might invoke onBatch with an empty messages array. While not harmful, it's unnecessary overhead.
♻️ Optional: Guard against empty batches
```diff
 private async flushCurrentBatchMessages(topic: string, partition: number) {
   const key = getTopicPartitionKey(topic, partition)

   // Clear timeout
   if (this.batchTimeoutPerTopicPartition[key]) {
     clearTimeout(this.batchTimeoutPerTopicPartition[key])
     this.batchTimeoutPerTopicPartition[key] = undefined
   }

   const messages = this.currentBatchPerTopicPartition[key] ?? []

-  // Push the batch downstream
-  await this.onBatch({ topic, partition, messages })
+  // Push the batch downstream only if non-empty
+  if (messages.length > 0) {
+    await this.onBatch({ topic, partition, messages })
+  }

   this.currentBatchPerTopicPartition[key] = []
 }
```
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (1)
239-295: Backpressure test validates sequential processing. The test correctly verifies that batches are processed one at a time and never concurrently. The assertions on timing and concurrency are thorough.
💡 Consider adding a stress test scenario
The current test uses 80ms delays between messages, ensuring each batch completes before the next arrives. Consider adding a complementary test where messages arrive rapidly (e.g., every 5ms) while batch processing is slow (e.g., 100ms), to verify backpressure under high load:
```ts
it('should handle backpressure with rapid message arrival', async () => {
  const messages = Array.from({ length: 10 }, (_, i) => ({ id: i + 1, topic: 'test-topic', partition: 0 }))

  let processing = 0
  let maxConcurrent = 0

  const stream = new KafkaMessageBatchStream<any>(
    async (_batch) => {
      processing++
      maxConcurrent = Math.max(maxConcurrent, processing)
      await sleep(100) // Slow processing
      processing--
    },
    { batchSize: 1000, timeoutMilliseconds: 10 },
  )

  // Rapid writes
  for (const msg of messages) {
    stream.write(msg)
    await sleep(5) // Much faster than processing
  }

  await waitAndRetry(() => processing === 0, 50, 50)

  expect(maxConcurrent).toBe(1)
})
```
This would validate backpressure when the stream buffer accumulates messages faster than they can be processed.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
- packages/kafka/lib/utils/KafkaMessageBatchStream.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (1)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)
KafkaMessageBatchStream (21-107)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (24.x, @message-queue-toolkit/schemas) / build
- GitHub Check: general (22.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/core) / build
- GitHub Check: general (22.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (24.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: kafka (24.x) / build
- GitHub Check: kafka (22.x) / build
🔇 Additional comments (8)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (5)
1-20: Excellent architectural improvement! The shift from Duplex to Transform with an explicit callback pattern is the right solution for the OOM issue. The documentation clearly explains how backpressure prevents memory leaks.
21-43: Constructor refactoring looks good. The callback-first parameter pattern and the timeoutProcessingPromises Map provide the foundation for proper backpressure handling.
45-77: Backpressure implementation is well-designed. The key mechanism at lines 48-54 ensures that timeout-based flushes complete before accepting new messages. The snapshot pattern prevents race conditions, and the finally() cleanup ensures proper promise lifecycle management.
80-90: Proper stream cleanup on close. The _flush implementation correctly flushes all remaining batches when the stream is ending.
109-117: Helper functions are clean and correct. The topic-partition key management is straightforward, with appropriate defensive error handling.
packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts (3)
8-53: Test correctly updated for the callback-based API. The pattern of using a dedicated resolvePromise to signal completion when the expected batch count is reached is clean and effective.
55-90: Timeout-based batching test looks good. The test correctly validates that messages are batched and flushed based on the timeout when the batch size is not reached.
92-237: Multi-partition tests validate correct batching behavior. The tests correctly verify that messages are batched separately per topic-partition combination.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @packages/kafka/lib/utils/KafkaMessageBatchStream.ts:
- Around lines 97-111: flushCurrentBatchMessages currently calls onBatch even for empty message arrays and leaves per-partition keys allocated. Change it to compute the key via getTopicPartitionKey(topic, partition), clear and unset the timeout entry from batchTimeoutPerTopicPartition as it does now, then read the messages from currentBatchPerTopicPartition[key] and, if messages.length === 0, skip calling onBatch; after a successful onBatch invocation (or when skipping due to an empty batch), delete both currentBatchPerTopicPartition[key] and batchTimeoutPerTopicPartition[key] to release per-partition state and prevent empty-batch callbacks.
- Around lines 84-88: The _flush override must wait for any in-flight timeout-triggered batch flushes and propagate errors to the stream by calling callback(err) on failure. Update flushAllBatches (or add tracking of pending flush promises) so it awaits any timer-initiated onBatch promises before returning, and change _flush to try { await this.flushAllBatches(); callback(); } catch (err) { callback(err); } so outstanding async flushes are awaited and errors are forwarded.
🧹 Nitpick comments (2)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (2)
33-43: Use KafkaMessageBatchOptions + validate batchSize/timeoutMilliseconds early.
Avoid duplicating the options type and fail fast on invalid configuration.
Proposed diff
```diff
- constructor(
-   onBatch: OnMessageBatchCallback<TMessage>,
-   options: { batchSize: number; timeoutMilliseconds: number },
- ) {
+ constructor(onBatch: OnMessageBatchCallback<TMessage>, options: KafkaMessageBatchOptions) {
    super({ objectMode: true })
    this.onBatch = onBatch
+
+   if (options.batchSize <= 0) throw new RangeError('batchSize must be > 0')
+   if (options.timeoutMilliseconds < 0)
+     throw new RangeError('timeoutMilliseconds must be >= 0')
+
    this.batchSize = options.batchSize
    this.timeout = options.timeoutMilliseconds
    this.currentBatchPerTopicPartition = {}
    this.batchTimeoutPerTopicPartition = {}
  }
```
114-122: Make splitTopicPartitionKey() robust (avoid split(':') surprises).
Using lastIndexOf(':') avoids accidental mis-parsing if the delimiter ever appears in topic.
Proposed diff
```diff
 const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => {
-  const [topic, partition] = key.split(':')
+  const idx = key.lastIndexOf(':')
+  const topic = idx >= 0 ? key.slice(0, idx) : ''
+  const partition = idx >= 0 ? key.slice(idx + 1) : ''
   /* v8 ignore start */
   if (!topic || !partition) throw new Error('Invalid topic-partition key format')
   /* v8 ignore stop */
-  return { topic, partition: Number.parseInt(partition, 10) }
+  const partitionNumber = Number.parseInt(partition, 10)
+  if (Number.isNaN(partitionNumber)) throw new Error('Invalid partition in topic-partition key')
+  return { topic, partition: partitionNumber }
 }
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
- packages/kafka/lib/utils/KafkaMessageBatchStream.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (20.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: kafka (24.x) / build
- GitHub Check: kafka (22.x) / build
🔇 Additional comments (1)
packages/kafka/lib/utils/KafkaMessageBatchStream.ts (1)
1-27: Transform + awaited batch callback is the right direction for backpressure.
The new contract (“don't accept more until onBatch resolves”) matches the OOM fix intent.
Problem
The previous implementation used a Duplex stream with push() to emit batches. This approach had a backpressure issue:
- The _write() method accumulated messages and called push() synchronously when batches were ready
- The consumer (the for await loop in AbstractKafkaConsumer) processed batches asynchronously

The root cause: push() emits data without waiting for the consumer to finish processing, breaking the backpressure mechanism.
Solution
Refactored to use a Transform stream with an async callback pattern:
- Batches are delivered via await this.onBatch(batch) in _transform(), ensuring the stream doesn't process the next message until the callback completes

Changes
- Switched from a Duplex to a Transform stream
- New constructor signature: new KafkaMessageBatchStream(onBatch, options)
- Replaced push() with await onBatch() for proper async flow control
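A minimal sketch of the resulting pattern (simplified; the real class also tracks per-partition batches and flush timeouts):

```ts
import { Transform, type TransformCallback } from 'node:stream'

type OnMessageBatchCallback<T> = (batch: {
  topic: string
  partition: number
  messages: T[]
}) => Promise<void>

// Sketch: an object-mode Transform whose _transform resolves only after
// the onBatch callback finishes, so Node's backpressure pauses the
// upstream producer while a batch is still being processed.
class BatchingTransformSketch<T extends { topic: string; partition: number }> extends Transform {
  private batch: T[] = []

  constructor(
    private readonly onBatch: OnMessageBatchCallback<T>,
    private readonly batchSize: number,
  ) {
    super({ objectMode: true })
  }

  override async _transform(message: T, _enc: BufferEncoding, cb: TransformCallback): Promise<void> {
    try {
      this.batch.push(message)
      if (this.batch.length >= this.batchSize) {
        const messages = this.batch
        this.batch = []
        // No new message is accepted until the consumer finishes this batch.
        await this.onBatch({ topic: message.topic, partition: message.partition, messages })
      }
      cb()
    } catch (err) {
      // Forward failures to the stream instead of leaving an unhandled rejection.
      cb(err as Error)
    }
  }
}
```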