Skip to content

Commit 249a674

Browse files
craig[bot]yuzefovich
andcommitted
Merge #148974
148974: util/bufalloc: fix misuse and improve ByteAllocator API r=yuzefovich a=yuzefovich **kvserver/gc,rowcontainer: fix misuse of ByteAllocator API** `bufalloc.ByteAllocator.Copy` has an option to allocate extra capacity, meaning that it copies the passed-in byte slice into a new memory region while also adding extra capacity to be utilized by caller. Crucially, that extra capacity lives in the returned byte slice containing the copy. We use this feature in two places throughout the codebase, and in both places it was used incorrectly, resulting in a wasteful allocation. Consider the following setup: k = [1, 2], v = [3, 4, 5, 6] With invocation like `alloc, k = alloc.Copy(k, len(v))`, we would get len(k) = 2, cap(k) = 6 However, in both spots we copied `v` via `alloc, v = alloc.Copy(v, 0)`, which would actually create another allocation. This commit fixes this oversight (both were introduced in 2022 by different authors), and the following commit will clean up the API to avoid possible confusion. Informs: #147601. **util/bufalloc: clean up ByteAllocator API** `bufalloc.ByteAllocator`, as evidenced by a fix in the previous commit, has confusing API for allocating "extra capacity". This commit cleans it up by removing the option altogether. The only two spots that utilized the extra capacity previously now explicitly request combined allocated upfront and manage the copying behavior. Release note: None Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents 96f6d6b + daad9f2 commit 249a674

File tree

22 files changed

+112
-60
lines changed

22 files changed

