Skip to content

Conversation

@xxzefgh
Copy link
Contributor

@xxzefgh xxzefgh commented Oct 5, 2025

Summary

Added pause/resume APIs for consumer

Related issue: #128


Decided to not mess with streams, as a result .pause will not have an immediate effect and only prevents next fetch requests.

API signature was basically copied from Java client, with added isPaused for fast lookups since it's also used internally.

Java's API also allows to easily pause all assignments with consumer.pause(consumer.assignment()), so maybe we should consider changing GroupAssignment type or reusing it here.


Disclaimer: part of the tests and documentation was written with AI assistance

Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina
Copy link
Member

mcollina commented Oct 5, 2025

As an API, I would actually prefer to have a new method that provides a Node.js stream per partition, which would be able to independently have its own backpressure mechanism. That would provide a better DX.

@xxzefgh
Copy link
Contributor Author

xxzefgh commented Oct 5, 2025

As an API, I would actually prefer to have a new method that provides a Node.js stream per partition, which would be able to independently have its own backpressure mechanism. That would provide a better DX.

I think that depends on the usecase, we actually prefer to have pause/resume because if we want to consume messages in some past datetime range, we don't want to continue consuming from partitions that already satisfied the end date, it just adds unnecessary overhead. So, it's not only useful for backpressure control, and we only have to deal with 1 stream.

Just to clarify, I don't find separate streams a bad DX, but rather difficult to implement, at least for me.

@xxzefgh xxzefgh force-pushed the feature/pause-resume branch from f7208ed to 2ce88da Compare October 12, 2025 16:19
@ShogunPanda
Copy link
Contributor

I also agree with @mcollina. Given we have followed a streams only API for consumers, I don't want to have additional method on the consumer itself.

But, @xxzefgh I see your use case. I would add the logic to the MessagesStream, in particular, add the following methods:

  • pauseTopicPartitions
  • resumeTopicPartitions
  • getPausedTopicPartitions

Then you modify #fetch to skipped paused partitions.
This has the advantage to avoid any checks about joining the group and so forth.

Note that if a #fetch call has no partitions after filtering out all paused one then you should immediately pause the stream and bail out of the function.

This also implies that a call to resumeTopicPartions will also have to resume the stream if it was previously paused.

Also, use just this.emit('pausedTopicPartitions') and this.emit('resumedTopicPartitions') where appropriate.

What do you think? Are you willing to implement this?

@xxzefgh
Copy link
Contributor Author

xxzefgh commented Oct 15, 2025

@ShogunPanda Sure, I can get it done. Would you be willing to accept seek API's too in another PR?

@ShogunPanda
Copy link
Contributor

Amazing!
About seek, what is the intended usage?

Copy link
Contributor

@ShogunPanda ShogunPanda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(already answered - Requesting for changes just to make sure this isn't merged by mistake)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants