pulsar/producer_partition.go (20 additions, 0 deletions)

@@ -907,6 +907,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
sequenceID: sequenceID,
sendRequests: callbacks,
})
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok {
if sr.enqueued != nil {
close(sr.enqueued)
}
}
}
p._getConn().WriteData(ctx, buffer)
}
}
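The change above, together with the matching one in internalSendAsync further down, introduces a close-to-signal handshake: writeData closes each request's enqueued channel once the batch has been handed to the pending queue, and internalSendAsync waits on that channel (or the caller's context) before returning. A minimal self-contained sketch of the pattern, with hypothetical request, enqueue, and sendAsync names standing in for the real types, might look like this:

package main

import (
	"context"
	"fmt"
	"time"
)

// request stands in for the producer's sendRequest; only the signalling
// channel matters for this sketch.
type request struct {
	enqueued chan struct{}
}

// enqueue stands in for writeData handing the request to the pending queue:
// once the queue owns the request, the channel is closed to wake any waiter.
func enqueue(r *request) {
	// ... append r to the pending queue here ...
	close(r.enqueued)
}

// sendAsync stands in for internalSendAsync: it returns as soon as the request
// is enqueued, or with the context's error if that happens first.
func sendAsync(ctx context.Context, r *request) error {
	go enqueue(r)
	select {
	case <-r.enqueued:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r := &request{enqueued: make(chan struct{})}
	fmt.Println(sendAsync(ctx, r)) // prints <nil>: enqueue signals well within the deadline
}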

@@ -1312,6 +1319,7 @@ func (p *partitionProducer) internalSendAsync(
flushImmediately: flushImmediately,
publishTime: time.Now(),
chunkID: -1,
enqueued: make(chan struct{}),
}

if err := p.prepareTransaction(sr); err != nil {

@@ -1353,6 +1361,17 @@ func (p *partitionProducer) internalSendAsync(
}

p.dataChan <- sr
select {

Contributor commented on this line:
The select here will block SendAsync() while the producer is reconnecting, even when the pending queue is not full; that is bad for latency-sensitive applications such as games.

Member (Author) replied:
Yes, I agree. I have opened PR #1422, which also addresses this issue. PTAL.

case <-sr.enqueued:
case <-ctx.Done():
err := ctx.Err()

Contributor commented on this line:
Generally, a timeout is not set on the ctx provided by the application/user; maybe we should wrap the ctx with context.WithTimeout(ctx, config.timeout)?

if errors.Is(err, context.DeadlineExceeded) { // Convert DeadlineExceeded error to ErrSendTimeout
err = ErrSendTimeout
}
sr.callbackOnce.Do(func() {
runCallback(callback, nil, msg, err)
})
}
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
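The reviewer's concern above is that this select can park SendAsync() for the whole reconnect window even when the pending queue has room, which matters for latency-sensitive callers; the author points at PR #1422 for a fuller fix. As an interim, application-side mitigation (not part of this PR), a caller can bound the enqueue wait with a per-call context so a stalled enqueue surfaces through the callback as ErrSendTimeout instead of blocking indefinitely. A rough sketch, where the broker URL, topic name, and 100ms budget are placeholders:

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// sendWithEnqueueBudget bounds how long SendAsync may block the calling
// goroutine on enqueueing. If the budget expires first (for example while the
// producer is reconnecting), the callback fires with a timeout error and the
// caller can choose to drop, buffer, or retry the message.
func sendWithEnqueueBudget(p pulsar.Producer, msg *pulsar.ProducerMessage, budget time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	p.SendAsync(ctx, msg, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
		defer cancel()
		switch {
		case errors.Is(err, pulsar.ErrSendTimeout):
			log.Println("enqueue budget exceeded, dropping message")
		case err != nil:
			log.Println("send failed:", err)
		}
	})
}

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Block the caller for at most 100ms even if the producer is mid-reconnect.
	sendWithEnqueueBudget(producer, &pulsar.ProducerMessage{Payload: []byte("hello")}, 100*time.Millisecond)
}

The caller still blocks for up to the budget, so the budget should be chosen against the application's latency target rather than the producer's SendTimeout.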

@@ -1583,6 +1602,7 @@ type sendRequest struct {
chunkID int
uuid string
chunkRecorder *chunkRecorder
enqueued chan struct{}

// resource management

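On the reviewer's suggestion above to wrap the caller's ctx with the configured timeout: a minimal sketch of that idea, assuming the producer's SendTimeout option is the value to apply (the exact field access inside producer_partition.go is not shown in this diff and is assumed here):

package pulsarsketch // illustrative only, not part of this diff

import (
	"context"
	"time"
)

// deriveSendCtx bounds a caller-supplied context by the producer's configured
// send timeout, so the wait on sr.enqueued cannot outlive that timeout even
// when the application passes a context without a deadline. A non-positive
// timeout leaves the caller's context untouched.
func deriveSendCtx(ctx context.Context, sendTimeout time.Duration) (context.Context, context.CancelFunc) {
	if sendTimeout <= 0 {
		return ctx, func() {} // no-op cancel keeps call sites uniform
	}
	return context.WithTimeout(ctx, sendTimeout)
}

internalSendAsync could then call ctx, cancel := deriveSendCtx(ctx, p.options.SendTimeout) (field path assumed) right before the select and defer cancel, so a DeadlineExceeded from either the caller's own deadline or the configured timeout gets mapped to ErrSendTimeout exactly as the diff above already does.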
pulsar/producer_test.go (113 additions, 0 deletions)

@@ -2605,3 +2605,116 @@

client.Close()
}

func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
testSendAsyncCouldTimeoutWhileReconnecting(t, false)
testSendAsyncCouldTimeoutWhileReconnecting(t, true)
}

func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
t.Helper()

req := testcontainers.ContainerRequest{
Image: getPulsarTestImage(),
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
WaitingFor: wait.ForExposedPort(),
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
}
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err, "Failed to start the pulsar container")
defer func() {
err := c.Terminate(context.Background())
if err != nil {
t.Fatal("Failed to terminate the pulsar container", err)
}
}()

endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
require.NoError(t, err, "Failed to get the pulsar endpoint")

client, err := NewClient(ClientOptions{
URL: endpoint,
ConnectionTimeout: 5 * time.Second,
OperationTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer client.Close()

var testProducer Producer
require.Eventually(t, func() bool {
testProducer, err = client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
Schema: NewBytesSchema(nil),
SendTimeout: 3 * time.Second,
DisableBatching: isDisableBatching,
BatchingMaxMessages: 5,
MaxPendingMessages: 10,
})
return err == nil
}, 30*time.Second, 1*time.Second)

numMessages := 10
// Send 10 messages synchronously
for i := 0; i < numMessages; i++ {
send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
require.NoError(t, err)
require.NotNil(t, send)
}

// stop pulsar server
timeout := 10 * time.Second
err = c.Stop(context.Background(), &timeout)
require.NoError(t, err)

// Test that SendAsync can time out while the producer is reconnecting

finalErr := make(chan error, 1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
testProducer.SendAsync(ctx, &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
finalErr <- err
})
select {
case <-time.After(10 * time.Second):
t.Fatal("test timeout")
case err = <-finalErr:
// should get a timeout error
require.ErrorIs(t, err, ErrSendTimeout)
}
close(finalErr)

// Test that SendAsync can time out when the pending queue is full

go func() {
// Send 10 messages asynchronously to make the pending queue full
errs := make(chan error, numMessages)
for i := 0; i < numMessages; i++ {
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(id MessageID, producerMessage *ProducerMessage, err error) {

GitHub Actions / lint, check failure on line 2699 in pulsar/producer_test.go:
unused-parameter: parameter 'id' seems to be unused, consider removing or renaming it as _ (revive)

errs <- err
})
}
}()

time.Sleep(3 * time.Second)
finalErr = make(chan error, 1)
testProducer.SendAsync(ctx, &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
finalErr <- err
})
select {
case <-time.After(10 * time.Second):
t.Fatal("test timeout")
case err = <-finalErr:
// should get a timeout error
require.ErrorIs(t, err, ErrSendTimeout)
}
close(finalErr)
}