Skip to content

Commit 933807b

Browse files
committed
Fix writeChan and improve test
1 parent 687c80a commit 933807b

File tree

2 files changed

+34
-21
lines changed

2 files changed

+34
-21
lines changed

pulsar/producer_partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
185185
options: options,
186186
producerID: client.rpcClient.NewProducerID(),
187187
dataChan: make(chan *sendRequest, maxPendingMessages),
188-
writeChan: make(chan *pendingItem),
188+
writeChan: make(chan *pendingItem, maxPendingMessages),
189189
cmdChan: make(chan interface{}, 10),
190190
connectClosedCh: make(chan *connectionClosed, 1),
191191
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),

pulsar/producer_test.go

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2611,7 +2611,7 @@ func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
26112611
testProducerKeepReconnectingAndThenCallSendAsync(t, true)
26122612
}
26132613

2614-
func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, isEnabledBatching bool) {
2614+
func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, isDisableBatching bool) {
26152615
t.Helper()
26162616

26172617
req := testcontainers.ContainerRequest{
@@ -2641,45 +2641,58 @@ func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, isEnabledBat
26412641
var testProducer Producer
26422642
require.Eventually(t, func() bool {
26432643
testProducer, err = client.CreateProducer(ProducerOptions{
2644-
Topic: newTopicName(),
2645-
Schema: NewBytesSchema(nil),
2646-
SendTimeout: 3 * time.Second,
2647-
DisableBatching: isEnabledBatching,
2644+
Topic: newTopicName(),
2645+
Schema: NewBytesSchema(nil),
2646+
SendTimeout: 3 * time.Second,
2647+
DisableBatching: isDisableBatching,
2648+
BatchingMaxMessages: 5,
26482649
})
26492650
return err == nil
26502651
}, 30*time.Second, 1*time.Second)
26512652

2652-
// send a message
2653-
errChan := make(chan error)
2654-
defer close(errChan)
2655-
2656-
testProducer.SendAsync(context.Background(), &ProducerMessage{
2657-
Payload: []byte("test"),
2658-
}, func(_ MessageID, _ *ProducerMessage, err error) {
2659-
errChan <- err
2660-
})
2661-
select {
2662-
case <-time.After(10 * time.Second):
2663-
t.Fatal("test timeout")
2664-
case err := <-errChan:
2653+
numMessages := 10
2654+
// Send 10 messages synchronously
2655+
for i := 0; i < numMessages; i++ {
2656+
send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
26652657
require.NoError(t, err)
2658+
require.NotNil(t, send)
2659+
}
2660+
2661+
// Send 10 messages asynchronously
2662+
errs := make(chan error, numMessages)
2663+
for i := 0; i < numMessages; i++ {
2664+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2665+
Payload: []byte("test"),
2666+
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
2667+
errs <- err
2668+
})
2669+
}
2670+
2671+
for i := 0; i < numMessages; i++ {
2672+
select {
2673+
case <-time.After(10 * time.Second):
2674+
t.Fatal("test timeout")
2675+
case err := <-errs:
2676+
require.NoError(t, err)
2677+
}
26662678
}
26672679

26682680
// stop pulsar server
26692681
timeout := 10 * time.Second
26702682
err = c.Stop(context.Background(), &timeout)
26712683
require.NoError(t, err)
26722684

2685+
finalErr := make(chan error, 1)
26732686
// send again
26742687
testProducer.SendAsync(context.Background(), &ProducerMessage{
26752688
Payload: []byte("test"),
26762689
}, func(_ MessageID, _ *ProducerMessage, err error) {
2677-
errChan <- err
2690+
finalErr <- err
26782691
})
26792692
select {
26802693
case <-time.After(10 * time.Second):
26812694
t.Fatal("test timeout")
2682-
case err = <-errChan:
2695+
case err = <-finalErr:
26832696
// should get a timeout error
26842697
require.ErrorIs(t, err, ErrSendTimeout)
26852698
}

0 commit comments

Comments
 (0)