Calling producer.Flush concurrently would hang indefinitely #1357

@YanshuoH

Description

Expected behavior

Calling producer.Flush concurrently should not hang forever

Actual behavior

When producer.Flush is called from multiple goroutines simultaneously, only one call succeeds; the others hang forever.

Steps to reproduce

Assume a function that sends messages asynchronously and then flushes immediately.
Disclaimer:

  1. I know this is not a common usage pattern; it is just to illustrate the situation.
  2. Normally we would pass a context with a timeout into the function; again, just to illustrate the situation, I've used context.Background().

func test3(disableBatching bool) {
	fmt.Println("running test 3 with disableBatching", disableBatching)
	client, err := pulsarsdk.NewClient(pulsarsdk.ClientOptions{
		URL:            pulsarEndpoint,
		Authentication: pulsarsdk.NewAuthenticationToken(pulsarToken),
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
	producer, err := client.CreateProducer(pulsarsdk.ProducerOptions{
		Topic:           pulsarTopic,
		Name:            uuid.NewString(),
		DisableBatching: disableBatching,
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	ctx, cancel := registerExit()
	defer cancel()

	const n = 10
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			producer.SendAsync(ctx, &pulsarsdk.ProducerMessage{
				Payload: []byte(uuid.NewString()),
			}, func(id pulsarsdk.MessageID, message *pulsarsdk.ProducerMessage, err error) {
				wg.Done()
				if err != nil {
					panic(err)
				}
			})

			if err := producer.Flush(); err != nil {
				panic(err)
			}
		}(i)
	}

	wg.Wait()
}

With additional logging, you can see that one producer.Flush call succeeds while the others hang indefinitely.

Checking the flush-related code, we can first observe that for the hanging calls the expected case <-flushReq.doneCh is never triggered.

func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
	if p.getProducerState() != producerReady {
		return ErrProducerClosed
	}

	flushReq := &flushRequest{
		doneCh: make(chan struct{}),
		err:    nil,
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case p.cmdChan <- flushReq:
	}

	// wait for the flush request to complete
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-flushReq.doneCh:
		return flushReq.err
	}
}
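
As an aside, because the wait is context-driven, a caller-side mitigation (not a fix) is to bound each flush with a timeout, so that a lost wakeup surfaces as an error rather than a permanent hang. A minimal sketch, assuming FlushWithCtx is exposed on the Producer interface as it is in recent releases:

import (
	"context"
	"time"

	pulsarsdk "github.com/apache/pulsar-client-go/pulsar"
)

// flushWithTimeout is a mitigation sketch, not a fix: it turns a lost
// flush wakeup into context.DeadlineExceeded instead of a permanent hang.
func flushWithTimeout(p pulsarsdk.Producer, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return p.FlushWithCtx(ctx)
}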

The root cause might be here:

func (p *partitionProducer) internalFlush(fr *flushRequest) {
	p.clearPendingSendRequests()

	if !p.options.DisableBatching {
		p.internalFlushCurrentBatch()
	}

	pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
	if !ok {
		close(fr.doneCh)
		return
	}

	// lock the pending request while adding requests
	// since the ReceivedSendReceipt func iterates over this list
	pi.Lock()
	defer pi.Unlock()

	if pi.isDone {
		// The last item in the queue has been completed while we were
		// looking at it. It's safe at this point to assume that every
		// message enqueued before Flush() was called are now persisted
		close(fr.doneCh)
		return
	}

	pi.flushCallback = func(err error) {
		fr.err = err
		close(fr.doneCh)
	}
}

p.pendingQueue.PeekLast().(*pendingItem) returns the same pi for every concurrent flush call, so each call overwrites pi.flushCallback. As a result, only the most recently registered fr.doneCh is ever closed, and all other flush requests hang indefinitely.
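
The failure mode can be reproduced outside Pulsar entirely. The standalone sketch below (hypothetical names, not library code) mimics the single-callback slot: two waiters register against the same item, the second registration silently replaces the first, and only the surviving doneCh is ever closed.

package main

import (
	"fmt"
	"sync"
	"time"
)

// item mimics pendingItem: a single callback slot that each
// concurrent flush request overwrites.
type item struct {
	sync.Mutex
	flushCallback func(err error)
}

func main() {
	it := &item{}

	var wg sync.WaitGroup
	for n := 0; n < 2; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			doneCh := make(chan struct{})
			it.Lock()
			// Mirrors internalFlush: the second goroutine to get here
			// silently replaces the first goroutine's callback.
			it.flushCallback = func(err error) { close(doneCh) }
			it.Unlock()
			select {
			case <-doneCh:
				fmt.Printf("waiter %d: flush completed\n", n)
			case <-time.After(2 * time.Second):
				fmt.Printf("waiter %d: hung, callback was overwritten\n", n)
			}
		}(n)
	}

	time.Sleep(100 * time.Millisecond) // let both waiters register
	it.Lock()
	cb := it.flushCallback
	it.Unlock()
	cb(nil) // mimics pendingItem.done: only the surviving callback fires

	wg.Wait()
}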

Possible fix

I've applied a monkey patch to pendingItem, altering:

type pendingItem struct {
	sync.Mutex
	buffer        internal.Buffer
	sequenceID    uint64
	createdAt     time.Time
	sentAt        time.Time
	sendRequests  []interface{}
	isDone        bool
	flushCallback func(err error)
}

to:

type pendingItem struct {
	sync.Mutex
	buffer         internal.Buffer
	sequenceID     uint64
	createdAt      time.Time
	sentAt         time.Time
	sendRequests   []interface{}
	isDone         bool
	flushCallbacks []func(err error)
}

And:

func (i *pendingItem) done(err error) {
	if i.isDone {
		return
	}
	i.isDone = true
	buffersPool.Put(i.buffer)
	if i.flushCallback != nil {
		i.flushCallback(err)
	}
}

to:

func (i *pendingItem) done(err error) {
	if i.isDone {
		return
	}
	i.isDone = true
	buffersPool.Put(i.buffer)
	for _, cb := range i.flushCallbacks {
		cb(err)
	}
}

The same applies to internalFlush, where the callback is now appended rather than assigned:

	pi.flushCallbacks = append(pi.flushCallbacks, func(err error) {
		fr.err = err
		close(fr.doneCh)
	})
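
Appending instead of assigning means every flush waiter registered against the same pending item is notified exactly once when the item completes, at the cost of one small slice allocation per concurrent flush.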

With these changes, test3 passes.
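
For completeness, here is the same standalone sketch with the slice-based patch applied (again hypothetical names, not library code); every waiter now unblocks, matching the fixed test3 behavior.

package main

import (
	"fmt"
	"sync"
)

// item mirrors the patched pendingItem: callbacks accumulate in a
// slice instead of overwriting a single slot.
type item struct {
	sync.Mutex
	isDone         bool
	flushCallbacks []func(err error)
}

// register mimics the patched internalFlush: append, never overwrite.
func (i *item) register() <-chan struct{} {
	doneCh := make(chan struct{})
	i.Lock()
	defer i.Unlock()
	if i.isDone {
		close(doneCh) // already flushed, complete immediately
		return doneCh
	}
	i.flushCallbacks = append(i.flushCallbacks, func(err error) { close(doneCh) })
	return doneCh
}

// done mimics the patched pendingItem.done: fire every callback once.
func (i *item) done(err error) {
	i.Lock()
	defer i.Unlock()
	if i.isDone {
		return
	}
	i.isDone = true
	for _, cb := range i.flushCallbacks {
		cb(err)
	}
}

func main() {
	it := &item{}
	var wg sync.WaitGroup
	for n := 0; n < 10; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			<-it.register()
			fmt.Printf("waiter %d: flush completed\n", n)
		}(n)
	}
	it.done(nil) // all ten waiters unblock
	wg.Wait()
}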

System configuration

Pulsar version: 3.2.4
Pulsar-client-go version: 0.14.0
