Skip to content

Commit 4a7fde9

Browse files
committed
internal/trace: add end-of-generation signal to trace
This change takes the EvEndOfGeneration event and promotes it to a real event that appears in the trace. This allows the trace parser to unambiguously identify truncated traces vs. broken traces. It also makes a lot of the logic around parsing simpler, because there's no more batch spilling necessary. Fixes golang#73904. Change-Id: I37c359b32b6b5f894825aafc02921adeaacf2595 Reviewed-on: https://go-review.googlesource.com/c/go/+/693398 Reviewed-by: Carlos Amedee <[email protected]> Reviewed-by: Michael Pratt <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]>
1 parent cb814bd commit 4a7fde9

File tree

11 files changed

+273
-135
lines changed

11 files changed

+273
-135
lines changed

src/internal/trace/batch.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func (b *batch) isSyncBatch(ver version.Version) bool {
4444
(tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
4545
}
4646

47+
func (b *batch) isEndOfGeneration() bool {
48+
return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvEndOfGeneration
49+
}
50+
4751
// readBatch reads the next full batch from r.
4852
func readBatch(r interface {
4953
io.Reader
@@ -54,6 +58,9 @@ func readBatch(r interface {
5458
if err != nil {
5559
return batch{}, 0, err
5660
}
61+
if typ := tracev2.EventType(b); typ == tracev2.EvEndOfGeneration {
62+
return batch{m: NoThread, exp: tracev2.NoExperiment, data: []byte{b}}, 0, nil
63+
}
5764
if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
5865
return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
5966
}

src/internal/trace/generation.go

Lines changed: 104 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"bytes"
1010
"cmp"
1111
"encoding/binary"
12+
"errors"
1213
"fmt"
1314
"io"
1415
"slices"
@@ -32,22 +33,102 @@ type generation struct {
3233
*evTable
3334
}
3435

36+
// readGeneration buffers and decodes the structural elements of a trace generation
// out of r.
//
// Returns (nil, nil) on a clean EOF, i.e. when there are no more generations
// in the trace. Only valid for trace versions >= Go126, which carry an
// explicit end-of-generation event instead of relying on batch spilling.
func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
	if ver < version.Go126 {
		return nil, errors.New("internal error: readGeneration called for <1.26 trace")
	}
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}

	// Read batches one at a time until we hit either EOF or the
	// end-of-generation event.
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			// EOF is only clean if we haven't read any batches for this
			// generation yet; otherwise the end-of-generation event is missing.
			if len(g.batches) != 0 {
				return nil, errors.New("incomplete generation found; trace likely truncated")
			}
			return nil, nil // All done.
		}
		if err != nil {
			return nil, err
		}
		if g.gen == 0 {
			// Initialize gen.
			g.gen = gen
		}
		if b.isEndOfGeneration() {
			break
		}
		if gen == 0 {
			// 0 is a sentinel used by the runtime, so we'll never see it.
			return nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if gen != g.gen {
			return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
		}
		// Track the earliest batch timestamp as the generation's minimum.
		if g.minTs == 0 || b.time < g.minTs {
			g.minTs = b.time
		}
		if err := processBatch(g, b, ver); err != nil {
			return nil, err
		}
	}

	// Check some invariants.
	if g.freq == 0 {
		return nil, fmt.Errorf("no frequency event found")
	}
	if !g.hasClockSnapshot {
		return nil, fmt.Errorf("no clock snapshot event found")
	}

	// N.B. Trust that the batch order is correct. We can't validate the batch order
	// by timestamp because the timestamps could just be plain wrong. The source of
	// truth is the order things appear in the trace and the partial order sequence
	// numbers on certain events. If it turns out the batch order is actually incorrect
	// we'll very likely fail to advance a partial order from the frontier.

	// Compactify stacks and strings for better lookup performance later.
	g.stacks.compactify()
	g.strings.compactify()

	// Validate stacks.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, err
	}

	// Now that we have the frequency, fix up CPU samples.
	fixUpCPUSamples(g.cpuSamples, g.freq)
	return g, nil
}
110+
35111
// spilledBatch represents a batch that was read out for the next generation,
36112
// while reading the previous one. It's passed on when parsing the next
37113
// generation.
114+
//
115+
// Used only for trace versions < Go126.
38116
type spilledBatch struct {
39117
gen uint64
40118
*batch
41119
}
42120

