Skip to content

Commit 33885ac

Browse files
committed
logstore: add EntryStats
Epic: none Release note: none
1 parent 0674711 commit 33885ac

File tree

4 files changed

+49
-40
lines changed

4 files changed

+49
-40
lines changed

pkg/kv/kvserver/logstore/logstore.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,28 @@ type RaftState struct {
8484
ByteSize int64
8585
}
8686

87+
// EntryStats contains stats about the appended log slice.
88+
type EntryStats struct {
89+
RegularEntries int
90+
RegularBytes int64
91+
SideloadedEntries int
92+
SideloadedBytes int64
93+
}
94+
95+
// Add increments the stats with the given delta.
96+
func (e *EntryStats) Add(delta EntryStats) {
97+
e.RegularEntries += delta.RegularEntries
98+
e.RegularBytes += delta.RegularBytes
99+
e.SideloadedEntries += delta.SideloadedEntries
100+
e.SideloadedBytes += delta.SideloadedBytes
101+
}
102+
87103
// AppendStats describes a completed log storage append operation.
88104
type AppendStats struct {
89105
Begin crtime.Mono
90106
End crtime.Mono
91107

92-
RegularEntries int
93-
RegularBytes int64
94-
SideloadedEntries int
95-
SideloadedBytes int64
108+
EntryStats
96109

97110
PebbleBegin crtime.Mono
98111
PebbleEnd crtime.Mono
@@ -186,22 +199,19 @@ func (s *LogStore) storeEntriesAndCommitBatch(
186199
stats.Begin = crtime.NowMono()
187200
// All of the entries are appended to distinct keys, returning a new
188201
// last index.
189-
thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload)
202+
thinEntries, entryStats, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload)
190203
if err != nil {
191204
const expl = "during sideloading"
192205
return RaftState{}, errors.Wrap(err, expl)
193206
}
194-
state.ByteSize += sideLoadedEntriesSize
207+
stats.EntryStats.Add(entryStats) // TODO(pav-kv): just return the stats.
208+
state.ByteSize += entryStats.SideloadedBytes
195209
if state, err = logAppend(
196210
ctx, s.StateLoader.RaftLogPrefix(), batch, state, thinEntries,
197211
); err != nil {
198212
const expl = "during append"
199213
return RaftState{}, errors.Wrap(err, expl)
200214
}
201-
stats.RegularEntries += len(thinEntries) - numSideloaded
202-
stats.RegularBytes += otherEntriesSize
203-
stats.SideloadedEntries += numSideloaded
204-
stats.SideloadedBytes += sideLoadedEntriesSize
205215
stats.End = crtime.NowMono()
206216
}
207217

