Skip to content

Commit 50458f1

Browse files
williamhbakerclaude
andcommitted
broker: limit concurrent primary fragment persists
Add a semaphore to Persister that bounds concurrent primary persist goroutines. When many journals share an epoch-aligned flush_interval, they all roll their fragments simultaneously, spawning a potentially large number of concurrent cloud uploads which may allocate significant per-upload buffers and cause memory issues. This semaphore will prevent unbounded memory growth when persisting many journals. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a070522 commit 50458f1

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

broker/fragment/persister.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ import (
1212
)
1313

1414
const (
15-
persistInterval = time.Minute
15+
persistInterval = time.Minute
16+
maxConcurrentPersists = 32
1617
)
1718

1819
// Persister asynchronously persists completed Fragments to their backing pb.FragmentStore.
1920
type Persister struct {
2021
qA, qB, qC []Spool
2122
mu sync.Mutex
2223
doneCh chan struct{}
24+
sem chan struct{}
2325
ks *keyspace.KeySpace
2426
ticker *time.Ticker
2527
persistFn func(context.Context, Spool, *pb.JournalSpec, bool) error
@@ -29,6 +31,7 @@ type Persister struct {
2931
func NewPersister(ks *keyspace.KeySpace) *Persister {
3032
return &Persister{
3133
doneCh: make(chan struct{}),
34+
sem: make(chan struct{}, maxConcurrentPersists),
3235
ks: ks,
3336
persistFn: Persist,
3437
}
@@ -42,8 +45,11 @@ func (p *Persister) SpoolComplete(spool Spool, primary bool) {
4245
}
4346

4447
if primary {
45-
// Attempt to immediately persist the Spool.
46-
go p.attemptPersist(spool, false /* not exiting */)
48+
go func() {
49+
p.sem <- struct{}{}
50+
defer func() { <-p.sem }()
51+
p.attemptPersist(spool, false /* not exiting */)
52+
}()
4753
} else {
4854
p.queue(spool)
4955
}

0 commit comments

Comments
 (0)