-
Notifications
You must be signed in to change notification settings - Fork 20
Description
right now consumers in this library return a single readable stream, which automatically handles backpressure (which is awesome). this works great if the consumer treats messages from all partitions as a single big firehose of data that can all be consumed in any order β for example analytics event ingestion or similar. but for cases where the consumer relies heavily on message ordering within a single partition, backpressure management via a single stream doesn't work as well.
in our app, we use KafkaJS to fill a search index based on incoming replication data from an upstream DB. the DB data is sharded across partitions by tenant/workspace. we route the messages from each partition to a separate instance of our search indexing service, and within each partition order is extremely important. the throughput of each partition varies, and we don't want a big spike in throughput from a single partition to slow down other partitions. we also don't want issues with one instance of our search indexing system to cause backpressure on all partitions, slowing them all down.
in KafkaJS's callback based api, there is a pause function that can be called per-partition to achieve this:
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
await processMessage(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
// Pause only this specific partition
consumer.pause([{ topic, partitions: [partition] }])
// Resume this partition after a delay
setTimeout(() => {
consumer.resume([{ topic, partitions: [partition] }])
}, e.retryAfter * 1000)
throw e
}
}
}
})as i understand it (please correct me if mistaken), in this library's readable stream implementation, this kind of per-partition backpressure management is impossible. if there is congestion or an issue downstream on one partition, that will cause pressure to build up in the stream across all partitions, slowing unrelated partitions down.
i love the readable stream api you guys have right now β in our app we actually wrap KafkaJS in a thing that returns a readable stream per-partition and causes pause() when a partition's stream fills up. it would be great if we could opt-in to getting back a readable stream per partition (i.e. in an array) instead of a single stream:
const streams = await consumer.consume({
autocommit: true,
topics: ['my-topic'],
streamingMode: StreamingModes.Partition // the options here could be Topic (default, current behavior) or Partition (new option)?
})
// in "partition" streaming mode, the consumer returns an array of streams β one per partition
for (const stream of streams) {
// do thing with partition-level stream
}let me know if there's another way to do this already, or if you think this is a good idea. i may or may not be able to figure out how to do a PR for it myself if you like the idea π