Skip to content

Commit a5d7c25

Browse files
craig[bot], pav-kv, sambhav-jain-16
committed
143863: raft: lift max size from raftLog r=tbg a=pav-kv

There is no more built-in flow control in the `raftLog` type. This commit moves the final piece of it out. There is only one place where this limit is still used: `hasUnappliedConfChanges` check scans the log when campaigning.

Related to #143576

143864: logstore: rm MaybeInlineSideloadedRaftCommand alloc r=tbg a=pav-kv

This change makes `MaybeInlineSideloadedRaftCommand` use the single-entry `raftentry.Cache.Get()` instead of a scan, and eliminates unnecessary allocations in this path.

Epic: none
Release note: none

143937: raft: fix BenchmarkRawNode r=tbg a=pav-kv

Epic: none
Release note: none

144040: roachtest/tpce: openmetrics bytes emitting wrong new line character r=csgourav a=sambhav-jain-16

This commit addresses an issue in the TPC-E roachtest where the OpenMetrics bytes were emitting incorrect newline characters.

Epic: none
Release note: None

Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Sambhav Jain <[email protected]>
5 parents b1839bf + af98a34 + 4d309fc + 86df592 + 7c76c3c commit a5d7c25

File tree

9 files changed

+92
-151
lines changed

9 files changed

+92
-151
lines changed