pkg/kv/kvserver/logstore/sideload.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,17 @@ type SideloadStorage interface {
6969
// in parts or entirely by the same memory.
7070
func MaybeSideloadEntries(
7171
ctx context.Context, input []raftpb.Entry, sideloaded SideloadStorage,
72-
) (
73-
_ []raftpb.Entry,
74-
numSideloaded int,
75-
sideloadedEntriesSize int64,
76-
otherEntriesSize int64,
77-
_ error,
78-
) {
72+
) ([]raftpb.Entry, EntryStats, error) {
73+
var stats EntryStats
7974
var output []raftpb.Entry
8075
for i := range input {
8176
typ, pri, err := raftlog.EncodingOf(input[i])
8277
if err != nil {
83-
return nil, 0, 0, 0, err
78+
return nil, EntryStats{}, err
8479
}
8580
if !typ.IsSideloaded() {
86-
otherEntriesSize += int64(len(input[i].Data))
81+
stats.RegularEntries++
82+
stats.RegularBytes += int64(len(input[i].Data))
8783
continue
8884
}
8985

@@ -100,7 +96,7 @@ func MaybeSideloadEntries(
10096
// Unmarshal the command into an object that we can mutate.
10197
e, err := raftlog.NewEntry(input[i])
10298
if err != nil {
103-
return nil, 0, 0, 0, err
99+
return nil, EntryStats{}, err
104100
}
105101
if e.Cmd.ReplicatedEvalResult.AddSSTable == nil {
106102
// Still no AddSSTable; someone must've proposed a v2 command
@@ -109,7 +105,6 @@ func MaybeSideloadEntries(
109105
log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
110106
continue
111107
}
112-
numSideloaded++
113108

114109
// Actually strip the command.
115110
dataToSideload := e.Cmd.ReplicatedEvalResult.AddSSTable.Data
@@ -123,28 +118,30 @@ func MaybeSideloadEntries(
123118
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID, pri)
124119
_, err := protoutil.MarshalToSizedBuffer(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
125120
if err != nil {
126-
return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command")
121+
return nil, EntryStats{}, errors.Wrap(err, "while marshaling stripped sideloaded command")
127122
}
128123
outputEnt.Data = data
129124
}
130125

131126
log.Eventf(ctx, "writing payload at index=%d term=%d", outputEnt.Index, outputEnt.Term)
132127
if err := sideloaded.Put(ctx, kvpb.RaftIndex(outputEnt.Index), kvpb.RaftTerm(outputEnt.Term), dataToSideload); err != nil { // TODO could verify checksum here
133-
return nil, 0, 0, 0, err
128+
return nil, EntryStats{}, err
134129
}
135-
sideloadedEntriesSize += int64(len(dataToSideload))
130+
131+
stats.SideloadedEntries++
132+
stats.SideloadedBytes += int64(len(dataToSideload))
136133
}
137134

138135
if output != nil { // there is at least one sideloaded command
139136
// Sync the sideloaded storage directory so that the commands are durable.
140137
if err := sideloaded.Sync(); err != nil {
141-
return nil, 0, 0, 0, err
138+
return nil, EntryStats{}, err
142139
}
143140
} else { // we never saw a sideloaded command
144141
output = input
145142
}
146143

147-
return output, numSideloaded, sideloadedEntriesSize, otherEntriesSize, nil
144+
return output, stats, nil
148145
}
149146

150147
// MaybeInlineSideloadedRaftCommand takes an entry and inspects it. If its

pkg/kv/kvserver/logstore/sideload_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -520,22 +520,22 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
520520
eng := storage.NewDefaultInMemForTesting()
521521
defer eng.Close()
522522
sideloaded := newTestingSideloadStorage(eng)
523-
postEnts, numSideloaded, size, nonSideloadedSize, err := MaybeSideloadEntries(ctx, test.preEnts, sideloaded)
523+
postEnts, stats, err := MaybeSideloadEntries(ctx, test.preEnts, sideloaded)
524524
if err != nil {
525525
t.Fatal(err)
526526
}
527527
if len(addSST.Data) == 0 {
528528
t.Fatal("invocation mutated original AddSSTable struct in memory")
529529
}
530-
require.Equal(t, test.nonSideloadedSize, nonSideloadedSize)
530+
require.Equal(t, test.nonSideloadedSize, stats.RegularBytes)
531531
var expNumSideloaded int
532532
if test.size > 0 {
533533
expNumSideloaded = 1
534534
}
535-
require.Equal(t, expNumSideloaded, numSideloaded)
535+
require.Equal(t, expNumSideloaded, stats.SideloadedEntries)
536536
require.Equal(t, test.postEnts, postEnts)
537-
if test.size != size {
538-
t.Fatalf("expected %d sideloadedSize, but found %d", test.size, size)
537+
if test.size != stats.SideloadedBytes {
538+
t.Fatalf("expected %d sideloadedSize, but found %d", test.size, stats.SideloadedBytes)
539539
}
540540
actKeys, err := sideloaded.eng.Env().List(sideloaded.Dir())
541541
if oserror.IsNotExist(err) {

pkg/kv/kvserver/replica_raft_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,17 @@ func Test_handleRaftReadyStats_SafeFormat(t *testing.T) {
107107
numConfChangeEntries: 6,
108108
},
109109
append: logstore.AppendStats{
110-
Begin: ts(2),
111-
End: ts(3),
112-
RegularEntries: 7,
113-
RegularBytes: 1024,
114-
SideloadedEntries: 3,
115-
SideloadedBytes: 5 * (1 << 20),
116-
PebbleBegin: ts(3),
117-
PebbleEnd: ts(4),
118-
PebbleBytes: 1024 * 5,
110+
Begin: ts(2),
111+
End: ts(3),
112+
EntryStats: logstore.EntryStats{
113+
RegularEntries: 7,
114+
RegularBytes: 1024,
115+
SideloadedEntries: 3,
116+
SideloadedBytes: 5 * (1 << 20),
117+
},
118+
PebbleBegin: ts(3),
119+
PebbleEnd: ts(4),
120+
PebbleBytes: 1024 * 5,
119121
PebbleCommitStats: storage.BatchCommitStats{
120122
BatchCommitStats: pebble.BatchCommitStats{
121123
TotalDuration: 100 * time.Millisecond,

0 commit comments

Comments
 (0)