Commit 44d5aba
Fix send buffer cleanup in reconnection
1 parent 092cfc6 commit 44d5aba

File tree

3 files changed: +60 -25 lines

pulsar/internal/connection.go

Lines changed: 8 additions & 0 deletions
@@ -479,6 +479,9 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
 		return
+	case <-c.closeCh:
+		c.log.Debug("Write data connection closed")
+		return
 	default:
 		// Channel full, fallback to probe if connection is closed
 	}
@@ -492,6 +495,9 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
 		return
+	case <-c.closeCh:
+		c.log.Debug("Write data connection closed")
+		return
 	case <-time.After(100 * time.Millisecond):
 		// The channel is either:
 		// 1. blocked, in which case we need to wait until we have space
@@ -513,6 +519,8 @@ func (c *connection) internalWriteData(ctx context.Context, data Buffer) {
 	select {
 	case <-ctx.Done():
 		return
+	case <-c.closeCh:
+		return
 	default:
 		if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
 			c.log.WithError(err).Warn("Failed to write on connection")
pulsar/producer_partition.go

Lines changed: 1 addition & 0 deletions
@@ -888,6 +888,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
 	select {
 	case <-p.ctx.Done():
+		buffer.Release()
 		for _, cb := range callbacks {
 			if sr, ok := cb.(*sendRequest); ok {
 				sr.done(nil, ErrProducerClosed)
pulsar/producer_test.go

Lines changed: 51 additions & 25 deletions
@@ -18,10 +18,10 @@
 package pulsar

 import (
+	"bufio"
 	"context"
 	"errors"
 	"fmt"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"net/http"
 	"os"
 	"strconv"
@@ -31,6 +31,8 @@ import (
 	"testing"
 	"time"

+	"github.com/prometheus/client_golang/prometheus/promhttp"
+
 	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
@@ -2630,9 +2632,6 @@ func (m *mockConn) SendRequest(requestID uint64, req *pb.BaseCommand, callback f
 }

 func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
-	go func() {
-		log.Println(http.ListenAndServe("localhost:6060", nil))
-	}()
 	topicName := newTopicName()

 	client, err := NewClient(ClientOptions{
@@ -2650,6 +2649,7 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
 	assert.NoError(t, err)
 	pp := p.(*producer).producers[0].(*partitionProducer)

+	// Create a mock connection that tracks written buffers
 	conn := &mockConn{
 		dummyConnection: &dummyConnection{},
 		buffers:         make([]internal.Buffer, 0),
@@ -2663,43 +2663,62 @@
 		Payload: []byte("test"),
 	}, nil)

-	go func() {
-		log.Info("Starting Prometheus metrics at http://localhost:", 8800, "/metrics")
-		http.Handle("/metrics", promhttp.Handler())
-		http.ListenAndServe(":"+strconv.Itoa(8800), nil)
-	}()
-
+	// Wait for the buffer to be written to the connection
 	assert.Eventually(t, func() bool {
 		return len(conn.buffers) != 0
 	}, 5*time.Second, 100*time.Millisecond)

+	// Simulate connection failure and verify buffer retention
 	pp.failPendingMessages(errors.New("expected error"))

 	assert.Equal(t, 1, len(conn.buffers), "Expected one buffer to be sent")
 	b := conn.buffers[0]
 	assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a reference count of 1 after sending")
+}

-	time.Sleep(1 * time.Hour)
+func getSendingBuffersCount() (float64, error) {
+	resp, err := http.Get("http://localhost:8801/metrics")
+	if err != nil {
+		return 0, fmt.Errorf("failed to fetch metrics: %v", err)
+	}
+	defer resp.Body.Close()
+
+	scanner := bufio.NewScanner(resp.Body)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if strings.Contains(line, "pulsar_client_sending_buffers_count{client=\"go\"}") {
+			// Parse the metric line to extract the value
+			// Format: pulsar_client_sending_buffers_count{client="go"} 5
+			parts := strings.Fields(line)
+			if len(parts) >= 2 {
+				var value float64
+				_, err := fmt.Sscanf(parts[len(parts)-1], "%f", &value)
+				if err != nil {
+					return 0, fmt.Errorf("failed to parse metric value: %v", err)
+				}
+				return value, nil
+			}
+		}
+	}

+	return 0, fmt.Errorf("sending_buffers_count metric not found")
 }

-func TestReconnect(t *testing.T) {
-	go func() {
-		log.Println(http.ListenAndServe("localhost:6060", nil))
-	}()
-
+func TestSendingBuffersCleanupAfterMultipleReconnections(t *testing.T) {
+	// Start a Prometheus metrics server to expose buffer metrics
 	go func() {
-		log.Info("Starting Prometheus metrics at http://localhost:", 8800, "/metrics")
+		log.Info("Starting Prometheus metrics at http://localhost:", 8801, "/metrics")
 		http.Handle("/metrics", promhttp.Handler())
-		http.ListenAndServe(":"+strconv.Itoa(8800), nil)
+		http.ListenAndServe(":8801", nil)
 	}()
 	topicName := newTopicName()

+	// Create multiple producers and send messages to generate sending buffers
 	for i := 0; i < 10; i++ {
 		topicName = topicName + strconv.Itoa(i)
 		client, err := NewClient(ClientOptions{
 			URL:                     serviceURL,
-			MaxConnectionsPerBroker: 10,
+			MaxConnectionsPerBroker: 1,
 			OperationTimeout:        3 * time.Second,
 		})
 		assert.NoError(t, err)
@@ -2712,24 +2731,31 @@ func TestReconnect(t *testing.T) {
 		})
 		assert.NoError(t, err)

+		// Send many messages asynchronously without waiting for completion
+		// This generates a lot of sending buffers that need to be cleaned up
 		for j := 0; j < 1000; j++ {
 			p.SendAsync(t.Context(), &ProducerMessage{
 				Payload: []byte("test"),
 			}, nil)
 		}
+		// Intentionally do not wait for the send results, to build up sending buffers

 		p.Close()
-		client.Close()
-
+		client.Close() // Close the client to trigger cleanup of sending buffers in the connection
 	}

-	cc, _ := NewClient(ClientOptions{
+	// Start a client to expose the metrics
+	c, _ := NewClient(ClientOptions{
 		URL:                     serviceURL,
-		MaxConnectionsPerBroker: 10,
+		MaxConnectionsPerBroker: 1,
 	})

-	time.Sleep(1 * time.Hour)
+	time.Sleep(1 * time.Second)

-	cc.Close()
+	// Verify that all sending buffers have been cleaned up
+	sendingBuffersCnt, err := getSendingBuffersCount()
+	assert.NoError(t, err)
+	assert.Equal(t, float64(0), sendingBuffersCnt, "Expected no sending buffers after closing the client")

+	c.Close()
 }
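The new getSendingBuffersCount helper scans the /metrics exposition text line by line with a Scanner. As an alternative sketch, the same value can be read with the Prometheus text parser from github.com/prometheus/common/expfmt; this assumes the same local :8801 endpoint the test starts, and readGauge is a hypothetical helper, not part of the client or this test suite:

package main

import (
	"fmt"
	"net/http"

	"github.com/prometheus/common/expfmt"
)

// readGauge scrapes a Prometheus text endpoint and returns the value of the
// named gauge for the sample carrying the given label. It mirrors what the
// test's getSendingBuffersCount does by hand, but lets the exposition-format
// parser deal with the line syntax.
func readGauge(url, name, labelName, labelValue string) (float64, error) {
	resp, err := http.Get(url)
	if err != nil {
		return 0, fmt.Errorf("failed to fetch metrics: %w", err)
	}
	defer resp.Body.Close()

	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("failed to parse metrics: %w", err)
	}

	family, ok := families[name]
	if !ok {
		return 0, fmt.Errorf("metric %q not found", name)
	}
	for _, m := range family.GetMetric() {
		for _, l := range m.GetLabel() {
			if l.GetName() == labelName && l.GetValue() == labelValue {
				return m.GetGauge().GetValue(), nil
			}
		}
	}
	return 0, fmt.Errorf("no %q sample with %s=%q", name, labelName, labelValue)
}

func main() {
	// Hypothetical usage against the endpoint the test exposes on :8801.
	count, err := readGauge("http://localhost:8801/metrics",
		"pulsar_client_sending_buffers_count", "client", "go")
	fmt.Println(count, err)
}

Either approach works against the plain text exposition format; the parser variant is less brittle if the metric ever gains extra labels or if HELP/TYPE comment lines change.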