pkg/cmd/roachtest/tests/tpce.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ func GetTpceOpenmetricsBytes(
400400
buffer.WriteString("# HELP tpce_latency Latency metrics for TPC-E transactions\n")
401401

402402
latencyString := func(quantile, latency string) string {
403-
return fmt.Sprintf(`tpce_latency{%s,unit="ms",is_higher_better="false",quantile="%s"} %s %d\n`, labelString, quantile, latency, now)
403+
return fmt.Sprintf("tpce_latency{%s,unit=\"ms\",is_higher_better=\"false\",quantile=\"%s\"} %s %d\n", labelString, quantile, latency, now)
404404
}
405405

406406
buffer.WriteString(latencyString("0.5", metrics.P50Latency))
@@ -411,7 +411,7 @@ func GetTpceOpenmetricsBytes(
411411
buffer.WriteString(fmt.Sprintf("tpce_latency_sum{%s} %d %d\n", labelString, 0, now))
412412
buffer.WriteString(fmt.Sprintf("tpce_latency_count{%s} %s %d\n", labelString, countOfLatencies, now))
413413
buffer.WriteString("# TYPE tpce_avg_latency gauge\n")
414-
buffer.WriteString(fmt.Sprintf(`tpce_avg_latency{%s,unit="ms",is_higher_better="false"} %s %d\n`, labelString, metrics.AvgLatency, now))
414+
buffer.WriteString(fmt.Sprintf("tpce_avg_latency{%s,unit=\"ms\",is_higher_better=\"false\"} %s %d\n", labelString, metrics.AvgLatency, now))
415415
buffer.WriteString("# EOF")
416416

417417
metricsBytes := buffer.Bytes()

pkg/kv/kvserver/logstore/logstore.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -675,20 +675,14 @@ func LoadEntries(
675675
}
676676
expectedIndex++
677677

678-
typ, _, err := raftlog.EncodingOf(ent)
679-
if err != nil {
678+
if typ, _, err := raftlog.EncodingOf(ent); err != nil {
680679
return err
681-
}
682-
if typ.IsSideloaded() {
683-
newEnt, err := MaybeInlineSideloadedRaftCommand(
680+
} else if typ.IsSideloaded() {
681+
if ent, err = MaybeInlineSideloadedRaftCommand(
684682
ctx, rangeID, ent, sideloaded, eCache,
685-
)
686-
if err != nil {
683+
); err != nil {
687684
return err
688685
}
689-
if newEnt != nil {
690-
ent = *newEnt
691-
}
692686
}
693687

694688
if sh.add(uint64(ent.Size())) {

pkg/kv/kvserver/logstore/sideload.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ func MaybeSideloadEntries(
149149

150150
// MaybeInlineSideloadedRaftCommand takes an entry and inspects it. If its
151151
// command encoding version indicates a sideloaded entry, it uses the entryCache
152-
// or SideloadStorage to inline the payload, returning a new entry (which must
153-
// be treated as immutable by the caller) or nil (if inlining does not apply)
152+
// or SideloadStorage to inline the payload, and returns a new entry (which must
153+
// be treated as immutable by the caller).
154154
//
155155
// If a payload is missing, returns an error whose Cause() is
156156
// errSideloadedFileNotFound.
@@ -160,36 +160,24 @@ func MaybeInlineSideloadedRaftCommand(
160160
ent raftpb.Entry,
161161
sideloaded SideloadStorage,
162162
entryCache *raftentry.Cache,
163-
) (*raftpb.Entry, error) {
163+
) (raftpb.Entry, error) {
164164
typ, pri, err := raftlog.EncodingOf(ent)
165-
if err != nil {
166-
return nil, err
167-
}
168-
if !typ.IsSideloaded() {
169-
return nil, nil
165+
if err != nil || !typ.IsSideloaded() {
166+
return ent, err
170167
}
171168
log.Event(ctx, "inlining sideloaded SSTable")
172-
// We could unmarshal this yet again, but if it's committed we
173-
// are very likely to have appended it recently, in which case
174-
// we can save work.
175-
cachedSingleton, _, _, _ := entryCache.Scan(
176-
nil, rangeID, kvpb.RaftIndex(ent.Index), kvpb.RaftIndex(ent.Index+1), 1<<20,
177-
)
178-
179-
if len(cachedSingleton) > 0 {
169+
// We could unmarshal this yet again, but if it's committed we are very likely
170+
// to have appended it recently, in which case we can save work.
171+
if entry, hit := entryCache.Get(rangeID, kvpb.RaftIndex(ent.Index)); hit {
180172
log.Event(ctx, "using cache hit")
181-
return &cachedSingleton[0], nil
173+
return entry, nil
182174
}
183175

184-
// Make a shallow copy.
185-
entCpy := ent
186-
ent = entCpy
187-
188176
log.Event(ctx, "inlined entry not cached")
189177
// (Bad) luck, for whatever reason the inlined proposal isn't in the cache.
190178
e, err := raftlog.NewEntry(ent)
191179
if err != nil {
192-
return nil, err
180+
return ent, err
193181
}
194182

195183
if len(e.Cmd.ReplicatedEvalResult.AddSSTable.Data) > 0 {
@@ -201,12 +189,12 @@ func MaybeInlineSideloadedRaftCommand(
201189
// be as a result of log entries that are very old, written
202190
// when sending the log with snapshots was still possible).
203191
log.Event(ctx, "entry already inlined")
204-
return &ent, nil
192+
return ent, nil
205193
}
206194

207195
sideloadedData, err := sideloaded.Get(ctx, kvpb.RaftIndex(ent.Index), kvpb.RaftTerm(ent.Term))
208196
if err != nil {
209-
return nil, errors.Wrap(err, "loading sideloaded data")
197+
return ent, errors.Wrap(err, "loading sideloaded data")
210198
}
211199
e.Cmd.ReplicatedEvalResult.AddSSTable.Data = sideloadedData
212200
// TODO(tbg): there should be a helper that properly encodes a command, given
@@ -216,11 +204,11 @@ func MaybeInlineSideloadedRaftCommand(
216204
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID, pri)
217205
_, err := protoutil.MarshalToSizedBuffer(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
218206
if err != nil {
219-
return nil, err
207+
return ent, err
220208
}
221209
ent.Data = data
222210
}
223-
return &ent, nil
211+
return ent, nil
224212
}
225213

226214
// AssertSideloadedRaftCommandInlined asserts that if the provided entry is a

pkg/kv/kvserver/logstore/sideload_test.go

Lines changed: 48 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"os"
1414
"path/filepath"
1515
"reflect"
16-
"regexp"
1716
"sort"
1817
"strconv"
1918
"strings"
@@ -368,18 +367,6 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
368367
v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC
369368
rangeID := roachpb.RangeID(1)
370369

371-
type testCase struct {
372-
// Entry passed into maybeInlineSideloadedRaftCommand and the entry
373-
// after having (perhaps) been modified.
374-
thin, fat raftpb.Entry
375-
// Populate the raft entry cache and sideload storage before running the test.
376-
setup func(*raftentry.Cache, SideloadStorage)
377-
// If nonempty, the error expected from maybeInlineSideloadedRaftCommand.
378-
expErr string
379-
// If nonempty, a regex that the recorded trace span must match.
380-
expTrace string
381-
}
382-
383370
sstFat := kvserverpb.ReplicatedEvalResult_AddSSTable{
384371
Data: []byte("foo"),
385372
CRC32: 0, // not checked
@@ -388,91 +375,84 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
388375
CRC32: 0, // not checked
389376
}
390377

391-
putOnDisk := func(ec *raftentry.Cache, ss SideloadStorage) {
392-
if err := ss.Put(context.Background(), 5, 6, sstFat.Data); err != nil {
393-
t.Fatal(err)
394-
}
378+
putOnDisk := func(_ *raftentry.Cache, ss SideloadStorage) {
379+
require.NoError(t, ss.Put(context.Background(), 5, 6, sstFat.Data))
395380
}
396381

397-
testCases := map[string]testCase{
382+
for _, test := range []struct {
383+
name string
384+
// Entry passed into maybeInlineSideloadedRaftCommand and the entry
385+
// after having (perhaps) been modified.
386+
thin, fat raftpb.Entry
387+
// Populate the raft entry cache and sideload storage before running the test.
388+
setup func(*raftentry.Cache, SideloadStorage)
389+
// If nonempty, the error expected from maybeInlineSideloadedRaftCommand.
390+
expErr string
391+
// If nonempty, a substring that the recorded trace span must contain.
392+
expTrace string
393+
}{
398394
// Plain old v1 Raft command without payload. Don't touch.
399-
"v1-no-payload": {thin: mkEnt(v1, 5, 6, &sstThin), fat: mkEnt(v1, 5, 6, &sstThin)},
395+
{name: "v1-no-payload", thin: mkEnt(v1, 5, 6, &sstThin), fat: mkEnt(v1, 5, 6, &sstThin)},
400396
// With payload, but command is v1. Don't touch. Note that the
401397
// first of the two shouldn't happen in practice or we have a
402398
// huge problem once we try to apply this entry.
403-
"v1-slim-with-payload": {thin: mkEnt(v1, 5, 6, &sstThin), fat: mkEnt(v1, 5, 6, &sstThin)},
404-
"v1-with-payload": {thin: mkEnt(v1, 5, 6, &sstFat), fat: mkEnt(v1, 5, 6, &sstFat)},
399+
{name: "v1-slim-with-payload", thin: mkEnt(v1, 5, 6, &sstThin), fat: mkEnt(v1, 5, 6, &sstThin)},
400+
{name: "v1-with-payload", thin: mkEnt(v1, 5, 6, &sstFat), fat: mkEnt(v1, 5, 6, &sstFat)},
405401
// v2 with payload, but payload is AWOL. This would be fatal in practice.
406-
"v2-with-payload-missing-file": {
402+
{
403+
name: "v2-with-payload-missing-file",
407404
thin: mkEnt(v2, 5, 6, &sstThin), fat: mkEnt(v2, 5, 6, &sstThin),
408405
expErr: "not found",
409406
},
410407
// v2 with payload that's actually there. The request we'll see in
411408
// practice.
412-
"v2-with-payload-with-file-no-cache": {
409+
{
410+
name: "v2-with-payload-with-file-no-cache",
413411
thin: mkEnt(v2, 5, 6, &sstThin), fat: mkEnt(v2, 5, 6, &sstFat),
414412
setup: putOnDisk, expTrace: "inlined entry not cached",
415413
},
416-
"v2-with-payload-with-file-with-cache": {
414+
{
415+
name: "v2-with-payload-with-file-with-cache",
417416
thin: mkEnt(v2, 5, 6, &sstThin), fat: mkEnt(v2, 5, 6, &sstFat),
418417
setup: func(ec *raftentry.Cache, ss SideloadStorage) {
419418
putOnDisk(ec, ss)
420419
ec.Add(rangeID, []raftpb.Entry{mkEnt(v2, 5, 6, &sstFat)}, true)
421420
}, expTrace: "using cache hit",
422421
},
423-
"v2-fat-without-file": {
422+
{
423+
name: "v2-fat-without-file",
424424
thin: mkEnt(v2, 5, 6, &sstFat), fat: mkEnt(v2, 5, 6, &sstFat),
425425
setup: func(ec *raftentry.Cache, ss SideloadStorage) {},
426426
expTrace: "already inlined",
427427
},
428-
}
429-
430-
runOne := func(k string, test testCase) {
431-
ctx, getRecAndFinish := tracing.ContextWithRecordingSpan(
432-
context.Background(), tracing.NewTracer(), "test-recording")
433-
defer getRecAndFinish()
434-
435-
eng := storage.NewDefaultInMemForTesting()
436-
defer eng.Close()
437-
ss := newTestingSideloadStorage(eng)
438-
ec := raftentry.NewCache(1024) // large enough
439-
if test.setup != nil {
440-
test.setup(ec, ss)
441-
}
428+
} {
429+
t.Run(test.name, func(t *testing.T) {
430+
ctx, getRecAndFinish := tracing.ContextWithRecordingSpan(
431+
context.Background(), tracing.NewTracer(), "test-recording")
432+
defer getRecAndFinish()
442433

443-
thinCopy := *(protoutil.Clone(&test.thin).(*raftpb.Entry))
444-
newEnt, err := MaybeInlineSideloadedRaftCommand(ctx, rangeID, thinCopy, ss, ec)
445-
if err != nil {
446-
if test.expErr == "" || !testutils.IsError(err, test.expErr) {
447-
t.Fatalf("%s: %+v", k, err)
434+
eng := storage.NewDefaultInMemForTesting()
435+
defer eng.Close()
436+
ss := newTestingSideloadStorage(eng)
437+
ec := raftentry.NewCache(1024) // large enough
438+
if test.setup != nil {
439+
test.setup(ec, ss)
448440
}
449-
} else if test.expErr != "" {
450-
t.Fatalf("%s: success, but expected error: %s", k, test.expErr)
451-
} else {
452-
mustEntryEq(t, thinCopy, test.thin)
453-
}
454-
455-
if newEnt == nil {
456-
newEnt = &thinCopy
457-
}
458-
mustEntryEq(t, *newEnt, test.fat)
459441

460-
if dump := getRecAndFinish().String(); test.expTrace != "" {
461-
if ok, err := regexp.MatchString(test.expTrace, dump); err != nil {
462-
t.Fatalf("%s: %+v", k, err)
463-
} else if !ok {
464-
t.Fatalf("%s: expected trace matching:\n%s\n\nbut got\n%s", k, test.expTrace, dump)
442+
thinCopy := *(protoutil.Clone(&test.thin).(*raftpb.Entry))
443+
newEnt, err := MaybeInlineSideloadedRaftCommand(ctx, rangeID, thinCopy, ss, ec)
444+
if want := test.expErr; want != "" {
445+
require.ErrorContains(t, err, want)
446+
} else {
447+
require.NoError(t, err)
448+
mustEntryEq(t, thinCopy, test.thin)
465449
}
466-
}
467-
}
450+
mustEntryEq(t, newEnt, test.fat)
468451

469-
keys := make([]string, 0, len(testCases))
470-
for k := range testCases {
471-
keys = append(keys, k)
472-
}
473-
sort.Strings(keys)
474-
for _, k := range keys {
475-
runOne(k, testCases[k])
452+
if want := test.expTrace; want != "" {
453+
require.Contains(t, getRecAndFinish().String(), want)
454+
}
455+
})
476456
}
477457
}
478458

pkg/raft/log.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -83,36 +83,20 @@ type raftLog struct {
8383
applied uint64
8484

8585
logger raftlogger.Logger
86-
87-
// maxApplyingEntsSize limits the outstanding byte size of the messages
88-
// returned from calls to nextCommittedEnts that have not been acknowledged
89-
// by a call to appliedTo.
90-
maxApplyingEntsSize entryEncodingSize
9186
}
9287

93-
// newLog returns log using the given storage and default options. It
94-
// recovers the log to the state that it just commits and applies the
95-
// latest snapshot.
88+
// newLog returns a raft log initialized to the state in the given storage.
9689
func newLog(storage Storage, logger raftlogger.Logger) *raftLog {
97-
return newLogWithSize(storage, logger, noLimit)
98-
}
99-
100-
// newLogWithSize returns a log using the given storage and max
101-
// message size.
102-
func newLogWithSize(
103-
storage Storage, logger raftlogger.Logger, maxApplyingEntsSize entryEncodingSize,
104-
) *raftLog {
10590
compacted, lastIndex := storage.Compacted(), storage.LastIndex()
10691
lastTerm, err := storage.Term(lastIndex)
10792
if err != nil {
10893
panic(err) // TODO(pav-kv): the storage should always cache the last term.
10994
}
11095
last := entryID{term: lastTerm, index: lastIndex}
11196
return &raftLog{
112-
storage: storage,
113-
unstable: newUnstable(last, logger),
114-
termCache: newTermCache(termCacheSize, last),
115-
maxApplyingEntsSize: maxApplyingEntsSize,
97+
storage: storage,
98+
unstable: newUnstable(last, logger),
99+
termCache: newTermCache(termCacheSize, last),
116100

117101
// Initialize our committed and applied pointers to the time of the last
118102
// compaction.
@@ -292,7 +276,7 @@ func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
292276
if span.Empty() {
293277
return nil
294278
}
295-
ents, err := l.slice(uint64(span.After), uint64(span.Last), l.maxApplyingEntsSize)
279+
ents, err := l.slice(uint64(span.After), uint64(span.Last), noLimit)
296280
if err != nil {
297281
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
298282
}

pkg/raft/log_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,6 @@ func TestNextCommittedEnts(t *testing.T) {
373373
}
374374

375375
func TestAcceptApplying(t *testing.T) {
376-
maxSize := entryEncodingSize(100)
377376
snap := pb.Snapshot{
378377
Metadata: pb.SnapshotMetadata{Term: 1, Index: 3},
379378
}
@@ -398,7 +397,7 @@ func TestAcceptApplying(t *testing.T) {
398397
t.Run("", func(t *testing.T) {
399398
storage := NewMemoryStorage()
400399
require.NoError(t, storage.ApplySnapshot(snap))
401-
raftLog := newLogWithSize(storage, raftlogger.DiscardLogger, maxSize)
400+
raftLog := newLog(storage, raftlogger.DiscardLogger)
402401
require.True(t, raftLog.append(init))
403402
require.NoError(t, storage.Append(init.sub(3, 4)))
404403
raftLog.checkInvariants(t)

0 commit comments

Comments
 (0)