Skip to content

Commit 9e002a6

Browse files
authored
Pool gzip writers to reduce RabbitMQ message compression allocations (#3103)
* use sync.Pool for gzip writers to avoid memory hogging * comments * fix deadlock failure * copilot suggestion * fix formatting * use asserts * PR comments
1 parent c39743a commit 9e002a6

File tree

4 files changed

+295
-21
lines changed

4 files changed

+295
-21
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
name: go-benchmarks

on:
  pull_request:
    paths:
      - "**.go"
      - "go.mod"
      - "go.sum"

concurrency:
  group: go-benchmarks-${{ github.event.pull_request.number }}
  cancel-in-progress: true

jobs:
  benchmark:
    runs-on: ubicloud-standard-8
    timeout-minutes: 15
    permissions:
      pull-requests: write
    steps:
      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
        with:
          fetch-depth: 0

      - name: Setup Go
        uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
        with:
          go-version: "1.25"

      - name: Install benchstat
        run: go install golang.org/x/perf/cmd/benchstat@latest

      - name: Find packages with benchmark tests
        id: packages
        run: |
          # Find Go packages that contain Benchmark functions in _test.go files
          # touched by this PR (diffed against the base SHA).
          CHANGED_DIRS=$(git diff --name-only ${{ github.event.pull_request.base.sha }}..HEAD -- '*.go' | xargs -I{} dirname {} | sort -u)
          BENCH_PKGS=""
          for dir in $CHANGED_DIRS; do
            if grep -rq '^func Benchmark' "$dir"/*_test.go 2>/dev/null; then
              BENCH_PKGS="$BENCH_PKGS ./$dir"
            fi
          done
          if [ -z "$BENCH_PKGS" ]; then
            echo "found=false" >> "$GITHUB_OUTPUT"
          else
            echo "found=true" >> "$GITHUB_OUTPUT"
            echo "pkgs=$BENCH_PKGS" >> "$GITHUB_OUTPUT"
          fi

      - name: Run benchmarks on PR branch
        if: steps.packages.outputs.found == 'true'
        run: go test -run='^$' -bench=. -benchmem -count=6 -timeout=10m ${{ steps.packages.outputs.pkgs }} 2>/dev/null | tee /tmp/new.txt

      - name: Run benchmarks on base branch
        if: steps.packages.outputs.found == 'true'
        run: |
          git checkout ${{ github.event.pull_request.base.sha }}
          go test -run='^$' -bench=. -benchmem -count=6 -timeout=10m ${{ steps.packages.outputs.pkgs }} 2>/dev/null | tee /tmp/old.txt
          git checkout ${{ github.event.pull_request.head.sha }}

      - name: Compare with benchstat
        if: steps.packages.outputs.found == 'true'
        id: benchstat
        run: |
          RESULT=$(benchstat /tmp/old.txt /tmp/new.txt 2>&1 || true)
          if [ -z "$RESULT" ]; then
            echo "empty=true" >> "$GITHUB_OUTPUT"
          else
            echo "empty=false" >> "$GITHUB_OUTPUT"
            {
              echo "result<<BENCHSTAT_EOF"
              echo "$RESULT"
              echo "BENCHSTAT_EOF"
            } >> "$GITHUB_OUTPUT"
          fi

      - name: Post results as PR comment
        if: steps.packages.outputs.found == 'true' && steps.benchstat.outputs.empty == 'false'
        uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
        # SECURITY: pass untrusted values through the environment instead of
        # interpolating `${{ ... }}` straight into the script body. Benchstat
        # output contains benchmark names defined in PR-controlled code (and a
        # branch name is attacker-chosen too), so direct interpolation into a
        # JS template literal would allow script injection with the
        # `pull-requests: write` token.
        env:
          BENCHSTAT_RESULT: ${{ steps.benchstat.outputs.result }}
          BASE_REF: ${{ github.event.pull_request.base.ref }}
          BASE_SHA: ${{ github.event.pull_request.base.sha }}
        with:
          script: |
            const result = process.env.BENCHSTAT_RESULT;
            const lines = [
              '## Benchmark results',
              '',
              '```',
              result.trim(),
              '```',
              '',
              `<sub>Compared against \`${process.env.BASE_REF}\` (${process.env.BASE_SHA.slice(0, 8)})</sub>`,
            ];
            const body = lines.join('\n');

            const { data: comments } = await github.rest.issues.listComments({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
            });

            const existing = comments.find(c =>
              c.user.type === 'Bot' && c.body.startsWith('## Benchmark results')
            );

            if (existing) {
              await github.rest.issues.updateComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                comment_id: existing.id,
                body,
              });
            } else {
              await github.rest.issues.createComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                issue_number: context.issue.number,
                body,
              });
            }

.github/workflows/test.yml

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -547,32 +547,51 @@ jobs:
547547
- name: Add go-deadlock dependency
548548
run: go get github.com/sasha-s/go-deadlock@v0.3.6
549549

550-
- name: Patch sync imports to use go-deadlock (sed)
550+
- name: Patch sync.Mutex/RWMutex to use go-deadlock
551551
shell: bash
552552
run: |
553553
set -euo pipefail
554554
555-
# Replace ONLY the stdlib "sync" import with an alias that preserves `sync.X` call sites.
556-
# - `import "sync"` -> `import sync "github.com/sasha-s/go-deadlock"`
557-
# - within import blocks: `"sync"` -> `sync "github.com/sasha-s/go-deadlock"`
558-
# NOTE: use `-i''` (no backup) for portability across GNU/BSD sed.
559-
find . -name '*.go' -not -path './vendor/*' -print0 | xargs -0 sed -i'' -E \
560-
-e 's/^([[:space:]]*)import[[:space:]]+"sync"[[:space:]]*$/\1import sync "github.com\/sasha-s\/go-deadlock"/' \
561-
-e 's/^([[:space:]]*)"sync"[[:space:]]*$/\1sync "github.com\/sasha-s\/go-deadlock"/'
555+
# Find only files that actually use sync.Mutex or sync.RWMutex.
556+
grep -rl --include='*.go' -E 'sync\.(RW)?Mutex' . | grep -v '/vendor/' > /tmp/mutex_files.txt || true
562557
563-
# Keep formatting/import grouping consistent after rewriting.
564-
find . -name '*.go' -not -path './vendor/*' -print0 | xargs -0 gofmt -w
558+
if [ ! -s /tmp/mutex_files.txt ]; then
559+
echo "No files with sync.Mutex or sync.RWMutex found"
560+
exit 0
561+
fi
562+
563+
echo "Patching $(wc -l < /tmp/mutex_files.txt) file(s):"
564+
cat /tmp/mutex_files.txt
565+
566+
while IFS= read -r f; do
567+
# Rewrite type references. RWMutex first so its 'Mutex' suffix isn't clobbered.
568+
sed -i'' -E \
569+
-e 's/sync\.RWMutex/deadlocksync.RWMutex/g' \
570+
-e 's/sync\.Mutex/deadlocksync.Mutex/g' \
571+
"$f"
572+
573+
# \bsync\. won't match inside "deadlocksync.", so this correctly detects
574+
# remaining usages of other sync primitives (WaitGroup, Once, Map, etc.).
575+
if grep -qE '\bsync\.' "$f"; then
576+
# File still needs the sync package; insert deadlocksync import alongside it.
577+
awk '
578+
/^\t"sync"$/ { print; print "\tdeadlocksync \"github.com/sasha-s/go-deadlock\""; next }
579+
{ print }
580+
' "$f" > "${f}.tmp" && mv "${f}.tmp" "$f"
581+
else
582+
# sync package no longer needed; replace its import with the deadlocksync alias.
583+
sed -i'' \
584+
-e 's|^\t"sync"$|\tdeadlocksync "github.com/sasha-s/go-deadlock"|' \
585+
-e 's|^import "sync"$|import deadlocksync "github.com/sasha-s/go-deadlock"|' \
586+
"$f"
587+
fi
588+
done < /tmp/mutex_files.txt
589+
590+
xargs gofmt -w < /tmp/mutex_files.txt
565591
566-
# Evidence in CI logs that rewriting happened (or not).
567592
echo "Changed Go files (after patch):"
568593
git diff --name-only -- '*.go' || true
569594
570-
echo ""
571-
echo "Contents of pkg/scheduling/v1/scheduler.go after patch:"
572-
echo "----"
573-
cat pkg/scheduling/v1/scheduler.go
574-
echo "----"
575-
576595
- name: Test (deadlock-instrumented)
577596
run: |
578597
# Disable gzip compression for load tests to reduce CPU overhead

internal/msgqueue/rabbitmq/gzip.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"compress/gzip"
66
"fmt"
77
"io"
8+
9+
"sync"
810
)
911

1012
type CompressionResult struct {
@@ -17,6 +19,17 @@ type CompressionResult struct {
1719
CompressionRatio float64
1820
}
1921

22+
// gzipWriterPool reuses gzip.Writer instances to avoid repeated allocations.
23+
// No explicit size cap is needed: sync.Pool is self-limiting because the Go
24+
// runtime evicts pooled objects during GC, so the pool cannot grow unbounded.
25+
// In practice the pool size is also bounded by the number of goroutines
26+
// concurrently compressing, which is small for a RabbitMQ publish path.
27+
var gzipWriterPool = sync.Pool{
28+
New: func() any {
29+
return gzip.NewWriter(nil)
30+
},
31+
}
32+
2033
func getPayloadSize(payloads [][]byte) int {
2134
totalSize := 0
2235
for _, payload := range payloads {
@@ -53,17 +66,21 @@ func (t *MessageQueueImpl) compressPayloads(payloads [][]byte) (*CompressionResu
5366

5467
for i, payload := range payloads {
5568
var buf bytes.Buffer
56-
gzipWriter := gzip.NewWriter(&buf)
5769

58-
if _, err := gzipWriter.Write(payload); err != nil {
59-
gzipWriter.Close()
70+
w := gzipWriterPool.Get().(*gzip.Writer)
71+
w.Reset(&buf)
72+
73+
if _, err := w.Write(payload); err != nil {
74+
w.Close()
6075
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
6176
}
6277

63-
if err := gzipWriter.Close(); err != nil {
78+
if err := w.Close(); err != nil {
6479
return nil, fmt.Errorf("failed to close gzip writer: %w", err)
6580
}
6681

82+
gzipWriterPool.Put(w)
83+
6784
compressed[i] = buf.Bytes()
6885
compressedSize += len(compressed[i])
6986
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package rabbitmq
2+
3+
import (
4+
"bytes"
5+
"crypto/rand"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
// generatePayloads builds count random byte slices of size bytes each,
// panicking if the system's random source fails (test-only helper).
func generatePayloads(count, size int) [][]byte {
	out := make([][]byte, count)
	for i := 0; i < count; i++ {
		buf := make([]byte, size)
		if _, err := rand.Read(buf); err != nil {
			panic(err)
		}
		out[i] = buf
	}
	return out
}
27+
28+
func newMQ() *MessageQueueImpl {
29+
return &MessageQueueImpl{
30+
compressionEnabled: true,
31+
compressionThreshold: 0,
32+
}
33+
}
34+
35+
func TestCompressDecompressRoundtrip(t *testing.T) {
36+
mq := newMQ()
37+
payloads := generatePayloads(5, 10*1024)
38+
result, err := mq.compressPayloads(payloads)
39+
40+
assert.NoError(t, err)
41+
assert.True(t, result.WasCompressed, "expected WasCompressed to be true")
42+
assert.Equal(t, len(payloads), len(result.Payloads), "expected %d payloads, got %d", len(payloads), len(result.Payloads))
43+
44+
decompressed, err := mq.decompressPayloads(result.Payloads)
45+
46+
assert.NoError(t, err)
47+
48+
for i := range payloads {
49+
assert.Equal(t, len(payloads[i]), len(decompressed[i]), "payload %d: expected len %d, got %d", i, len(payloads[i]), len(decompressed[i]))
50+
assert.True(t, bytes.Equal(decompressed[i], payloads[i]), "payload %d: decompressed payload does not match original payload", i)
51+
}
52+
}
53+
54+
func TestCompressPayloadsDisabled(t *testing.T) {
55+
mq := &MessageQueueImpl{
56+
compressionEnabled: false,
57+
compressionThreshold: 0,
58+
}
59+
60+
payloads := generatePayloads(3, 1024)
61+
result, err := mq.compressPayloads(payloads)
62+
63+
assert.NoError(t, err)
64+
assert.False(t, result.WasCompressed, "expected WasCompressed to be false when compression is disabled")
65+
}
66+
67+
func TestCompressPayloadsBelowThreshold(t *testing.T) {
68+
mq := &MessageQueueImpl{
69+
compressionEnabled: true,
70+
compressionThreshold: 100 * 1024,
71+
}
72+
73+
payloads := generatePayloads(1, 1024)
74+
result, err := mq.compressPayloads(payloads)
75+
76+
assert.NoError(t, err)
77+
assert.False(t, result.WasCompressed, "expected WasCompressed to be false when below threshold")
78+
}
79+
80+
func BenchmarkCompressPayloads_1x10KiB(b *testing.B) {
81+
mq := newMQ()
82+
payloads := generatePayloads(1, 10*1024)
83+
b.ResetTimer()
84+
b.ReportAllocs()
85+
for i := 0; i < b.N; i++ {
86+
_, _ = mq.compressPayloads(payloads)
87+
}
88+
}
89+
90+
func BenchmarkCompressPayloads_10x10KiB(b *testing.B) {
91+
mq := newMQ()
92+
payloads := generatePayloads(10, 10*1024)
93+
b.ResetTimer()
94+
b.ReportAllocs()
95+
for i := 0; i < b.N; i++ {
96+
_, _ = mq.compressPayloads(payloads)
97+
}
98+
}
99+
100+
func BenchmarkCompressPayloads_10x100KiB(b *testing.B) {
101+
mq := newMQ()
102+
payloads := generatePayloads(10, 100*1024)
103+
b.ResetTimer()
104+
b.ReportAllocs()
105+
for i := 0; i < b.N; i++ {
106+
_, _ = mq.compressPayloads(payloads)
107+
}
108+
}
109+
110+
func BenchmarkCompressPayloads_Concurrent(b *testing.B) {
111+
mq := newMQ()
112+
payloads := generatePayloads(5, 10*1024)
113+
b.ResetTimer()
114+
b.ReportAllocs()
115+
b.RunParallel(func(pb *testing.PB) {
116+
for pb.Next() {
117+
_, _ = mq.compressPayloads(payloads)
118+
}
119+
})
120+
}

0 commit comments

Comments
 (0)