Skip to content

feat: support blocking send_non_blocking() and other RPCs#355

Merged
BewareMyPower merged 6 commits intomasterfrom
bewaremypower/block-if-queue-full
Sep 15, 2025
Merged

feat: support blocking send_non_blocking() and other RPCs#355
BewareMyPower merged 6 commits intomasterfrom
bewaremypower/block-if-queue-full

Conversation

@BewareMyPower
Copy link
Contributor

Fixes #342

Motivation

#312 changes connection's outbound channel from unbounded to bounded and once the channel is full, all RPC requests will fail with the SlowDown error. However, other RPCs like Ack could also be affected. This is different from other Pulsar client SDKs because only pending Send RPCs should be limited.

This behavior is not documented and hence very confusing.

Modifications

Add a block_if_queue_full option to simulate the feature from Java client:
https://github.com/apache/pulsar/blob/14543d3fde935d1b70e3707a8b2c0294eb53dccb/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java#L241

  • For Send RPC
    • call async_channel::Sender::send when this option is enabled (disabled by default)
    • keep calling async_channel::Sender::try_send and fail with SlowDown if queue is full
  • For other RPCs: call async_channel::Sender::send

Add block_if_queue_full to show the difference between the option is enabled and disabled.

Additionally, add documents to this new option, as well as send_non_blocking because it's hard for users to know how to handle this SlowDown error without the documentation.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for blocking behavior when the internal pending queue is full during send operations, addressing issue #342. The change allows users to choose between blocking and non-blocking behavior when the outbound channel reaches capacity.

  • Adds block_queue_if_full option to ProducerOptions to control blocking behavior for send operations
  • Modifies connection logic to differentiate between send RPCs (which can fail fast) and other RPCs (which should block)
  • Enhances documentation for the new option and existing send_non_blocking method with usage examples

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
src/producer.rs Adds block_queue_if_full option, updates documentation with usage examples, and includes comprehensive tests
src/connection.rs Refactors message sending logic to support both blocking and non-blocking modes based on the new parameter
src/client.rs Adds documentation for the with_outbound_channel_size method

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@BewareMyPower BewareMyPower marked this pull request as draft September 12, 2025 03:37
@BewareMyPower BewareMyPower marked this pull request as ready for review September 12, 2025 03:58
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (1)

src/producer.rs:149

  • Typo in comment: 'treshold' should be 'threshold'.
    /// batch size in bytes treshold (only relevant when batch_size active).

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@BewareMyPower BewareMyPower merged commit 51b93ed into master Sep 15, 2025
7 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/block-if-queue-full branch September 15, 2025 07:03
@FlorentinDUBOIS
Copy link
Collaborator

Thank that a great addition!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Does every operation need to check for SlowDown and retry?

3 participants