Skip to content

Commit aca0a53

Browse files
MagicalTux and claude
committed
Optimize Batch with double-buffering and slice pooling
- Add overflow buffer for improved batching when pending is full
- Add slice pooling (acquireSlice/releaseSlice) to reduce allocations
- Fix deadlock: promote pending when current=nil to prevent orphaned batches
- Add comprehensive tests and benchmarks for Batch
- Consolidate Makefile test target with race detector

Benchmarks show 0 allocs/op for parallel operations and 1 alloc/op for sequential (down from 2).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 0ae7d88 commit aca0a53

File tree

3 files changed

+449
-77
lines changed

3 files changed

+449
-77
lines changed

Makefile

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
GOROOT:=$(shell PATH="/pkg/main/dev-lang.go.dev/bin:$$PATH" go env GOROOT)
33
GOPATH:=$(shell $(GOROOT)/bin/go env GOPATH)
44

5-
.PHONY: test deps race
5+
.PHONY: test deps
66

77
all:
88
$(GOPATH)/bin/goimports -w -l .
@@ -12,7 +12,4 @@ deps:
1212
$(GOROOT)/bin/go get -v -t .
1313

1414
test:
15-
$(GOROOT)/bin/go test -v
16-
17-
race:
18-
$(GOROOT)/bin/go test -v -race
15+
$(GOROOT)/bin/go test -v -race ./...

batch.go

Lines changed: 146 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package unison
22

