@gunli
Contributor

@gunli gunli commented Feb 18, 2025

Fixes #1332

Master Issue: #1332

Motivation

The SendAsync() callback should still be invoked, so that the user/application gets a response even while the producer is busy reconnecting.

Modifications

Run the reconnection in a separate goroutine.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Run TestProducerKeepReconnectingAndThenCallSendAsync()

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@gunli
Contributor Author

gunli commented Feb 18, 2025

@gunli
Contributor Author

gunli commented Feb 25, 2025

PRs to fix potential write conflicts #1336 and data race #1338 have been pushed.

@gunli gunli marked this pull request as draft March 7, 2025 08:45
@gunli gunli marked this pull request as ready for review March 10, 2025 07:52
@gunli gunli requested a review from shibd March 10, 2025 07:52
Member

@RobertIndie RobertIndie left a comment

This PR changes a critical part of the event loop. I'm concerned it might introduce risks if we don't manage concurrency properly.

I agree with @shibd: "the main purpose of sendAsync is to avoid waiting for the broker's response, not to skip all the logic." In that model only the network request is asynchronous, as in the Java client; not every operation is.

So, I think it would be easier if we accept this concept. SendAsync can wait until the runEventLoop processes it and pushes it into the pendingQueue or a batch, just like the Java client. Before entering the pendingQueue, SendAsync itself can check for timeouts and handle the callback. After entering the pendingQueue, failTimeoutMessages can manage the timeout.

@gunli
Contributor Author

gunli commented Mar 12, 2025

> SendAsync can wait until the runEventLoop processes it and pushes it into the pendingQueue or a batch, just like the Java client. Before entering the pendingQueue, SendAsync itself can check for timeouts and handle the callback. After entering the pendingQueue, failTimeoutMessages can manage the timeout.

I believe that performing timeout checks and callback handling before entering the pendingQueue will make the producer increasingly complex, and it will require additional cleanup work before closing the producer.

In my opinion, this approach is essentially the same in nature as performing timeout checks after entering the pendingQueue.

Regarding your concerns about concurrency risks, I don't think there is any such risk. When a producer receives a connectionClosed event, it is removed from the connection's handler list, ensuring that no further connectionClosed events will be triggered to make the producer reconnect again.

Furthermore, even if we don't reconnect in a separate goroutine, the remaining changes in this PR are still necessary. This is because failTimeoutMessages may run before connection.internalWriteData() when the timeout is configured to an extremely short value. In that case, when the timeout fires, the buffer is released and reallocated to another pendingItem. Then, when the code reaches connection.internalWriteData(), the buffer already holds the new item's data, resulting in a data race.

Member

@nodece nodece left a comment

The issue arises because the message is not added to the pending queue, preventing the timeout goroutine from triggering a timeout.

To resolve this, we can first add the message to the pending queue before sending it to the data channel.

@nodece
Member

nodece commented Mar 12, 2025

I submitted #1345 to fix #1332.

Development

Successfully merging this pull request may close these issues.

[Bug][Producer] The callback was not invoked during reconnecting.

4 participants