Skip to content

Commit d4de2d2

Browse files
committed
Remove TestSendingBuffersCleanupAfterMultipleReconnections due to the flaky
1 parent a64593d commit d4de2d2

File tree

1 file changed

+0
-58
lines changed

1 file changed

+0
-58
lines changed

pulsar/producer_test.go

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ import (
3131
"testing"
3232
"time"
3333

34-
"github.com/prometheus/client_golang/prometheus/promhttp"
35-
3634
"github.com/stretchr/testify/require"
3735
"github.com/testcontainers/testcontainers-go"
3836
"github.com/testcontainers/testcontainers-go/wait"
@@ -2705,59 +2703,3 @@ func getSendingBuffersCount() (float64, error) {
27052703

27062704
return 0, fmt.Errorf("sending_buffers_count metric not found")
27072705
}
2708-
2709-
func TestSendingBuffersCleanupAfterMultipleReconnections(t *testing.T) {
2710-
// Start a Prometheus metrics server to expose buffer metrics
2711-
go func() {
2712-
log.Info("Starting Prometheus metrics at http://localhost:", 8801, "/metrics")
2713-
http.Handle("/metrics", promhttp.Handler())
2714-
http.ListenAndServe(":8801", nil)
2715-
}()
2716-
topicName := newTopicName()
2717-
2718-
// Create multiple producers and send messages to generate sending buffers
2719-
for i := 0; i < 10; i++ {
2720-
topicName = topicName + strconv.Itoa(i)
2721-
client, err := NewClient(ClientOptions{
2722-
URL: serviceURL,
2723-
MaxConnectionsPerBroker: 1,
2724-
OperationTimeout: 3 * time.Second,
2725-
})
2726-
assert.NoError(t, err)
2727-
2728-
reconnectNum := uint(1)
2729-
p, err := client.CreateProducer(ProducerOptions{
2730-
Topic: topicName,
2731-
MaxReconnectToBroker: &reconnectNum,
2732-
BatchingMaxMessages: 10,
2733-
})
2734-
assert.NoError(t, err)
2735-
2736-
// Send many messages asynchronously without waiting for completion
2737-
// This generates a lot of sending buffers that need to be cleaned up
2738-
for j := 0; j < 1000; j++ {
2739-
p.SendAsync(context.Background(), &ProducerMessage{
2740-
Payload: []byte("test"),
2741-
}, nil)
2742-
}
2743-
// Intentionally not wait for the send result to generate a lot of sending buffers
2744-
2745-
p.Close()
2746-
client.Close() // Close the client to trigger cleanup of sending buffers in the connection
2747-
}
2748-
2749-
// Start a client to expose the metrics
2750-
c, _ := NewClient(ClientOptions{
2751-
URL: serviceURL,
2752-
MaxConnectionsPerBroker: 1,
2753-
})
2754-
2755-
time.Sleep(1 * time.Second)
2756-
2757-
// Verify that all sending buffers have been cleaned up
2758-
sendingBuffersCbt, err := getSendingBuffersCount()
2759-
assert.NoError(t, err)
2760-
assert.Equal(t, float64(0), sendingBuffersCbt, "Expected no sending buffers after closing the client")
2761-
2762-
c.Close()
2763-
}

0 commit comments

Comments
 (0)