
Conversation

@Gilthoniel Gilthoniel (Contributor) commented May 20, 2025

Fixes #1371

Motivation

A buffer can end up shared between two goroutines: one reading it in order to write it to the connection, and another one writing a new message into it. This happens because the buffer is released back to the pool without any guarantee that the connection is done with it.
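
For illustration, a minimal sketch of the hazard with a plain sync.Pool (the names and channel here are hypothetical, not the client's actual code):

package example

import "sync"

var pool = sync.Pool{New: func() any { return new([]byte) }}

// send illustrates the race: the buffer is returned to the pool while
// the connection goroutine may still be reading from it.
func send(writeCh chan<- *[]byte, payload []byte) {
	buf := pool.Get().(*[]byte)
	*buf = append((*buf)[:0], payload...)
	writeCh <- buf // the connection goroutine will read *buf later
	pool.Put(buf)  // BUG: no guarantee the connection is done; the next
	// Get can hand the same buffer to a goroutine writing a new message
}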

Modifications

This adds a check so that the buffer is reused only when the send request succeeds, which means a receipt has been received and the buffer has therefore already been fully read. Any error while finalizing the pending item carries the risk that the connection is not completely done with the buffer.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no

Documentation

  • Does this pull request introduce a new feature? no

@RobertIndie RobertIndie requested a review from Copilot June 17, 2025 10:41


// pending item is successful because that is the only state that ensures
// the finality of the buffer at that time (an erroneous item might still
// be in the connection sending queue).
buffersPool.Put(i.buffer)
Member

Why is the buffer only returned when there is no error? It seems the buffer will not be returned if this pending item fails.

Contributor Author

Yes, that's correct: with the current logic it's impossible to be 100% certain that the buffer is not still in a connection queue, which is what causes the data race. So the idea is that in the rare cases where the item fails, the buffer is thrown away instead of being put back in the pool, where it could cause a data race.

Contributor Author

I didn't see your comment in the issue, let me have a look at it.

@Gilthoniel Gilthoniel (Contributor Author) commented Jun 17, 2025

@RobertIndie I've tried a different approach that should go in the direction you were suggesting. Let me know if that makes sense.

@Gilthoniel Gilthoniel requested a review from RobertIndie June 18, 2025 05:16
@Gilthoniel Gilthoniel force-pushed the bugfix/buffer-reuse branch 4 times, most recently from 028ba95 to 853ac2b on June 19, 2025 08:23
@RobertIndie RobertIndie requested a review from Copilot June 23, 2025 10:42
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR fixes an issue where a buffer could be used concurrently in multiple goroutines by ensuring that the buffer is only recycled when it is safely released by all users.

  • Introduces the SharedBuffer type with reference counting.
  • Updates WriteData and related functions to use SharedBuffer.
  • Adds tests to verify the correct behavior of buffer recycling.

Reviewed Changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated no comments.

Summary per file:

  • pulsar/producer_partition.go: Update pendingItem and writeData to use SharedBuffer and call Recycle() appropriately.
  • pulsar/internal/connection_test.go: Update tests to validate the new SharedBuffer recycling behavior.
  • pulsar/internal/connection.go: Modify WriteData and run to manage buffer borrowing and recycling with SharedBuffer.
  • pulsar/internal/buffer_test.go: Add tests to verify the reference counting behavior of SharedBuffer.
  • pulsar/internal/buffer.go: Introduce SharedBuffer and reference counting methods.
  • pulsar/consumer_multitopic_test.go: Update WriteData signature to use SharedBuffer.

Comments suppressed due to low confidence (1)

pulsar/internal/connection.go:465

  • [nitpick] Consider including a comment clarifying why Borrow() is called here despite the buffer already being borrowed in the producer. This double borrow ensures that the SharedBuffer tracks independent usages for the pending queue and for the connection write, and clarifies the intended recycling behavior.
	data.Borrow()
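
For reference, a rough sketch of the reference-counting scheme being discussed here (a simplified illustration, not the PR's exact code):

package example

import (
	"sync"
	"sync/atomic"
)

// SharedBuffer pairs a payload with an atomic reference count so that it
// goes back to the pool only after every holder has released it.
type SharedBuffer struct {
	buf  []byte
	refs atomic.Int32
	pool *sync.Pool
}

// Borrow registers one more holder of the buffer.
func (s *SharedBuffer) Borrow() {
	s.refs.Add(1)
}

// Recycle drops one holder; the last release returns the buffer to the pool.
func (s *SharedBuffer) Recycle() {
	if s.refs.Add(-1) == 0 {
		s.pool.Put(s)
	}
}

With this, the producer and the connection each hold a reference independently, which is what the double Borrow mentioned above accounts for.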

@RobertIndie RobertIndie (Member) left a comment

I think the overall design should be:

  • Borrow the buffer from the pool.
  • Recycle the buffer by returning it to the pool.
  • Support keeping the buffer for later use.

So we can add a wrapper called BufferPool around sync.Pool. This wrapper will handle borrowing and recycling buffers within the pool. We only need to add one method, Retain, to the SharedBuffer.
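
A sketch of that wrapper, building on the SharedBuffer type above (the Retain method suggested here corresponds to Borrow in that sketch; all names are assumptions based on this comment):

// BufferPool wraps sync.Pool and hands out reference-counted buffers.
type BufferPool struct {
	pool sync.Pool
}

// Borrow takes a buffer from the pool with a reference count of one.
func (p *BufferPool) Borrow() *SharedBuffer {
	sb, ok := p.pool.Get().(*SharedBuffer)
	if !ok {
		sb = &SharedBuffer{pool: &p.pool}
	}
	sb.refs.Store(1)
	return sb
}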

Please note that you should retain the buffer before adding it to the pending queue. When writing message data, it should work like this (see the sketch after this list):

  1. Retain the buffer first.
  2. Then, put it into the pending queue. This way, the buffer won't be recycled even if another goroutine (like in failPendingMessages) returns it to the pool.
  3. Finally, write the buffer to the connection.
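
A sketch of that ordering, again using the types from the sketches above (pendingItem and the connection interface are simplified stand-ins for the producer's internals):

type pendingItem struct {
	buffer *SharedBuffer
}

// writeMessage follows the ordering above: retain, enqueue, then write.
func writeMessage(cnx interface{ WriteData(*SharedBuffer) }, pendingQueue chan<- *pendingItem, buf *SharedBuffer) {
	buf.Borrow()                              // 1. retain first
	pendingQueue <- &pendingItem{buffer: buf} // 2. enqueue: a concurrent recycle can no longer free it
	cnx.WriteData(buf)                        // 3. hand off to the connection goroutine
}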

@Gilthoniel Gilthoniel force-pushed the bugfix/buffer-reuse branch from 853ac2b to e9dea4e on June 24, 2025 10:17
Member

Just found another buffer leak issue: we should clean up the writeRequestsCh when closing the connection.
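
A hypothetical sketch of that cleanup, assuming writeRequestsCh carries the reference-counted buffers from the sketches above and that nothing sends on the channel after close:

// drainOnClose releases every buffer still queued for writing so that a
// closing connection does not leak them.
func drainOnClose(writeRequestsCh chan *SharedBuffer) {
	close(writeRequestsCh)
	for buf := range writeRequestsCh {
		buf.Recycle()
	}
}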

Member

Oh, I just saw that you pushed the new code. Let me review it again.

Contributor Author

Added the cleanup anyway, and also fixed an issue with the backoff after the latest merge on master.

@Gilthoniel Gilthoniel (Contributor Author) commented Jun 24, 2025

@RobertIndie while implementing your proposal, I discovered another race: the buffer can be sent to multiple connections and read concurrently. There is then a risk of a partial read even if the buffer is released properly. I've tried something different that should solve it once and for all.

@Gilthoniel Gilthoniel force-pushed the bugfix/buffer-reuse branch from e9dea4e to f43740c on June 24, 2025 10:22
@Gilthoniel Gilthoniel force-pushed the bugfix/buffer-reuse branch from f43740c to e88d02a on June 24, 2025 11:13
@RobertIndie RobertIndie (Member)

> @RobertIndie while implementing your proposal, I discovered another race: the buffer can be sent to multiple connections and read concurrently. There is then a risk of a partial read even if the buffer is released properly. I've tried something different that should solve it once and for all.

I think this should be a separate issue. The approach in this PR causes a deep copy of the data between the producer partition and the connection, and I believe this will affect performance. It's unnecessary to sacrifice performance to fix a very rare problem.

To solve this, I think we should clear writeRequestsCh so that the old connection does not hold any buffers before the producer reconnects and gets a new connection.

@Gilthoniel Gilthoniel force-pushed the bugfix/buffer-reuse branch from 6768377 to ee826bc on June 25, 2025 07:03
@Gilthoniel Gilthoniel requested a review from RobertIndie June 25, 2025 07:03
Comment on lines +257 to +262
if !buffer.isCurrentlyShared.CompareAndSwap(true, false) {
	// When swapping fails, it means that a previous goroutine has already
	// tried to recycle the buffer, indicating that the sharing is not there
	// anymore and the buffer can safely be reused.
	p.pool.Put(buffer)
}
Member

If:

  • Buffer A is created from the pool.
  • A.Retain is called, creating a new buffer B (isCurrentlyShared=true).
  • A is put back (isCurrentlyShared=false): buffer A is returned to the pool and gets recycled.
  • Panic! Buffer B still holds this buffer.

Why not just use the ref count here?

defer func() {
	if !written {
		// Buffer has failed to be written and can now be reused.
		c.bufferPool.Put(data)
Member

Can we add a new method like Recycle to the SharedBuffer, so we don't need to maintain the bufferPool in the connection? The SharedBuffer can hold a reference to the buffer pool.

@Gilthoniel Gilthoniel closed this Jun 25, 2025
@RobertIndie RobertIndie added this to the v0.16.0 milestone Jul 29, 2025
Development

Successfully merging this pull request may close these issues: Buffer data race (#1371).