Skip to content

Commit d99d2c0

Browse files
authored
[improve] modify the negativeACK structure to reduce memory overhead (#1410)
* improve: modify the structure of NegativeAckTracker to reduce memory overhead * fix TestNegativeAckPrecisionBitCnt bug * chore: formatting issues * fix: fix issues with TestConsumerNack * chore: revert changes to TestConsumerNack * fix: add defaultNackPrecisionBitVal * fix: revert changes to TestZeroQueueConsumer_Nack * add TestNackPrecisionBitDefaultBehavior * chore: update TestNegativeAckPrecisionBitCnt test * fix: make TestNegativeAckPrecisionBitCnt more stable
1 parent d85f096 commit d99d2c0

File tree

8 files changed

+224
-49
lines changed

8 files changed

+224
-49
lines changed

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ require (
66
github.com/99designs/keyring v1.2.1
77
github.com/AthenZ/athenz v1.12.13
88
github.com/DataDog/zstd v1.5.0
9-
github.com/bits-and-blooms/bitset v1.4.0
9+
github.com/RoaringBitmap/roaring/v2 v2.8.0
10+
github.com/bits-and-blooms/bitset v1.12.0
1011
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
1112
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
13+
github.com/emirpasic/gods v1.18.1
1214
github.com/docker/docker v28.0.0+incompatible
1315
github.com/docker/go-connections v0.5.0
1416
github.com/golang-jwt/jwt/v5 v5.2.2
@@ -78,6 +80,7 @@ require (
7880
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7981
github.com/modern-go/reflect2 v1.0.2 // indirect
8082
github.com/morikuni/aec v1.0.0 // indirect
83+
github.com/mschoch/smat v0.2.0 // indirect
8184
github.com/mtibben/percent v0.2.1 // indirect
8285
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
8386
github.com/nxadm/tail v1.4.8 // indirect

go.sum

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
1414
github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
1515
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
1616
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
17+
github.com/RoaringBitmap/roaring/v2 v2.8.0 h1:y1rdtixfXvaITKzkfiKvScI0hlBJHe9sfzJp8cgeM7w=
18+
github.com/RoaringBitmap/roaring/v2 v2.8.0/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
1719
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
1820
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
1921
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
2022
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
21-
github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8=
22-
github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
23+
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
24+
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
2325
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
2426
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
2527
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
@@ -53,6 +55,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
5355
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
5456
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
5557
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
58+
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
59+
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
5660
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
5761
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
5862
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -154,6 +158,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
154158
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
155159
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
156160
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
161+
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
162+
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
157163
github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
158164
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
159165
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
@@ -346,6 +352,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
346352
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
347353
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
348354
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
355+
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
349356
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
350357
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
351358
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=

pulsar/consumer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ type ConsumerOptions struct {
191191
// processed. Default is 1 min. (See `Consumer.Nack()`)
192192
NackRedeliveryDelay time.Duration
193193

194+
// NackPrecisionBit specifies the precision bit for nack redelivery delay.
195+
// This is used to trim the lower bits of the nack redelivery delay to reduce memory usage.
196+
// Default is 8 bits.
197+
NackPrecisionBit *int64
198+
194199
// Name specifies the consumer name.
195200
Name string
196201

pulsar/consumer_impl.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
118118
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
119119
}
120120

121+
if options.NackPrecisionBit == nil {
122+
options.NackPrecisionBit = ptr(defaultNackPrecisionBit)
123+
} else if *options.NackPrecisionBit < 0 {
124+
return nil, newError(InvalidConfiguration, "NackPrecisionBit cannot be negative")
125+
}
126+
121127
// did the user pass in a message channel?
122128
messageCh := options.MessageChannel
123129
if options.MessageChannel == nil {
@@ -452,6 +458,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu
452458
receiverQueueSize: options.ReceiverQueueSize,
453459
nackRedeliveryDelay: nackRedeliveryDelay,
454460
nackBackoffPolicy: options.NackBackoffPolicy,
461+
nackPrecisionBit: options.NackPrecisionBit,
455462
metadata: options.Properties,
456463
subProperties: options.SubscriptionProperties,
457464
replicateSubscriptionState: options.ReplicateSubscriptionState,

pulsar/consumer_partition.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
104104
autoReceiverQueueSize bool
105105
nackRedeliveryDelay time.Duration
106106
nackBackoffPolicy NackBackoffPolicy
107+
nackPrecisionBit *int64
107108
metadata map[string]string
108109
subProperties map[string]string
109110
replicateSubscriptionState bool
@@ -424,7 +425,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
424425

425426
pc.decryptor = decryptor
426427

427-
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
428+
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log,
429+
options.nackPrecisionBit)
428430

429431
err := pc.grabConn("")
430432
if err != nil {

pulsar/consumer_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,6 +1225,102 @@ func TestConsumerNack(t *testing.T) {
12251225
}
12261226
}
12271227

1228+
func TestNegativeAckPrecisionBitCnt(t *testing.T) {
1229+
// Validate behavior across precision bits and default (nil -> 8)
1230+
const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3)
1231+
ctx := context.Background()
1232+
1233+
client, err := NewClient(ClientOptions{URL: lookupURL})
1234+
assert.Nil(t, err)
1235+
defer client.Close()
1236+
1237+
// Helper to verify behavior for a given NackPrecisionBit and boundary bits.
1238+
testPrecisionBitBehavior := func(nackPrecisionBit *int64, boundaryBits int64) {
1239+
// Create topic, consumer and producer inside the function
1240+
topicName := fmt.Sprintf("testNackPrecisionBit-%d-%d", boundaryBits, time.Now().UnixNano())
1241+
consumer, err := client.Subscribe(ConsumerOptions{
1242+
Topic: topicName,
1243+
SubscriptionName: fmt.Sprintf("sub-%d", boundaryBits),
1244+
Type: Shared,
1245+
NackRedeliveryDelay: delay,
1246+
NackPrecisionBit: nackPrecisionBit, // can be nil for default behavior
1247+
})
1248+
assert.Nil(t, err)
1249+
defer consumer.Close()
1250+
1251+
producer, err := client.CreateProducer(ProducerOptions{Topic: topicName})
1252+
assert.Nil(t, err)
1253+
defer producer.Close()
1254+
1255+
// Align to the next window boundary based on boundaryBits
1256+
windowMs := int64(1) << boundaryBits
1257+
nowMs := time.Now().UnixMilli()
1258+
nextBoundaryMs := ((nowMs / windowMs) + 1) * windowMs // Next boundary
1259+
time.Sleep(time.Duration(nextBoundaryMs-nowMs) * time.Millisecond)
1260+
1261+
// Send first message at the boundary
1262+
content1 := fmt.Sprintf("msg1-p%d", boundaryBits)
1263+
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content1)})
1264+
assert.Nil(t, err)
1265+
1266+
// Send second message around 3/4 into the window (still in same window)
1267+
time.Sleep(time.Duration(windowMs*3/4) * time.Millisecond)
1268+
content2 := fmt.Sprintf("msg2-p%d", boundaryBits)
1269+
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content2)})
1270+
assert.Nil(t, err)
1271+
1272+
// Receive and nack both messages
1273+
m1, err := consumer.Receive(ctx)
1274+
assert.Nil(t, err)
1275+
assert.Equal(t, content1, string(m1.Payload()))
1276+
consumer.Nack(m1)
1277+
m2, err := consumer.Receive(ctx)
1278+
assert.Nil(t, err)
1279+
assert.Equal(t, content2, string(m2.Payload()))
1280+
consumer.Nack(m2)
1281+
1282+
// Expected redelivery window considering precision and tracker tick
1283+
expected := time.Now().Add(delay)
1284+
deviation := time.Duration(windowMs) * time.Millisecond
1285+
1286+
// Both should be redelivered in the same cycle
1287+
rm1, err := consumer.Receive(ctx)
1288+
assert.Nil(t, err)
1289+
redeliveryTime1 := time.Now()
1290+
rm2, err := consumer.Receive(ctx)
1291+
assert.Nil(t, err)
1292+
redeliveryTime2 := time.Now()
1293+
1294+
// For both the default precision (nil) and precisionBit=8, boundaryBits is 8.
1295+
// This checks that the default precisionBit is correctly set to 8,
1296+
// and that its redelivery behavior matches a consumer explicitly configured with precisionBit=8.
1297+
if boundaryBits == 8 {
1298+
assert.InDelta(t, redeliveryTime1.UnixMilli(), redeliveryTime2.UnixMilli(), 1)
1299+
}
1300+
1301+
// Redelivery should occur within [expected-window, expected+buffer]
1302+
minExpected := expected.Add(-deviation)
1303+
maxExpected := expected.Add(150 * time.Millisecond)
1304+
assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli())
1305+
assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli())
1306+
1307+
consumer.Ack(rm1)
1308+
consumer.Ack(rm2)
1309+
}
1310+
1311+
// Run for precision bits 1...8 with matching boundary bits
1312+
for bits := int64(1); bits <= int64(8); bits++ {
1313+
t.Run(fmt.Sprintf("PrecisionBits=%d", bits), func(_ *testing.T) {
1314+
testPrecisionBitBehavior(ptr(bits), bits)
1315+
})
1316+
}
1317+
1318+
// Default behavior (nil) should match precision bit 8
1319+
t.Run("DefaultPrecisionBits=8", func(_ *testing.T) {
1320+
testPrecisionBitBehavior(nil, int64(8))
1321+
})
1322+
}
1323+
12281324
func TestConsumerCompression(t *testing.T) {
12291325
client, err := NewClient(ClientOptions{
12301326
URL: lookupURL,

0 commit comments

Comments
 (0)