33
import (
4+
"runtime"
45
"runtime/debug"
56
"sync"
67
)
@@ -17,11 +18,13 @@ type batchCall[T any] struct {
1718
// Any calls that arrive while execution is in progress are collected and processed
1819
// together in the next batch once the current execution completes.
1920
type Batch[T any] struct {
20-
mu sync.Mutex
21-
fn func([]T) error
22-
current *batchCall[T]
23-
pending *batchCall[T]
24-
MaxSize int // Maximum batch size, 0 means unlimited
21+
mu sync.Mutex
22+
fn func([]T) error
23+
current *batchCall[T]
24+
pending *batchCall[T]
25+
overflow *batchCall[T] // second buffer when pending is full
26+
spareSlice []T // pooled slice for reuse
27+
MaxSize int // Maximum batch size, 0 means unlimited
2528
}
2629

2730
// NewBatch creates a new Batch with the given processing function.
@@ -30,6 +33,28 @@ func NewBatch[T any](fn func([]T) error) *Batch[T] {
3033
return &Batch[T]{fn: fn}
3134
}
3235

36+
// acquireSlice gets a slice from the pool or returns nil.
37+
// Must be called with b.mu held.
38+
func (b *Batch[T]) acquireSlice() []T {
39+
if b.spareSlice != nil {
40+
s := b.spareSlice[:0]
41+
b.spareSlice = nil
42+
return s
43+
}
44+
return nil
45+
}
46+
47+
// releaseSlice returns a slice to the pool for reuse.
48+
// Must be called with b.mu held.
49+
func (b *Batch[T]) releaseSlice(s []T) {
50+
// Only keep if spare is empty and capacity is reasonable
51+
if b.spareSlice == nil && cap(s) > 0 && cap(s) <= 1024 {
52+
// Clear slice elements to allow GC of referenced values
53+
clear(s[:cap(s)])
54+
b.spareSlice = s[:0]
55+
}
56+
}
57+
3358
// Do adds a value to be processed and waits for the batch to complete.
3459
// All callers in the same batch receive the same error result.
3560
// If MaxSize is set and the pending batch is full, the caller blocks until
@@ -38,58 +63,137 @@ func (b *Batch[T]) Do(value T) error {
3863
b.mu.Lock()
3964

4065
for {
41-
if b.current == nil {
42-
// No current execution, start one immediately
43-
c := &batchCall[T]{values: []T{value}}
44-
c.wg.Add(1)
45-
b.current = c
46-
b.mu.Unlock()
47-
b.run(c)
48-
return c.err
49-
}
66+
// Current is running, try to add to pending or overflow
67+
if b.current != nil {
68+
// Try pending first
69+
p := b.pending
70+
if p == nil || b.MaxSize == 0 || len(p.values) < b.MaxSize {
71+
if p == nil {
72+
p = &batchCall[T]{values: b.acquireSlice()}
73+
p.wg.Add(1)
74+
b.pending = p
75+
}
76+
p.values = append(p.values, value)
77+
myBatch := p
78+
c := b.current
79+
b.mu.Unlock()
80+
81+
// Wait for current to complete
82+
c.wg.Wait()
83+
84+
// Try to become the runner for pending
85+
b.mu.Lock()
86+
if b.pending == myBatch && b.current == nil {
87+
b.pending = b.overflow
88+
b.overflow = nil
89+
b.current = myBatch
90+
b.mu.Unlock()
91+
b.run(myBatch)
92+
return myBatch.err
93+
}
94+
b.mu.Unlock()
95+
96+
// Someone else is running it
97+
myBatch.wg.Wait()
98+
return myBatch.err
99+
}
100+
101+
// Pending full, try overflow
102+
o := b.overflow
103+
if o == nil || b.MaxSize == 0 || len(o.values) < b.MaxSize {
104+
if o == nil {
105+
o = &batchCall[T]{values: b.acquireSlice()}
106+
o.wg.Add(1)
107+
b.overflow = o
108+
}
109+
o.values = append(o.values, value)
110+
myBatch := o
111+
c := b.current
112+
p := b.pending
113+
b.mu.Unlock()
50114

51-
// Current is running, check if we can add to pending
52-
p := b.pending
53-
canAdd := p == nil || b.MaxSize == 0 || len(p.values) < b.MaxSize
115+
// Wait for current to complete
116+
c.wg.Wait()
117+
// Wait for pending to complete (our overflow becomes pending after that)
118+
p.wg.Wait()
119+
120+
// Try to become the runner for our batch (now pending)
121+
b.mu.Lock()
122+
if b.pending == myBatch && b.current == nil {
123+
b.pending = b.overflow
124+
b.overflow = nil
125+
b.current = myBatch
126+
b.mu.Unlock()
127+
b.run(myBatch)
128+
return myBatch.err
129+
}
130+
b.mu.Unlock()
54131

55-
if canAdd {
56-
if p == nil {
57-
p = &batchCall[T]{}
58-
p.wg.Add(1)
59-
b.pending = p
132+
// Someone else is running it
133+
myBatch.wg.Wait()
134+
return myBatch.err
60135
}
61-
p.values = append(p.values, value)
136+
137+
// Both full, wait for current to complete then retry
62138
c := b.current
63139
b.mu.Unlock()
64-
65-
// Wait for current batch to finish
66140
c.wg.Wait()
67-
68-
// Try to become the runner for the pending batch
141+
runtime.Gosched() // Let others wake up too
69142
b.mu.Lock()
70-
if b.pending == p {
71-
b.pending = nil
72-
b.current = p
143+
continue
144+
}
145+
146+
// No current execution
147+
// First check if there's a pending batch to promote
148+
if b.pending != nil {
149+
c := b.pending
150+
// Try to add our value to this batch
151+
if b.MaxSize == 0 || len(c.values) < b.MaxSize {
152+
c.values = append(c.values, value)
153+
b.pending = b.overflow
154+
b.overflow = nil
155+
b.current = c
73156
b.mu.Unlock()
74-
b.run(p)
75-
return p.err
157+
b.run(c)
158+
return c.err
76159
}
77-
b.mu.Unlock()
78160

79-
// Someone else is running it, wait for completion
80-
p.wg.Wait()
81-
return p.err
161+
// Batch is full, create new pending for our value
162+
np := &batchCall[T]{values: b.acquireSlice()}
163+
np.values = append(np.values, value)
164+
np.wg.Add(1)
165+
166+
// Promote pending to current
167+
b.pending = b.overflow
168+
b.overflow = nil
169+
b.current = c
170+
171+
// Place our new batch in the queue
172+
if b.pending == nil {
173+
b.pending = np
174+
} else {
175+
b.overflow = np
176+
}
177+
178+
myBatch := np
179+
b.mu.Unlock()
180+
b.run(c)
181+
myBatch.wg.Wait()
182+
return myBatch.err
82183
}
83184

84-
// Pending is full, wait for current to finish and retry
85-
c := b.current
185+
// No pending, start fresh with our value
186+
c := &batchCall[T]{values: b.acquireSlice()}
187+
c.values = append(c.values, value)
188+
c.wg.Add(1)
189+
b.current = c
86190
b.mu.Unlock()
87-
c.wg.Wait()
88-
b.mu.Lock()
191+
b.run(c)
192+
return c.err
89193
}
90194
}
91195

92-
// run executes a batch.
196+
// run executes a single batch.
93197
func (b *Batch[T]) run(c *batchCall[T]) {
94198
defer func() {
95199
if r := recover(); r != nil {
@@ -98,6 +202,7 @@ func (b *Batch[T]) run(c *batchCall[T]) {
98202

99203
b.mu.Lock()
100204
b.current = nil
205+
b.releaseSlice(c.values)
101206
b.mu.Unlock()
102207

103208
c.wg.Done()

0 commit comments

Comments (0)