Skip to content

Commit 2a9065c

Browse files
committed
kv/bulk: close bulk-adder-monitor on error
Previously, we would have left the `bulk-adder-monitor` not closed if we hit an error in `MakeBulkAdder`, and this is now fixed. (The only error possible is actually not enough memory budget to reserve the buffer size.) Release note: None
1 parent 9295740 commit 2a9065c

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

pkg/kv/bulk/buffering_adder.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ var _ kvserverbase.BulkAdder = &BufferingAdder{}
7979
// passed to add into SSTs that are then ingested. rangeCache if set is
8080
// consulted to avoid generating an SST that will span a range boundary and thus
8181
// encounter an error and need to be split and retired to be applied.
82+
//
83+
// The BulkAdder takes ownership of the memory monitor which must be non-nil. In
84+
// case of an error, the monitor will be stopped.
8285
func MakeBulkAdder(
8386
ctx context.Context,
8487
db *kv.DB,
@@ -88,7 +91,15 @@ func MakeBulkAdder(
8891
opts kvserverbase.BulkAdderOptions,
8992
bulkMon *mon.BytesMonitor,
9093
sendLimiter limit.ConcurrentRequestLimiter,
91-
) (*BufferingAdder, error) {
94+
) (_ *BufferingAdder, retErr error) {
95+
if bulkMon == nil {
96+
return nil, errors.New("bulkMon must be non-nil")
97+
}
98+
defer func() {
99+
if retErr != nil {
100+
bulkMon.Stop(ctx)
101+
}
102+
}()
92103
if opts.MinBufferSize == 0 {
93104
opts.MinBufferSize = 32 << 20
94105
}
@@ -166,11 +177,8 @@ func (b *BufferingAdder) Close(ctx context.Context) {
166177
b.sink.mu.Unlock()
167178
}
168179
b.sink.Close(ctx)
169-
170-
if b.bulkMon != nil {
171-
b.memAcc.Close(ctx)
172-
b.bulkMon.Stop(ctx)
173-
}
180+
b.memAcc.Close(ctx)
181+
b.bulkMon.Stop(ctx)
174182
}
175183

176184
// Add adds a key to the buffer and checks if it needs to flush.

0 commit comments

Comments
 (0)