
Commit b1fab87

craig[bot] and dt committed
Merge #143090
143090: tracing,execinfrapb: include child metadata in trace agg metas r=dt a=dt

This captures and includes in aggregated trace info the "child span metadata", i.e. the occurrence count and summed duration of each uniquely named child span. These are included in the "per-component" and "cluster-wide" trace dumps used by various jobs' advanced debugging tools.

Release note: none.

Epic: none.

Co-authored-by: David Taylor <[email protected]>
2 parents 697f493 + 4164a3c commit b1fab87
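
For context: the "child span metadata" aggregated by this merge is tracingpb.OperationMetadata, which pairs an occurrence count with the summed duration of all child spans sharing a name (the Count and Duration fields used in the aggregator_stats.go diff below). The following standalone Go sketch illustrates the rollup idea only; operationMetadata and combine here are simplified stand-ins for the real tracingpb.OperationMetadata and its Combine method, not the library code itself.

package main

import (
	"fmt"
	"time"
)

// operationMetadata mirrors the shape of tracingpb.OperationMetadata: an
// occurrence count plus the summed duration of same-named child spans.
type operationMetadata struct {
	Count    int64
	Duration time.Duration
}

// combine merges two totals, as the cluster-wide rollup in this commit
// does with tracingpb.OperationMetadata.Combine.
func (m operationMetadata) combine(other operationMetadata) operationMetadata {
	m.Count += other.Count
	m.Duration += other.Duration
	return m
}

func main() {
	// Two nodes each report totals for a child span named "backup.worker".
	node1 := operationMetadata{Count: 4, Duration: 250 * time.Millisecond}
	node2 := operationMetadata{Count: 2, Duration: 100 * time.Millisecond}
	fmt.Println(node1.combine(node2)) // {6 350ms}
}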

File tree

12 files changed, +127 -73 lines changed

pkg/backup/backup_job.go

Lines changed: 1 addition & 1 deletion
@@ -332,7 +332,7 @@ func backup(
 		// Update the running aggregate of the component with the latest received
 		// aggregate.
 		resumer.mu.Lock()
-		resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
+		resumer.mu.perNodeAggregatorStats[componentID] = *agg
 		resumer.mu.Unlock()
 	}
 	return nil
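
The same one-line change repeats in restore_job.go, logical_replication_job.go, and stream_ingestion_dist.go below: each resumer now stores the whole dereferenced TracingAggregatorEvents value instead of just its Events map, so the new SpanTotals field travels with it. A standalone sketch of why that matters, using trimmed-down hypothetical types rather than the real execinfrapb structs:

package main

import "fmt"

// tracingAggregatorEvents is a trimmed-down stand-in for
// execinfrapb.TracingAggregatorEvents after this commit.
type tracingAggregatorEvents struct {
	Events     map[string][]byte
	SpanTotals map[string]int64 // simplified; really map[string]tracingpb.OperationMetadata
}

func main() {
	agg := &tracingAggregatorEvents{
		Events:     map[string][]byte{"event": {0x01}},
		SpanTotals: map[string]int64{"child-span": 3},
	}

	// Before: storing only agg.Events would have dropped SpanTotals.
	// After: storing the dereferenced struct keeps both maps.
	perComponent := map[string]tracingAggregatorEvents{}
	perComponent["n1/f1"] = *agg

	fmt.Println(perComponent["n1/f1"].SpanTotals) // map[child-span:3]
}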

pkg/backup/restore_job.go

Lines changed: 1 addition & 1 deletion
@@ -498,7 +498,7 @@ func restore(
 		// Update the running aggregate of the component with the latest received
 		// aggregate.
 		resumer.mu.Lock()
-		resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
+		resumer.mu.perNodeAggregatorStats[componentID] = *agg
 		resumer.mu.Unlock()
 	}
 	return nil

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 1 addition & 1 deletion
@@ -751,7 +751,7 @@ func (rh *rowHandler) handleTraceAgg(agg *execinfrapb.TracingAggregatorEvents) {
 	// aggregate.
 	rh.r.mu.Lock()
 	defer rh.r.mu.Unlock()
-	rh.r.mu.perNodeAggregatorStats[componentID] = agg.Events
+	rh.r.mu.perNodeAggregatorStats[componentID] = *agg
 }

 func (rh *rowHandler) handleMeta(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {

pkg/crosscluster/physical/stream_ingestion_dist.go

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@ func startDistIngestion(
 		// Update the running aggregate of the component with the latest received
 		// aggregate.
 		resumer.mu.Lock()
-		resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
+		resumer.mu.perNodeAggregatorStats[componentID] = *agg
 		resumer.mu.Unlock()
 	}
 	return nil

pkg/jobs/adopt.go

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
 	}

 	resumerTraceFilename := fmt.Sprintf("%s/resumer-trace/%s",
-		r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
+		timeutil.Now().Format("20060102_150405.00"), r.ID().String())
 	td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
 	if err := r.db.Txn(dumpCtx, func(ctx context.Context, txn isql.Txn) error {
 		return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID)
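
The format string is unchanged; only the arguments swap, moving the variable timestamp to the front of the path so resumer traces group by collection time like the other execution-detail files renamed in this commit. A runnable sketch of the before/after paths (timestamp and ID values are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2025, 3, 18, 15, 4, 5, 0, time.UTC).Format("20060102_150405.00")
	coordinatorID := "1" // stands in for r.ID().String()

	before := fmt.Sprintf("%s/resumer-trace/%s", coordinatorID, ts)
	after := fmt.Sprintf("%s/resumer-trace/%s", ts, coordinatorID)
	fmt.Println(before) // 1/resumer-trace/20250318_150405.00
	fmt.Println(after)  // 20250318_150405.00/resumer-trace/1
}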

pkg/server/job_profiler.go

Lines changed: 3 additions & 3 deletions
@@ -127,7 +127,7 @@ func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
 		log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
 		return
 	}
-	filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
+	filename := fmt.Sprintf("%s/job-goroutines.txt", timeutil.Now().Format("20060102_150405.00"))
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		return jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, txn, e.jobID)
 	}); err != nil {
@@ -146,7 +146,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
 	}
 	if row != nil && row[0] != tree.DNull {
 		dspDiagramURL := string(tree.MustBeDString(row[0]))
-		filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00"))
+		filename := fmt.Sprintf("%s/distsql-plan.html", timeutil.Now().Format("20060102_150405.00"))
 		if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 			return jobs.WriteExecutionDetailFile(ctx, filename,
 				[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
@@ -172,7 +172,7 @@ func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
 		return
 	}

-	filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
+	filename := fmt.Sprintf("%s/trace.zip", timeutil.Now().Format("20060102_150405.00"))
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		return jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, txn, e.jobID)
 	}); err != nil {
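
Together with the adopt.go change above, every execution-detail file from one collection pass now lands in a shared timestamp-prefixed directory with a fixed, descriptive name. A small sketch of the resulting layout (timestamp illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	// One collection pass shares a single timestamp directory.
	ts := time.Date(2025, 3, 18, 15, 4, 5, 0, time.UTC).Format("20060102_150405.00")
	for _, name := range []string{"job-goroutines.txt", "distsql-plan.html", "trace.zip"} {
		fmt.Printf("%s/%s\n", ts, name)
	}
	// Output:
	// 20250318_150405.00/job-goroutines.txt
	// 20250318_150405.00/distsql-plan.html
	// 20250318_150405.00/trace.zip
}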

pkg/sql/execinfrapb/data.proto

Lines changed: 2 additions & 0 deletions
@@ -379,4 +379,6 @@ message TracingAggregatorEvents {
       (gogoproto.customname) = "FlowID",
       (gogoproto.customtype) = "FlowID"];
   map<string, bytes> events = 3;
+
+  map<string, util.tracing.tracingpb.OperationMetadata> span_totals = 4 [(gogoproto.nullable) = false];
 }
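
Because the new field is marked (gogoproto.nullable) = false, the generated Go map holds OperationMetadata values rather than pointers, which is what lets the aggregator code below assign a ChildrenMetadata map to SpanTotals directly. A simplified sketch of that generated shape, with a stand-in for tracingpb.OperationMetadata (the real generated struct also carries protobuf marshaling machinery):

package main

import (
	"fmt"
	"time"
)

// operationMetadata stands in for tracingpb.OperationMetadata.
type operationMetadata struct {
	Count    int64
	Duration time.Duration
}

// tracingAggregatorEvents sketches the generated struct: nullable=false
// makes SpanTotals a map of values, not a map of pointers.
type tracingAggregatorEvents struct {
	Events     map[string][]byte
	SpanTotals map[string]operationMetadata
}

func main() {
	evs := tracingAggregatorEvents{
		SpanTotals: map[string]operationMetadata{
			"restore.worker": {Count: 8, Duration: time.Second},
		},
	}
	fmt.Println(evs.SpanTotals["restore.worker"].Count) // 8
}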

pkg/sql/jobs_profiler_execution_details_test.go

Lines changed: 15 additions & 12 deletions
@@ -378,12 +378,12 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 	files := listExecutionDetails(t, s, jobspb.JobID(importJobID))

 	patterns := []string{
-		"distsql\\..*\\.html",
+		".*/distsql-plan.html",
 	}
 	if !s.DeploymentMode().IsExternal() {
-		patterns = append(patterns, "goroutines\\..*\\.txt")
+		patterns = append(patterns, ".*/job-goroutines.txt")
 	}
-	patterns = append(patterns, "trace\\..*\\.zip")
+	patterns = append(patterns, ".*/trace.zip")

 	require.Len(t, files, len(patterns))
 	for i, pattern := range patterns {
@@ -426,17 +426,19 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 		return nil
 	})
 	patterns = []string{
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
-		"distsql\\..*\\.html",
-		"distsql\\..*\\.html",
+		".*/distsql-plan.html",
+		".*/distsql-plan.html",
 	}
 	if !s.DeploymentMode().IsExternal() {
-		patterns = append(patterns, "goroutines\\..*\\.txt", "goroutines\\..*\\.txt")
+		patterns = append(patterns, ".*/job-goroutines.txt", ".*/job-goroutines.txt")
 	}
-	patterns = append(patterns, "trace\\..*\\.zip", "trace\\..*\\.zip")
+	patterns = append(patterns,
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
+	)
+	patterns = append(patterns, ".*/trace.zip", ".*/trace.zip")
 	for i, pattern := range patterns {
 		require.Regexp(t, pattern, files[i])
 	}
@@ -465,8 +467,9 @@ func listExecutionDetails(

 	edResp := serverpb.ListJobProfilerExecutionDetailsResponse{}
 	require.NoError(t, protoutil.Unmarshal(body, &edResp))
+	// Sort the responses with the variable date/time digits in the prefix removed.
 	sort.Slice(edResp.Files, func(i, j int) bool {
-		return edResp.Files[i] < edResp.Files[j]
+		return strings.TrimLeft(edResp.Files[i], "0123456789_.") < strings.TrimLeft(edResp.Files[j], "0123456789_.")
 	})
 	return edResp.Files
 }
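
Since every filename now begins with a variable timestamp directory, the comparator strips leading digits, underscores, and dots before comparing, yielding a stable order for the test's pattern list. A standalone demonstration of that normalization:

package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	files := []string{
		"20250318_150405.00/trace.zip",
		"20250318_150404.99/distsql-plan.html",
	}
	// Compare with the timestamp prefix stripped, exactly as the test's
	// comparator does; TrimLeft stops at the first '/' since it is not
	// in the cutset.
	sort.Slice(files, func(i, j int) bool {
		return strings.TrimLeft(files[i], "0123456789_.") <
			strings.TrimLeft(files[j], "0123456789_.")
	})
	fmt.Println(files) // the distsql-plan.html entry now sorts first
}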

pkg/util/bulk/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -18,5 +18,6 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/timeutil",
         "//pkg/util/tracing",
+        "//pkg/util/tracing/tracingpb",
     ],
 )

pkg/util/bulk/aggregator_stats.go

Lines changed: 85 additions & 53 deletions
@@ -9,6 +9,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"sort"

 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -19,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 )

 // ConstructTracingAggregatorProducerMeta constructs a ProducerMetadata that
@@ -45,25 +47,37 @@ func ConstructTracingAggregatorProducerMeta(
 		}
 	})

+	sp := tracing.SpanFromContext(ctx)
+	if sp != nil {
+		recType := sp.RecordingType()
+		if recType != tracingpb.RecordingOff {
+			aggEvents.SpanTotals = sp.GetFullTraceRecording(recType).Root.ChildrenMetadata
+		}
+	}
 	return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
 }

 // ComponentAggregatorStats is a mapping from a component to all the Aggregator
 // Stats collected for that component.
-type ComponentAggregatorStats map[execinfrapb.ComponentID]map[string][]byte
+type ComponentAggregatorStats map[execinfrapb.ComponentID]execinfrapb.TracingAggregatorEvents

 // DeepCopy takes a deep copy of the component aggregator stats map.
 func (c ComponentAggregatorStats) DeepCopy() ComponentAggregatorStats {
 	mapCopy := make(ComponentAggregatorStats, len(c))
 	for k, v := range c {
-		innerMap := make(map[string][]byte, len(v))
-		for k2, v2 := range v {
+		copied := v
+		copied.Events = make(map[string][]byte, len(v.Events))
+		copied.SpanTotals = make(map[string]tracingpb.OperationMetadata, len(v.SpanTotals))
+		for k2, v2 := range v.Events {
 			// Create a copy of the byte slice to avoid modifying the original data.
 			dataCopy := make([]byte, len(v2))
 			copy(dataCopy, v2)
-			innerMap[k2] = dataCopy
+			copied.Events[k2] = dataCopy
+		}
+		for k2, v2 := range v.SpanTotals {
+			copied.SpanTotals[k2] = v2
 		}
-		mapCopy[k] = innerMap
+		mapCopy[k] = copied
 	}
 	return mapCopy
 }
@@ -82,58 +96,76 @@ func FlushTracingAggregatorStats(
 	db isql.DB,
 	perNodeAggregatorStats ComponentAggregatorStats,
 ) error {
-	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent)
-		asOf := timeutil.Now().Format("20060102_150405.00")
+	clusterWideSpanStats := make(map[string]tracingpb.OperationMetadata)
+	clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent)
+	ids := make([]execinfrapb.ComponentID, 0, len(perNodeAggregatorStats))

-		var clusterWideSummary bytes.Buffer
-		for component, nameToEvent := range perNodeAggregatorStats {
-			clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n\n",
-				component.SQLInstanceID.String(), component.FlowID.String()))
-			for name, event := range nameToEvent {
-				// Write a proto file per tag. This machine-readable file can be consumed
-				// by other places we want to display this information egs: annotated
-				// DistSQL diagrams, DBConsole etc.
-				filename := fmt.Sprintf("%s/%s",
-					component.SQLInstanceID.String(), asOf)
-				msg, err := protoreflect.DecodeMessage(name, event)
-				if err != nil {
-					clusterWideSummary.WriteString(fmt.Sprintf("invalid protocol message: %v", err))
-					// If we failed to decode the event write the error to the file and
-					// carry on.
-					continue
-				}
-
-				if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, txn, jobID); err != nil {
-					return err
-				}
-
-				// Construct a single text file that contains information on a per-node
-				// basis as well as a cluster-wide aggregate.
-				clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", name))
-
-				aggEvent := msg.(tracing.AggregatorEvent)
-				clusterWideSummary.WriteString(aggEvent.String())
-				clusterWideSummary.WriteString("\n")
-
-				if _, ok := clusterWideAggregatorStats[name]; ok {
-					clusterWideAggregatorStats[name].Combine(aggEvent)
-				} else {
-					clusterWideAggregatorStats[name] = aggEvent
-				}
-			}
+	for component := range perNodeAggregatorStats {
+		ids = append(ids, component)
+	}
+	sort.Slice(ids, func(i, j int) bool { return ids[i].SQLInstanceID < ids[j].SQLInstanceID })
+
+	// Write a summary for each per-node to a buffer. While doing so, accumulate a
+	// cluster-wide summary as well to be written to a second buffer below.
+	var perNode bytes.Buffer
+	fmt.Fprintf(&perNode, "# Per-component Details (%d)\n", len(perNodeAggregatorStats))
+	for _, component := range ids {
+		nodeStats := perNodeAggregatorStats[component]
+		fmt.Fprintf(&perNode, "# SQL Instance ID: %s (%s); Flow/proc ID: %s/%d\n\n",
+			component.SQLInstanceID, component.Region, component.FlowID, component.ID)
+
+		// Print span stats.
+		perNode.WriteString("## Span Totals\n\n")
+		for name, stats := range nodeStats.SpanTotals {
+			fmt.Fprintf(&perNode, "- %-40s (%d):\t%s\n", name, stats.Count, stats.Duration)
+		}
+		perNode.WriteString("\n")
+
+		// Add span stats to the cluster-wide span stats.
+		for spanName, totals := range nodeStats.SpanTotals {
+			clusterWideSpanStats[spanName] = clusterWideSpanStats[spanName].Combine(totals)
 		}

-		for tag, event := range clusterWideAggregatorStats {
-			clusterWideSummary.WriteString("## Cluster-wide\n\n")
-			clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", tag))
-			clusterWideSummary.WriteString(event.String())
+		perNode.WriteString("## Aggregate Stats\n\n")
+		for name, event := range nodeStats.Events {
+			msg, err := protoreflect.DecodeMessage(name, event)
+			if err != nil {
+				continue
+			}
+			aggEvent := msg.(tracing.AggregatorEvent)
+			fmt.Fprintf(&perNode, "- %s:\n%s\n\n", name, aggEvent)
+
+			// Populate the cluster-wide aggregator stats.
+			if _, ok := clusterWideAggregatorStats[name]; ok {
+				clusterWideAggregatorStats[name].Combine(aggEvent)
+			} else {
+				clusterWideAggregatorStats[name] = aggEvent
+			}
 		}
+		perNode.WriteString("\n")
+	}

-		// Ensure the file always has a trailing newline, regardless of whether or
-		// not the loops above wrote anything.
-		clusterWideSummary.WriteString("\n")
-		filename := fmt.Sprintf("aggregatorstats.%s.txt", asOf)
-		return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), txn, jobID)
+	// Write the cluster-wide summary.
+	var combined bytes.Buffer
+	combined.WriteString("# Cluster-wide\n\n")
+	combined.WriteString("## Span Totals\n\n")
+	for name, stats := range clusterWideSpanStats {
+		fmt.Fprintf(&combined, " - %-40s (%d):\t%s\n", name, stats.Count, stats.Duration)
+	}
+	combined.WriteString("\n")
+	combined.WriteString("## Aggregate Stats\n\n")
+	for name, ev := range clusterWideAggregatorStats {
+		fmt.Fprintf(&combined, " - %s:\n%s\n", name, ev)
+	}
+	combined.WriteString("\n")
+
+	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		asOf := timeutil.Now().Format("20060102_150405.00")
+		combinedFilename := fmt.Sprintf("%s/trace-stats-cluster-wide.txt", asOf)
+		perNodeFilename := fmt.Sprintf("%s/trace-stats-by-node.txt", asOf)
+		if err := jobs.WriteExecutionDetailFile(ctx, combinedFilename, combined.Bytes(), txn, jobID); err != nil {
+			return err
+		}
+		return jobs.WriteExecutionDetailFile(ctx, perNodeFilename, perNode.Bytes(), txn, jobID)
 	})
 }
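
The rewritten DeepCopy is careful to re-allocate both maps before copying entries: assigning a Go struct copies only the map headers, so without the fresh make calls the copy would alias the originals. A standalone illustration of that pitfall, using a trimmed-down stand-in for the real TracingAggregatorEvents:

package main

import "fmt"

// aggEvents is a trimmed-down stand-in for execinfrapb.TracingAggregatorEvents.
type aggEvents struct {
	Events map[string][]byte
}

func main() {
	orig := aggEvents{Events: map[string][]byte{"a": {1}}}

	// Plain struct assignment shares the underlying map.
	shallow := orig
	shallow.Events["a"] = []byte{9}
	fmt.Println(orig.Events) // map[a:[9]] -- mutated through the copy

	// DeepCopy-style: fresh map plus copied byte slices.
	orig.Events["a"] = []byte{1}
	deep := orig
	deep.Events = make(map[string][]byte, len(orig.Events))
	for k, v := range orig.Events {
		c := make([]byte, len(v))
		copy(c, v)
		deep.Events[k] = c
	}
	deep.Events["a"][0] = 9
	fmt.Println(orig.Events) // map[a:[1]] -- original untouched
}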