+112
-60
lines changed

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ ALL_TESTS = [
713713
"//pkg/util/binfetcher:binfetcher_test",
714714
"//pkg/util/bitarray:bitarray_test",
715715
"//pkg/util/bitmap:bitmap_test",
716+
"//pkg/util/bufalloc:bufalloc_test",
716717
"//pkg/util/buildutil:buildutil_test",
717718
"//pkg/util/cache:cache_test",
718719
"//pkg/util/caller:caller_test",
@@ -2534,6 +2535,7 @@ GO_TARGETS = [
25342535
"//pkg/util/bitmap:bitmap",
25352536
"//pkg/util/bitmap:bitmap_test",
25362537
"//pkg/util/bufalloc:bufalloc",
2538+
"//pkg/util/bufalloc:bufalloc_test",
25372539
"//pkg/util/buildutil:buildutil",
25382540
"//pkg/util/buildutil:buildutil_test",
25392541
"//pkg/util/bulk:bulk",

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,14 +453,14 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
453453
if err != nil {
454454
return err
455455
}
456-
c.scratch, keyCopy = c.scratch.Copy(encodedKey, 0 /* extraCap */)
456+
c.scratch, keyCopy = c.scratch.Copy(encodedKey)
457457
// TODO(yevgeniy): Some refactoring is needed in the encoder: namely, prevRow
458458
// might not be available at all when working with changefeed expressions.
459459
encodedValue, err := c.encoder.EncodeValue(ctx, evCtx, updatedRow, prevRow)
460460
if err != nil {
461461
return err
462462
}
463-
c.scratch, valueCopy = c.scratch.Copy(encodedValue, 0 /* extraCap */)
463+
c.scratch, valueCopy = c.scratch.Copy(encodedValue)
464464

465465
// Since we're done processing/converting this event, and will not use much more
466466
// than len(key)+len(bytes) worth of resources, adjust allocation to match.

pkg/ccl/changefeedccl/sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ func (s *bufferSink) EmitResolvedTimestamp(
494494
if err != nil {
495495
return err
496496
}
497-
s.scratch, payload = s.scratch.Copy(payload, 0 /* extraCap */)
497+
s.scratch, payload = s.scratch.Copy(payload)
498498
s.buf.Push(rowenc.EncDatumRow{
499499
{Datum: tree.DNull}, // resolved span
500500
{Datum: tree.DNull}, // topic

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ func (s *kafkaSink) EmitResolvedTimestamp(
424424
if err != nil {
425425
return err
426426
}
427-
s.scratch, payload = s.scratch.Copy(payload, 0 /* extraCap */)
427+
s.scratch, payload = s.scratch.Copy(payload)
428428

429429
// sarama caches this, which is why we have to periodically refresh the
430430
// metadata above. Staleness here does not impact correctness. Some new

pkg/ccl/changefeedccl/sink_sql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (s *sqlSink) EmitResolvedTimestamp(
168168
if err != nil {
169169
return err
170170
}
171-
s.scratch, payload = s.scratch.Copy(payload, 0 /* extraCap */)
171+
s.scratch, payload = s.scratch.Copy(payload)
172172
for partition := int32(0); partition < sqlSinkNumPartitions; partition++ {
173173
if err := s.emit(ctx, topic, partition, noKey, noValue, payload); err != nil {
174174
return err

pkg/crosscluster/streamclient/randclient/random_stream_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ func duplicateEvent(event crosscluster.Event) crosscluster.Event {
738738
for i := range kvs {
739739
res[i].Key = kvs[i].KeyValue.Key.Clone()
740740
res[i].Value.Timestamp = kvs[i].KeyValue.Value.Timestamp
741-
a, res[i].Value.RawBytes = a.Copy(kvs[i].KeyValue.Value.RawBytes, 0)
741+
a, res[i].Value.RawBytes = a.Copy(kvs[i].KeyValue.Value.RawBytes)
742742
}
743743
dup = crosscluster.MakeKVEventFromKVs(res)
744744
case crosscluster.SSTableEvent:

pkg/kv/kvnemesis/engine.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (e *Engine) Get(key roachpb.Key, ts hlc.Timestamp) roachpb.Value {
9393
if err != nil {
9494
panic(err)
9595
}
96-
e.b, valCopy = e.b.Copy(v, 0 /* extraCap */)
96+
e.b, valCopy = e.b.Copy(v)
9797
mvccVal, err := storage.DecodeMVCCValue(valCopy)
9898
if err != nil {
9999
panic(err)
@@ -137,12 +137,12 @@ func (e *Engine) Iterate(
137137
for iter.First(); iter.Valid(); iter.Next() {
138138
hasPoint, _ := iter.HasPointAndRange()
139139
var keyCopy, valCopy []byte
140-
e.b, keyCopy = e.b.Copy(iter.Key(), 0 /* extraCap */)
140+
e.b, keyCopy = e.b.Copy(iter.Key())
141141
v, err := iter.ValueAndErr()
142142
if err != nil {
143143
fn(nil, nil, hlc.Timestamp{}, nil, err)
144144
}
145-
e.b, valCopy = e.b.Copy(v, 0 /* extraCap */)
145+
e.b, valCopy = e.b.Copy(v)
146146
if hasPoint {
147147
key, err := storage.DecodeMVCCKey(keyCopy)
148148
if err != nil {
@@ -153,8 +153,8 @@ func (e *Engine) Iterate(
153153
}
154154
if iter.RangeKeyChanged() {
155155
keyCopy, endKeyCopy := iter.RangeBounds()
156-
e.b, keyCopy = e.b.Copy(keyCopy, 0 /* extraCap */)
157-
e.b, endKeyCopy = e.b.Copy(endKeyCopy, 0 /* extraCap */)
156+
e.b, keyCopy = e.b.Copy(keyCopy)
157+
e.b, endKeyCopy = e.b.Copy(endKeyCopy)
158158
for _, rk := range iter.RangeKeys() {
159159
ts, err := mvccencoding.DecodeMVCCTimestampSuffix(rk.Suffix)
160160
if err != nil {
@@ -170,7 +170,7 @@ func (e *Engine) Iterate(
170170
fn(nil, nil, hlc.Timestamp{}, nil, errors.Errorf("invalid key %q", endKeyCopy))
171171
}
172172

173-
e.b, rk.Value = e.b.Copy(rk.Value, 0)
173+
e.b, rk.Value = e.b.Copy(rk.Value)
174174

175175
fn(engineKey.Key, engineEndKey.Key, ts, rk.Value, nil)
176176
}

pkg/kv/kvserver/gc/gc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ func (b *gcKeyBatcher) foundGarbage(
809809
// Whenever new key is started or new batch is started with the same key in
810810
// it, record key value using batches' allocator.
811811
if b.prevWasNewest || len(b.pointsBatches[i].batchGCKeys) == 0 {
812-
b.pointsBatches[i].alloc, key = b.pointsBatches[i].alloc.Copy(cur.key.Key, 0)
812+
b.pointsBatches[i].alloc, key = b.pointsBatches[i].alloc.Copy(cur.key.Key)
813813
b.pointsBatches[i].batchGCKeys = append(b.pointsBatches[i].batchGCKeys,
814814
kvpb.GCRequest_GCKey{Key: key, Timestamp: cur.key.Timestamp})
815815
keyMemUsed := len(key) + hlcTimestampSize
@@ -1065,7 +1065,7 @@ func (b *intentBatcher) addAndMaybeFlushIntents(
10651065
// We need to register passed intent regardless of flushing operation result
10661066
// so that batcher is left in consistent state and don't miss any keys if
10671067
// caller resumes batching.
1068-
b.alloc, key = b.alloc.Copy(key, 0)
1068+
b.alloc, key = b.alloc.Copy(key)
10691069
b.pendingLocks = append(b.pendingLocks, roachpb.MakeLock(meta.Txn, key, str))
10701070
b.collectedIntentBytes += int64(len(key))
10711071
b.pendingTxns[txnID] = true

pkg/kv/kvserver/gc/gc_iterator.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,12 @@ func (b *gcIteratorRingBuf) pushBack(
237237
}
238238
i := (b.head + b.len) % gcIteratorRingBufSize
239239
b.allocs[i] = b.allocs[i].Truncate()
240-
b.allocs[i], k.Key = b.allocs[i].Copy(k.Key, len(metaValue))
241-
if len(metaValue) > 0 {
242-
b.allocs[i], metaValue = b.allocs[i].Copy(metaValue, 0)
243-
}
240+
var buf []byte
241+
b.allocs[i], buf = b.allocs[i].Alloc(len(k.Key) + len(metaValue))
242+
copy(buf, k.Key)
243+
k.Key, buf = buf[:len(k.Key):len(k.Key)], buf[len(k.Key):]
244+
copy(buf, metaValue)
245+
metaValue = buf
244246
b.buf[i] = mvccKeyValue{
245247
key: k,
246248
mvccValueLen: mvccValueLen,

pkg/kv/kvserver/rangefeed/catchup_scan.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ func (i *CatchUpIterator) CatchUpScan(
198198
rangeKeys := i.RangeKeys()
199199
for j := rangeKeys.Len() - 1; j >= 0; j-- {
200200
var span roachpb.Span
201-
a, span.Key = a.Copy(rangeKeys.Bounds.Key, 0)
202-
a, span.EndKey = a.Copy(rangeKeys.Bounds.EndKey, 0)
201+
a, span.Key = a.Copy(rangeKeys.Bounds.Key)
202+
a, span.EndKey = a.Copy(rangeKeys.Bounds.EndKey)
203203
ts := rangeKeys.Versions[j].Timestamp
204204
err := outputFn(&kvpb.RangeFeedEvent{
205205
DeleteRange: &kvpb.RangeFeedDeleteRange{
@@ -298,7 +298,7 @@ func (i *CatchUpIterator) CatchUpScan(
298298
if err := outputEvents(); err != nil {
299299
return err
300300
}
301-
a, lastKey = a.Copy(unsafeKey.Key, 0)
301+
a, lastKey = a.Copy(unsafeKey.Key)
302302
}
303303
key := lastKey
304304

@@ -316,7 +316,7 @@ func (i *CatchUpIterator) CatchUpScan(
316316
// value.
317317
if !ignore || (withDiff && len(reorderBuf) > 0) {
318318
var val []byte
319-
a, val = a.Copy(unsafeVal, 0)
319+
a, val = a.Copy(unsafeVal)
320320
if withDiff {
321321
// Update the last version with its previous value (this version).
322322
if l := len(reorderBuf) - 1; l >= 0 {

0 commit comments

Comments
 (0)