43-
// readGeneration buffers and decodes the structural elements of a trace generation
121+
// readGenerationWithSpill buffers and decodes the structural elements of a trace generation
44122
// out of r. spill is the first batch of the new generation (already buffered and
45123
// parsed from reading the last generation). Returns the generation and the first
46124
// batch read of the next generation, if any.
47125
//
48126
// If gen is non-nil, it is valid and must be processed before handling the returned
49127
// error.
50-
func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
128+
func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
129+
if ver >= version.Go126 {
130+
return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
131+
}
51132
g := &generation{
52133
evTable: &evTable{
53134
pcs: make(map[uint64]frame),
@@ -56,15 +137,15 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
56137
}
57138
// Process the spilled batch.
58139
if spill != nil {
140+
// Process the spilled batch, which contains real data.
59141
g.gen = spill.gen
60142
g.minTs = spill.batch.time
61143
if err := processBatch(g, *spill.batch, ver); err != nil {
62144
return nil, nil, err
63145
}
64146
spill = nil
65147
}
66-
// Read batches one at a time until we either hit EOF or
67-
// the next generation.
148+
// Read batches one at a time until we hit either EOF or the next generation.
68149
var spillErr error
69150
for {
70151
b, gen, err := readBatch(r)
@@ -73,7 +154,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
73154
}
74155
if err != nil {
75156
if g.gen != 0 {
76-
// This is an error reading the first batch of the next generation.
157+
// This may be an error reading the first batch of the next generation.
77158
// This is fine. Let's forge ahead assuming that what we've got so
78159
// far is fine.
79160
spillErr = err
@@ -89,7 +170,8 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
89170
// Initialize gen.
90171
g.gen = gen
91172
}
92-
if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
173+
if gen == g.gen+1 {
174+
// TODO: Increment the generation with wraparound the same way the runtime does.
93175
spill = &spilledBatch{gen: gen, batch: &b}
94176
break
95177
}
@@ -134,15 +216,8 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
134216
return nil, nil, err
135217
}
136218

137-
// Fix up the CPU sample timestamps, now that we have freq.
138-
for i := range g.cpuSamples {
139-
s := &g.cpuSamples[i]
140-
s.time = g.freq.mul(timestamp(s.time))
141-
}
142-
// Sort the CPU samples.
143-
slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
144-
return cmp.Compare(a.time, b.time)
145-
})
219+
// Now that we have the frequency, fix up CPU samples.
220+
fixUpCPUSamples(g.cpuSamples, g.freq)
146221
return g, spill, spillErr
147222
}
148223

@@ -174,6 +249,8 @@ func processBatch(g *generation, b batch, ver version.Version) error {
174249
if err := addExperimentalBatch(g.expBatches, b); err != nil {
175250
return err
176251
}
252+
case b.isEndOfGeneration():
253+
return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
177254
default:
178255
if _, ok := g.batches[b.m]; !ok {
179256
g.batchMs = append(g.batchMs, b.m)
@@ -512,3 +589,15 @@ func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch,
512589
})
513590
return nil
514591
}
592+
593+
func fixUpCPUSamples(samples []cpuSample, freq frequency) {
594+
// Fix up the CPU sample timestamps.
595+
for i := range samples {
596+
s := &samples[i]
597+
s.time = freq.mul(timestamp(s.time))
598+
}
599+
// Sort the CPU samples.
600+
slices.SortFunc(samples, func(a, b cpuSample) int {
601+
return cmp.Compare(a.time, b.time)
602+
})
603+
}

src/internal/trace/internal/testgen/trace.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,14 @@ func (g *Generation) writeEventsTo(tw *raw.TextWriter) {
322322
}
323323
}
324324
b.writeEventsTo(tw)
325+
326+
// Write end-of-generation event if necessary.
327+
if g.trace.ver >= version.Go126 {
328+
tw.WriteEvent(raw.Event{
329+
Version: g.trace.ver,
330+
Ev: tracev2.EvEndOfGeneration,
331+
})
332+
}
325333
}
326334

327335
func (g *Generation) newStructuralBatch() *Batch {

0 commit comments

Comments
 (0)