Skip to content

Commit 1d8a0d3

Browse files
author
Daniel Ferstay
committed
[DefaultRouter] fix unnecessary system clock reads due to races accessing router state
Previously, we used atomic operations to read and update parts of the default router state. Unfortunately, those reads and updates could race under concurrent calls, leading to unnecessary clock reads and an associated slowdown in performance. Now, we use atomic addition to increment the message count and batch size. This removes the race condition by ensuring that each goroutine observes a unique message count, and hence only one of them performs the clock read. Furthermore, we use an atomic compare-and-swap to ensure that partitions are not skipped if multiple goroutines attempt to increment the partition cursor at the same time. Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
1 parent d5d4903 commit 1d8a0d3

File tree

2 files changed

+34
-29
lines changed

2 files changed

+34
-29
lines changed

pulsar/default_router.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package pulsar
1919

2020
import (
21-
"math"
2221
"math/rand"
2322
"sync/atomic"
2423
"time"
@@ -27,7 +26,7 @@ import (
2726
type defaultRouter struct {
2827
currentPartitionCursor uint32
2928

30-
lastChangeTimestamp int64
29+
lastBatchTimestamp int64
3130
msgCounter uint32
3231
cumulativeBatchSize uint32
3332
}
@@ -45,7 +44,7 @@ func NewDefaultRouter(
4544
disableBatching bool) func(*ProducerMessage, uint32) int {
4645
state := &defaultRouter{
4746
currentPartitionCursor: rand.Uint32(),
48-
lastChangeTimestamp: math.MinInt64,
47+
lastBatchTimestamp: time.Now().UnixNano(),
4948
}
5049

5150
readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
@@ -75,37 +74,38 @@ func NewDefaultRouter(
7574
// If there's no key, we do round-robin across partition, sticking with a given
7675
// partition for a certain amount of messages or volume buffered or the max delay to batch is reached so that
7776
// we ensure having a decent amount of batching of the messages.
78-
// Note that it is possible that we skip more than one partition if multiple goroutines increment
79-
// currentPartitionCursor at the same time. If that happens it shouldn't be a problem because we only want to
80-
// spread the data on different partitions but not necessarily in a specific sequence.
8177
var now int64
8278
size := uint32(len(message.Payload))
83-
previousMessageCount := atomic.LoadUint32(&state.msgCounter)
84-
previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize)
85-
previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp)
79+
partitionCursor := atomic.LoadUint32(&state.currentPartitionCursor)
80+
messageCount := atomic.AddUint32(&state.msgCounter, 1)
81+
batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
8682

87-
messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
88-
sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
83+
// Note: use greater-than for the threshold check so that we don't route this message to a new partition
84+
// before a batch is complete.
85+
messageCountReached := messageCount > uint32(maxBatchingMessages)
86+
sizeReached := batchSize > uint32(maxBatchingSize)
8987
durationReached := false
90-
if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 {
88+
if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 {
9189
now = time.Now().UnixNano()
92-
durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds()
90+
lastBatchTime := atomic.LoadInt64(&state.lastBatchTimestamp)
91+
durationReached = now-lastBatchTime > maxBatchingDelay.Nanoseconds()
9392
}
9493
if messageCountReached || sizeReached || durationReached {
95-
atomic.AddUint32(&state.currentPartitionCursor, 1)
96-
atomic.StoreUint32(&state.msgCounter, 0)
97-
atomic.StoreUint32(&state.cumulativeBatchSize, 0)
98-
if now != 0 {
99-
atomic.StoreInt64(&state.lastChangeTimestamp, now)
94+
// Note: CAS to ensure that concurrent go-routines can only move the cursor forward by one so that
95+
// partitions are not skipped.
96+
newCursor := partitionCursor + 1
97+
if atomic.CompareAndSwapUint32(&state.currentPartitionCursor, partitionCursor, newCursor) {
98+
atomic.StoreUint32(&state.msgCounter, 0)
99+
atomic.StoreUint32(&state.cumulativeBatchSize, 0)
100+
if now == 0 {
101+
now = time.Now().UnixNano()
102+
}
103+
atomic.StoreInt64(&state.lastBatchTimestamp, now)
100104
}
101-
return int(state.currentPartitionCursor % numPartitions)
102-
}
103105

104-
atomic.AddUint32(&state.msgCounter, 1)
105-
atomic.AddUint32(&state.cumulativeBatchSize, size)
106-
if now != 0 {
107-
atomic.StoreInt64(&state.lastChangeTimestamp, now)
106+
return int(newCursor % numPartitions)
108107
}
109-
return int(state.currentPartitionCursor % numPartitions)
108+
109+
return int(partitionCursor % numPartitions)
110110
}
111111
}

pulsar/default_router_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,21 @@ func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
7171
const numPartitions = uint32(3)
7272
p1 := router(&ProducerMessage{
7373
Payload: []byte("message 1"),
74-
}, 3)
74+
}, numPartitions)
7575
assert.LessOrEqual(t, p1, int(numPartitions))
7676

7777
p2 := router(&ProducerMessage{
7878
Payload: []byte("message 2"),
7979
}, numPartitions)
80-
if p1 == int(numPartitions-1) {
81-
assert.Equal(t, 0, p2)
80+
assert.Equal(t, p1, p2)
81+
82+
p3 := router(&ProducerMessage{
83+
Payload: []byte("message 3"),
84+
}, numPartitions)
85+
if p2 == int(numPartitions-1) {
86+
assert.Equal(t, 0, p3)
8287
} else {
83-
assert.Equal(t, p1+1, p2)
88+
assert.Equal(t, p2+1, p3)
8489
}
8590
}
8691

0 commit comments

Comments (0)