
Conversation

@RobertIndie (Member) commented Nov 13, 2025

Motivation

When a partition update is triggered and the creation of one of the new producers fails, the error handling currently closes all existing producers here:

for _, producer := range p.producers {
	if producer != nil {
		producer.Close()
	}
}
This causes the entire partitioned producer to become unavailable: all subsequent produce operations fail with a "producer already closed" error.

Modifications

Currently, two variables, producers and producersPtr, are used to manage the producer list inside producer_impl:

producers    []Producer
producersPtr unsafe.Pointer

Both are read and written across multiple methods inside producer_impl, which is misleading and defeats the purpose of producersPtr.

This PR addresses the issue by:

  • Using only producers as an atomic value to access the producer list inside producer_impl. Only internalCreatePartitionsProducers can modify producers, which protects the producer list.
  • Ensuring that during a partition update, new producers are not added to producers until all new producers have been created successfully (see the sketch below).
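
A minimal sketch of the resulting flow, assuming illustrative names (partitionProducer, updatePartitions, and the create callback are stand-ins here, not the exact code in producer_impl.go):

package sketch

import "sync/atomic"

// Producer stands in for pulsar.Producer in this sketch.
type Producer interface{ Close() }

type partitionProducer struct {
	// producers holds an immutable []Producer snapshot, initialized with a
	// (possibly empty) slice. Readers load it atomically; only the
	// partition-update path stores a new slice.
	producers atomic.Value
}

func (p *partitionProducer) getProducers() []Producer {
	return p.producers.Load().([]Producer)
}

// updatePartitions grows the producer list to newNumPartitions. create is a
// hypothetical constructor standing in for the real partition-producer setup.
func (p *partitionProducer) updatePartitions(newNumPartitions int, create func(partition int) (Producer, error)) error {
	old := p.getProducers()
	created := make([]Producer, 0, newNumPartitions-len(old))
	for i := len(old); i < newNumPartitions; i++ {
		prod, err := create(i)
		if err != nil {
			// Close only the producers created in this update; the
			// existing ones keep serving traffic untouched.
			for _, c := range created {
				c.Close()
			}
			return err
		}
		created = append(created, prod)
	}
	// Publish the combined list only after every new producer succeeded.
	p.producers.Store(append(append([]Producer{}, old...), created...))
	return nil
}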

Verifying this change

This change added tests

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@RobertIndie self-assigned this Nov 13, 2025

Copilot AI (Contributor) left a comment

Pull Request Overview

This PR fixes a critical bug where partition update failures would incorrectly close all existing producers, making the entire producer unavailable. The fix refactors producer list management to use only producersPtr with atomic operations, and ensures that during partition updates, new producers are only added after successful creation.

  • Replaced dual-variable producer management (producers and producersPtr) with single atomic pointer approach using producersPtr
  • Modified error handling to close only newly created producers on failure, preserving existing working producers
  • Added comprehensive test coverage for the partition update failure scenario

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

Reviewed files:

  • pulsar/producer_impl.go: core fix that removes the producers field, adds helper methods getProducer() and getProducers(), and updates error handling to close only the new producers on failure
  • pulsar/producer_test.go: updates existing tests to use the new getProducer() method and adds TestPartitionUpdateFailed to verify the fix
  • pulsar/message_chunking_test.go: updates a test to use the new getProducer() method


@RobertIndie added this to the v0.18.0 milestone Nov 13, 2025

@BewareMyPower (Contributor) left a comment

I suggest not refactoring the producers-related code. Please just address the issue that existing producers are closed on such a failure.

Actually I think the existing design of #286 is not safe.

	producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
	if partition >= len(producers) {

producers refers to the memory of the slice header that p.producersPtr points to, which is the pair of the data pointer and the length.

However, when p.producers = make([]Producer, newNumPartitions) is called, the slice header can be overwritten in place, so the following race condition could happen:

time  goroutine  action
t1    A          atomic.LoadPointer(&p.producersPtr) loads the pointer p0
t2    B          modify the length from 5 to 10
t3    A          dereference the slice and read pointer p0 and new length 10
t4    B          modify the pointer from p0 to p1

The key point is that the modification of a slice header (pointer and length) is not atomic, so goroutine A can end up pairing the old pointer p0 with the new length 10 and read past the end of the old backing array.
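
The torn read disappears once the published slice is never mutated in place: writers build a fresh slice and swap a single pointer to it. A sketch of that copy-on-write pattern using Go 1.19's generic atomic.Pointer (illustrative names, not the PR's code, which used unsafe.Pointer):

package sketch

import "sync/atomic"

// Producer stands in for pulsar.Producer in this sketch.
type Producer interface{ Close() }

type holder struct {
	// producersPtr publishes an immutable slice; it must be initialized
	// with a non-nil pointer before any reader runs.
	producersPtr atomic.Pointer[[]Producer]
}

// grow publishes a longer list without touching the published slice, so a
// reader's single pointer load always observes a consistent header.
func (h *holder) grow(n int) {
	old := *h.producersPtr.Load()
	bigger := make([]Producer, n)
	copy(bigger, old) // copy-on-write: the old slice is never modified
	h.producersPtr.Store(&bigger)
}

func (h *holder) get(partition int) Producer {
	producers := *h.producersPtr.Load() // one atomic load covers pointer and length
	return producers[partition%len(producers)]
}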

@BewareMyPower (Contributor) left a comment

After this change, it seems that p.Lock() in internalCreatePartitionsProducers becomes unnecessary, because it was added to guarantee safe access to the producers slice. With an atomic value, the slice pointed to by producers is always immutable.

However, I suggest just keeping producers a plain slice and acquiring a read lock in getPartition. Removing the lock in this case looks like premature optimization. Modifying producers is a rare case; most of the time internalCreatePartitionsProducers returns quickly because oldNumPartitions == newNumPartitions.
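
A sketch of that suggested alternative (illustrative names, not the PR's code): producers stays a plain slice guarded by a sync.RWMutex, so hot-path reads only pay for a read lock and the write lock is taken on the rare partition update:

package sketch

import "sync"

// Producer stands in for pulsar.Producer in this sketch.
type Producer interface{ Close() }

type partitionProducer struct {
	mu        sync.RWMutex
	producers []Producer
}

// getPartition takes only a read lock, so concurrent sends do not block
// each other; they block only while a partition update holds the write lock.
func (p *partitionProducer) getPartition(partition int) Producer {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.producers[partition%len(p.producers)]
}

func (p *partitionProducer) setProducers(producers []Producer) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.producers = producers
}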

@BewareMyPower (Contributor) commented

I wrote an example locally:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	slice := make([]int, 10)
	for i := 0; i < len(slice); i++ {
		slice[i] = rand.Intn(100)
	}
	wg := sync.WaitGroup{}
	wg.Add(2)
	lock := sync.Mutex{}
	// goroutine 1: read a random element under the lock, 10 million times
	go func() {
		start := time.Now()
		sum := int64(0)
		const n = 10000000
		for i := 0; i < n; i++ {
			partition := rand.Int()
			lock.Lock()
			sum = sum + int64(slice[partition%len(slice)])
			lock.Unlock()
		}
		finished := time.Since(start)
		fmt.Println(sum, " elapsed time:", finished, " avg: ", finished.Nanoseconds()/n, " ns")
		wg.Done()
	}()
	// goroutine 2: append 10 values at a time under the lock, 100 times
	go func() {
		for i := 0; i < 100; i++ {
			lock.Lock()
			for j := 0; j < 10; j++ {
				slice = append(slice, rand.Intn(100))
			}
			lock.Unlock()
			time.Sleep(1 * time.Millisecond)
		}
		wg.Done()
	}()
	wg.Wait()
}
  • goroutine 1: choose a random element from the slice and calculate the sum
  • goroutine 2: append 10 random values to the slice, 100 times at 1 ms intervals

Example outputs:

502615270  elapsed time: 166.572958ms  avg:  16  ns

The lock contention is not as scary as many people think. I'm wondering how much can actually be gained by using such an atomic approach?
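
One way to quantify the difference is a pair of Go benchmarks; a minimal sketch (not from the PR; atomic.Pointer requires Go 1.19+):

package sketch

import (
	"sync"
	"sync/atomic"
	"testing"
)

// BenchmarkMutexRead measures a slice read guarded by a mutex.
func BenchmarkMutexRead(b *testing.B) {
	var mu sync.Mutex
	s := make([]int, 10)
	b.RunParallel(func(pb *testing.PB) {
		sum, i := 0, 0
		for pb.Next() {
			mu.Lock()
			sum += s[i%len(s)]
			mu.Unlock()
			i++
		}
		_ = sum // keep the reads from being optimized away
	})
}

// BenchmarkAtomicRead measures the same read through an atomic pointer load.
func BenchmarkAtomicRead(b *testing.B) {
	var p atomic.Pointer[[]int]
	s := make([]int, 10)
	p.Store(&s)
	b.RunParallel(func(pb *testing.PB) {
		sum, i := 0, 0
		for pb.Next() {
			view := *p.Load()
			sum += view[i%len(view)]
			i++
		}
		_ = sum
	})
}

Running with go test -bench . -cpu 1,8 would also show how each approach scales under parallelism.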

@BewareMyPower (Contributor) commented

Without the lock (only goroutine 1 running; note wg.Add(1) in the snippet below), the same loop is roughly twice as fast:

477056558  elapsed time: 79.822416ms  avg:  7  ns
	wg.Add(1)
	//wg.Add(2)
	//lock := sync.Mutex{}
	go func() {
		start := time.Now()
		sum := int64(0)
		const n = 10000000
		for i := 0; i < n; i++ {
			partition := rand.Int()
			//lock.Lock()
			sum = sum + int64(slice[partition%len(slice)])
			//lock.Unlock()
		}
		finished := time.Since(start)
		fmt.Println(sum, " elapsed time:", finished, " avg: ", finished.Nanoseconds()/n, " ns")
		wg.Done()
	}()

@RobertIndie (Member, Author) commented

> After this change, it seems that p.Lock() in internalCreatePartitionsProducers becomes unnecessary, because it was added to guarantee safe access to the producers slice. With an atomic value, the slice pointed to by producers is always immutable.

Yes. We don't need the lock here in this PR. As we discussed privately, we’ve decided to remove the lock.

I agree that removing the lock isn't strictly necessary in this case. However, the immutable-view approach used in this PR is just as straightforward (once the producer lock is removed) and doesn't introduce any safety concerns. We can add the lock back later if we really need one.

@BewareMyPower merged commit fa2b263 into apache:master on Nov 18, 2025; 7 checks passed.