Skip to content

Commit 6cc6491

Browse files
committed
crsync: add sharded counters and general support for sharding
Add support for sharding with CPU locality and add sharded counters. See counters_test.go for benchmark results. Sharding has two implementations, one is be used with the CRDB Go runtime (specifically cockroachdb/go#6) and the `cockroach_go` build tag. We also add a script that can be used to easily run tests against the CRDB Go (along with a CI job).
1 parent 1094cb3 commit 6cc6491

File tree

7 files changed

+497
-6
lines changed

7 files changed

+497
-6
lines changed

.github/workflows/ci.yaml

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ on:
1212
jobs:
1313

1414
linux:
15-
name: go-linux
1615
strategy:
1716
matrix:
1817
go: ["1.22", "1.23"]
@@ -30,7 +29,6 @@ jobs:
3029
- run: go vet ./...
3130

3231
linux-32bit:
33-
name: go-linux-32bit
3432
strategy:
3533
matrix:
3634
go: ["1.22"]
@@ -47,7 +45,6 @@ jobs:
4745
- run: go test ./...
4846

4947
darwin:
50-
name: go-macos
5148
strategy:
5249
matrix:
5350
go: ["1.22"]
@@ -64,7 +61,6 @@ jobs:
6461
- run: go test ./...
6562

6663
linux-stress:
67-
name: go-linux-stress
6864
strategy:
6965
matrix:
7066
go: ["1.22"]
@@ -82,7 +78,6 @@ jobs:
8278
- run: go test ./... -exec 'stress -p 2 -maxruns 1000' -v
8379

8480
linux-stress-race:
85-
name: go-linux-stress-race
8681
strategy:
8782
matrix:
8883
go: ["1.22"]
@@ -98,3 +93,37 @@ jobs:
9893
- run: go install github.com/cockroachdb/stress@latest
9994
- run: go test -tags crlib_invariants ./... -race -exec 'stress -p 1 -maxruns 100' -v
10095
- run: go test ./... -race -exec 'stress -p 1 -maxruns 100' -v
96+
97+
linux-cockroach-go:
98+
runs-on: ubuntu-latest
99+
env:
100+
GO_BRANCH: cockroach-go1.23.12
101+
102+
steps:
103+
- uses: actions/checkout@v4
104+
105+
# Step 1: Fetch the branch tip SHA for cache key
106+
- name: Get cockroachdb/go commit hash
107+
id: go-sha
108+
run: |
109+
SHA=$(git ls-remote https://github.com/cockroachdb/go.git refs/heads/$GO_BRANCH | cut -f1)
110+
echo "GO_SHA=$SHA" >> $GITHUB_ENV
111+
112+
# Step 2: Restore cache (per branch + commit SHA)
113+
- name: Cache custom Go toolchain
114+
uses: actions/cache@v4
115+
with:
116+
path: ~/.cache/cockroachdb-go/${{ env.GO_SHA }}
117+
key: cockroachdb-${{ env.GO_SHA }}
118+
119+
# Step 3: Install bootstrap Go (needed to build fork)
120+
- name: Install bootstrap Go
121+
uses: actions/setup-go@v5
122+
with:
123+
go-version: "1.23.x"
124+
125+
# Step 4: Run tests with custom Go
126+
- run: ./scripts/run-tests-with-custom-go.sh ./...
127+
- run: ./scripts/run-tests-with-custom-go.sh -tags crlib_invariants ./...
128+
- run: ./scripts/run-tests-with-custom-go.sh -race ./...
129+

crsync/counters.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package crsync
16+
17+
import (
18+
"sync/atomic"
19+
"unsafe"
20+
)
21+
22+
// Counter is a single logical counter backed by a sharded implementation
23+
// (Counters) under the hood.
24+
//
25+
// Properties:
26+
// - Thread-safe increments: Add() can be called concurrently from many
27+
// goroutines.
28+
// - Low write contention: Writes are sharded to minimize cache-line
29+
// ping‑pong.
30+
// - Simple reads: Get() aggregates across shards to return the current value.
31+
// - Construction: Use MakeCounter(). The zero value is NOT ready to use.
32+
// - Performance: Add is O(1) with low contention; Get is O(NumShards()).
33+
// - Consistency: Reads are best-effort snapshots without global locking. Each
34+
// shard is read atomically, but the aggregation is not linearizable with
35+
// respect to concurrent Add calls. This is typically acceptable for metrics
36+
// and counters.
37+
//
38+
// Example:
39+
//
40+
// c := MakeCounter()
41+
// c.Add(1)
42+
// c.Add(41)
43+
// fmt.Println(c.Get()) // 42
44+
type Counter struct {
45+
c Counters
46+
}
47+
48+
// MakeCounter initializes a new Counter.
49+
func MakeCounter() Counter {
50+
return Counter{
51+
c: MakeCounters(1),
52+
}
53+
}
54+
55+
// Add atomically adds delta to the counter. It is safe for concurrent use by
56+
// multiple goroutines; delta may be negative (decrement).
57+
//
58+
// Add is very efficient: a single atomic increment on a mostly uncontended
59+
// cache line.
60+
func (c *Counter) Add(delta int64) {
61+
c.c.Add(0, delta)
62+
}
63+
64+
// Get the current value of the counter.
65+
//
66+
// It safe to call Get() while there are concurrent Add() calls (but there is no
67+
// guarantee wrt which of those are reflected).
68+
//
69+
// Get is O(NumShards()) so it is more expensive than Add().
70+
func (c *Counter) Get() int64 {
71+
return c.c.Get(0)
72+
}
73+
74+
// Counters is a sharded set of logical counters that can be incremented
75+
// concurrently with low contention.
76+
//
77+
// Use when you need N independent counters that are updated from many
78+
// goroutines (e.g., metrics like hits/misses/errors, per-state tallies).
79+
//
80+
// Properties:
81+
// - Thread-safe increments: Add() can be called concurrently from many
82+
// goroutines.
83+
// - Low write contention: Writes are sharded to minimize cache-line
84+
// ping‑pong.
85+
// - Simple reads: Get() aggregates across shards to return the current value.
86+
// - Construction: Use MakeCounter(). The zero value is NOT ready to use.
87+
// - Performance: Add is O(1) with low contention; Get is O(NumShards());
88+
// - Consistency: Reads are best-effort snapshots without global locking. Each
89+
// shard is read atomically, but the aggregation is not linearizable with
90+
// respect to concurrent Add calls. This is typically acceptable for metrics
91+
// and counters.
92+
type Counters struct {
93+
numShards uint32
94+
// shardSize is the number of counters per shard.
95+
shardSize uint32
96+
counters []atomic.Int64
97+
numCounters int
98+
}
99+
100+
// Number of counters per cache line. We assume the typical 64-byte cache line.
101+
// Must be a power of 2.
102+
const countersPerCacheLine = 8
103+
104+
// MakeCounters creates a new Counters with the specified number of counters.
105+
func MakeCounters(numCounters int) Counters {
106+
return makeCounters(NumShards(), numCounters)
107+
}
108+
109+
func makeCounters(numShards, numCounters int) Counters {
110+
// Round up to the nearest cacheline size, to avoid false sharing.
111+
shardSize := (numCounters + countersPerCacheLine - 1) &^ (countersPerCacheLine - 1)
112+
counters := make([]atomic.Int64, shardSize*numShards+countersPerCacheLine)
113+
// Align the slice to a cache line.
114+
if r := (uintptr(unsafe.Pointer(&counters[0])) / 8) & (countersPerCacheLine - 1); r != 0 {
115+
counters = counters[countersPerCacheLine-r:]
116+
}
117+
return Counters{
118+
numShards: uint32(numShards),
119+
shardSize: uint32(shardSize),
120+
counters: counters,
121+
numCounters: numCounters,
122+
}
123+
}
124+
125+
// Add atomically adds delta to the specified counter. It is safe for concurrent
126+
// use by multiple goroutines; delta may be negative (decrement).
127+
//
128+
// Add is very efficient: a single atomic increment on a mostly uncontended
129+
// cache line.
130+
func (c *Counters) Add(counter int, delta int64) {
131+
shard := uint32(CPUBiasedInt()) % c.numShards
132+
c.counters[shard*c.shardSize+uint32(counter)].Add(delta)
133+
}
134+
135+
// Get the current value of the specified counter.
136+
//
137+
// It safe to call Get() while there are concurrent Add() calls (but there is no
138+
// guarantee wrt which of those are reflected).
139+
//
140+
// Get is O(NumShards()) so it is more expensive than Add().
141+
func (c *Counters) Get(counter int) int64 {
142+
var res int64
143+
for shard := range c.numShards {
144+
res += c.counters[shard*c.shardSize+uint32(counter)].Load()
145+
}
146+
return res
147+
}
148+
149+
// All returns the current values of all counters.
150+
// - Length of the returned slice equals the number of logical counters
151+
// passed to MakeCounters.
152+
// - Safe for concurrent use.
153+
// - Complexity is O(NumShards() * numCounters).
154+
// - Snapshot semantics: no ordering guarantees w.r.t. concurrent updates.
155+
func (c *Counters) All() []int64 {
156+
res := make([]int64, c.numCounters)
157+
for i := range c.numShards {
158+
start := int(i * c.shardSize)
159+
for j := range res {
160+
res[j] += c.counters[start+j].Load()
161+
}
162+
}
163+
return res
164+
}

crsync/counters_test.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package crsync
16+
17+
import (
18+
"fmt"
19+
"math/rand/v2"
20+
"runtime"
21+
"sync"
22+
"sync/atomic"
23+
"testing"
24+
)
25+
26+
// BenchmarkCounters compares the performance of Counters against simple atomic
27+
// counters, and against sharded counters with 4*P shards and random shard
28+
// choice. There are two Counters versions (crsync and crync-cr), depending on
29+
// whether the CockroachDB Go runtime (and cockroach_go tag) is used.
30+
//
31+
// # Benchmark results
32+
//
33+
// ## Apple M1 Pro (10 core)
34+
//
35+
// benchmark simple randshards crsync crsync-cr
36+
// c=1/p=1-10 6.96ns ± 0% 9.69ns ± 0% 12.02ns ± 0% 7.34ns ± 1%
37+
// c=1/p=4-10 169ns ±10% 53ns ± 4% 21ns ±39% 13ns ±26%
38+
// c=1/p=10-10 752ns ± 3% 125ns ± 0% 51ns ±20% 51ns ± 7% *
39+
// c=1/p=40-10 3.04µs ± 2% 0.67µs ±13% 0.26µs ±31% 0.29µs ±16%
40+
// c=10/p=1-10 4.49ns ± 1% 9.72ns ± 0% 12.39ns ± 0% 4.95ns ± 0%
41+
// c=10/p=4-10 147ns ± 5% 49ns ± 3% 27ns ±33% 6ns ± 1%
42+
// c=10/p=10-10 790ns ± 6% 106ns ± 0% 47ns ±20% 8ns ± 4% *
43+
// c=10/p=40-10 3.24µs ± 2% 0.61µs ±11% 0.24µs ± 9% 0.11µs ±28%
44+
// c=100/p=1-10 4.33ns ± 0% 9.76ns ± 0% 12.41ns ± 0% 4.82ns ± 0%
45+
// c=100/p=4-10 73.9ns ± 4% 46.0ns ± 5% 21.9ns ±22% 6.2ns ± 6%
46+
// c=100/p=10-10 197ns ± 1% 94ns ±10% 53ns ±17% 11ns ± 1% *
47+
// c=100/p=40-10 893ns ± 6% 524ns ± 7% 249ns ±19% 125ns ± 8%
48+
// . * one worker per core
49+
//
50+
// ## Intel(R) Xeon(R) CPU @ 2.80GH (24 core, n2-custom-24-32768 on GCE)
51+
//
52+
// benchmark simple randshards crsync crsync-cr
53+
// c=1/p=1-24 14.1ns ± 0% 21.0ns ± 1% 37.7ns ± 0% 13.5ns ± 0%
54+
// c=1/p=4-24 92.9ns ± 1% 50.1ns ± 1% 63.8ns ±29% 13.3ns ± 0%
55+
// c=1/p=24-24 487ns ±18% 178ns ±105% 144ns ±39% 57ns ±60% *
56+
// c=1/p=96-24 1.84µs ± 2% 0.59µs ± 3% 0.52µs ± 6% 0.29µs ± 7%
57+
// c=10/p=1-24 13.8ns ± 0% 21.2ns ± 1% 38.0ns ± 1% 14.1ns ± 3%
58+
// c=10/p=4-24 91.0ns ± 3% 48.6ns ± 1% 63.8ns ±16% 14.0ns ± 2%
59+
// c=10/p=24-24 461ns ± 8% 176ns ±53% 146ns ±36% 110ns ±84% *
60+
// c=10/p=96-24 1.79µs ± 1% 0.55µs ± 8% 0.52µs ± 6% 0.31µs ± 5%
61+
// c=100/p=1-24 13.7ns ± 0% 22.0ns ± 2% 38.0ns ± 0% 14.1ns ±10%
62+
// c=100/p=4-24 63.5ns ± 1% 46.4ns ± 2% 66.7ns ±30% 14.2ns ± 5%
63+
// c=100/p=24-24 295ns ±27% 87ns ± 1% 121ns ±24% 44ns ±71% *
64+
// c=100/p=96-24 1.11µs ± 2% 0.53µs ± 4% 0.52µs ± 8% 0.31µs ± 5%
65+
// . * one worker per core
66+
func BenchmarkCounters(b *testing.B) {
67+
forEach := func(b *testing.B, fn func(b *testing.B, c, p int)) {
68+
for _, c := range []int{1, 10, 100} {
69+
for _, p := range []int{1, 4, runtime.GOMAXPROCS(0), 4 * runtime.GOMAXPROCS(0)} {
70+
b.Run(fmt.Sprintf("c=%d/p=%d", c, p), func(b *testing.B) {
71+
fn(b, c, p)
72+
})
73+
}
74+
}
75+
}
76+
77+
// simple uses non-sharded atomic counters.
78+
b.Run("simple", func(b *testing.B) {
79+
forEach(b, func(b *testing.B, c, p int) {
80+
counters := make([]atomic.Int64, c)
81+
incCounter := func(counter int) {
82+
counters[counter].Add(1)
83+
}
84+
runCountersBenchmark(b, c, p, incCounter)
85+
})
86+
})
87+
88+
// randshards uses a 4*N shards with random shard choice.
89+
b.Run("randshards", func(b *testing.B) {
90+
forEach(b, func(b *testing.B, c, p int) {
91+
counters := makeCounters(runtime.GOMAXPROCS(0)*4, c)
92+
incCounter := func(counter int) {
93+
shard := rand.Uint32N(counters.numShards)
94+
counters.counters[shard*counters.shardSize+uint32(counter)].Add(1)
95+
}
96+
runCountersBenchmark(b, c, p, incCounter)
97+
})
98+
})
99+
100+
name := "crsync"
101+
if usingCockroachGo {
102+
name += "-cr"
103+
}
104+
b.Run(name, func(b *testing.B) {
105+
forEach(b, func(b *testing.B, c, p int) {
106+
counters := MakeCounters(c)
107+
incCounter := func(counter int) {
108+
counters.Add(counter, 1)
109+
}
110+
runCountersBenchmark(b, c, p, incCounter)
111+
})
112+
})
113+
}
114+
115+
func runCountersBenchmark(
116+
b *testing.B, numCounters, parallelism int, incCounter func(counter int),
117+
) {
118+
const batchSize = 1000
119+
// Each element of ch corresponds to a batch of operations to be performed.
120+
ch := make(chan int, 1+b.N/batchSize)
121+
122+
var wg sync.WaitGroup
123+
for range parallelism {
124+
wg.Add(1)
125+
go func() {
126+
defer wg.Done()
127+
128+
rng := rand.New(rand.NewPCG(rand.Uint64(), rand.Uint64()))
129+
for numOps := range ch {
130+
for range numOps {
131+
incCounter(rng.IntN(numCounters))
132+
}
133+
}
134+
}()
135+
}
136+
137+
numOps := int64(b.N) * int64(parallelism)
138+
for i := int64(0); i < numOps; i += batchSize {
139+
ch <- int(min(batchSize, numOps-i))
140+
}
141+
close(ch)
142+
wg.Wait()
143+
}

0 commit comments

Comments
 (0)