Skip to content

Commit f19de6c

Browse files
committed
tracing,execinfrapb: include child metadata in trace agg metas
Release note: none. Epic: none.
1 parent 6ba3e71 commit f19de6c

File tree

9 files changed

+54
-11
lines changed

9 files changed

+54
-11
lines changed

pkg/backup/backup_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ func backup(
332332
// Update the running aggregate of the component with the latest received
333333
// aggregate.
334334
resumer.mu.Lock()
335-
resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
335+
resumer.mu.perNodeAggregatorStats[componentID] = *agg
336336
resumer.mu.Unlock()
337337
}
338338
return nil

pkg/backup/restore_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ func restore(
498498
// Update the running aggregate of the component with the latest received
499499
// aggregate.
500500
resumer.mu.Lock()
501-
resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
501+
resumer.mu.perNodeAggregatorStats[componentID] = *agg
502502
resumer.mu.Unlock()
503503
}
504504
return nil

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ func (rh *rowHandler) handleTraceAgg(agg *execinfrapb.TracingAggregatorEvents) {
751751
// aggregate.
752752
rh.r.mu.Lock()
753753
defer rh.r.mu.Unlock()
754-
rh.r.mu.perNodeAggregatorStats[componentID] = agg.Events
754+
rh.r.mu.perNodeAggregatorStats[componentID] = *agg
755755
}
756756

757757
func (rh *rowHandler) handleMeta(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {

pkg/crosscluster/physical/stream_ingestion_dist.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func startDistIngestion(
172172
// Update the running aggregate of the component with the latest received
173173
// aggregate.
174174
resumer.mu.Lock()
175-
resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
175+
resumer.mu.perNodeAggregatorStats[componentID] = *agg
176176
resumer.mu.Unlock()
177177
}
178178
return nil

pkg/sql/execinfrapb/data.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,4 +379,6 @@ message TracingAggregatorEvents {
379379
(gogoproto.customname) = "FlowID",
380380
(gogoproto.customtype) = "FlowID"];
381381
map<string, bytes> events = 3;
382+
383+
map<string, util.tracing.tracingpb.OperationMetadata> span_totals = 4 [(gogoproto.nullable) = false];
382384
}

pkg/util/bulk/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ go_library(
1818
"//pkg/util/log",
1919
"//pkg/util/timeutil",
2020
"//pkg/util/tracing",
21+
"//pkg/util/tracing/tracingpb",
2122
],
2223
)

pkg/util/bulk/aggregator_stats.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/log"
2020
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2121
"github.com/cockroachdb/cockroach/pkg/util/tracing"
22+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
2223
)
2324

2425
// ConstructTracingAggregatorProducerMeta constructs a ProducerMetadata that
@@ -45,25 +46,37 @@ func ConstructTracingAggregatorProducerMeta(
4546
}
4647
})
4748

49+
sp := tracing.SpanFromContext(ctx)
50+
if sp != nil {
51+
recType := sp.RecordingType()
52+
if recType != tracingpb.RecordingOff {
53+
aggEvents.SpanTotals = sp.GetFullTraceRecording(recType).Root.ChildrenMetadata
54+
}
55+
}
4856
return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
4957
}
5058

5159
// ComponentAggregatorStats is a mapping from a component to all the Aggregator
5260
// Stats collected for that component.
53-
type ComponentAggregatorStats map[execinfrapb.ComponentID]map[string][]byte
61+
type ComponentAggregatorStats map[execinfrapb.ComponentID]execinfrapb.TracingAggregatorEvents
5462

5563
// DeepCopy takes a deep copy of the component aggregator stats map.
5664
func (c ComponentAggregatorStats) DeepCopy() ComponentAggregatorStats {
5765
mapCopy := make(ComponentAggregatorStats, len(c))
5866
for k, v := range c {
59-
innerMap := make(map[string][]byte, len(v))
60-
for k2, v2 := range v {
67+
copied := v
68+
copied.Events = make(map[string][]byte, len(v.Events))
69+
copied.SpanTotals = make(map[string]tracingpb.OperationMetadata, len(v.SpanTotals))
70+
for k2, v2 := range v.Events {
6171
// Create a copy of the byte slice to avoid modifying the original data.
6272
dataCopy := make([]byte, len(v2))
6373
copy(dataCopy, v2)
64-
innerMap[k2] = dataCopy
74+
copied.Events[k2] = dataCopy
6575
}
66-
mapCopy[k] = innerMap
76+
for k2, v2 := range v.SpanTotals {
77+
copied.SpanTotals[k2] = v2
78+
}
79+
mapCopy[k] = copied
6780
}
6881
return mapCopy
6982
}
@@ -84,13 +97,17 @@ func FlushTracingAggregatorStats(
8497
) error {
8598
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
8699
clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent)
100+
clusterWideOpMetadata := make(map[string]tracingpb.OperationMetadata)
87101
asOf := timeutil.Now().Format("20060102_150405.00")
88102

89103
var clusterWideSummary bytes.Buffer
90104
for component, nameToEvent := range perNodeAggregatorStats {
91-
clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n\n",
105+
clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n",
92106
component.SQLInstanceID.String(), component.FlowID.String()))
93-
for name, event := range nameToEvent {
107+
108+
clusterWideSummary.WriteString("### aggregated events\n\n")
109+
110+
for name, event := range nameToEvent.Events {
94111
// Write a proto file per tag. This machine-readable file can be consumed
95112
// by other places we want to display this information egs: annotated
96113
// DistSQL diagrams, DBConsole etc.
@@ -122,6 +139,13 @@ func FlushTracingAggregatorStats(
122139
clusterWideAggregatorStats[name] = aggEvent
123140
}
124141
}
142+
143+
clusterWideSummary.WriteString("### span metadata\n\n")
144+
145+
for name, metadata := range nameToEvent.SpanTotals {
146+
fmt.Fprintf(&clusterWideSummary, " - %s (%d): %s\n", name, metadata.Count, metadata.Duration)
147+
clusterWideOpMetadata[name] = clusterWideOpMetadata[name].Combine(metadata)
148+
}
125149
}
126150

127151
for tag, event := range clusterWideAggregatorStats {

pkg/util/tracing/span.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,16 @@ func (sp *Span) GetRecording(recType tracingpb.RecordingType) tracingpb.Recordin
315315
return sp.i.GetRecording(recType, false /* finishing */)
316316
}
317317

318+
func (sp *Span) GetFullTraceRecording(recType tracingpb.RecordingType) Trace {
319+
if sp.detectUseAfterFinish() {
320+
return Trace{}
321+
}
322+
if sp.RecordingType() == tracingpb.RecordingOff {
323+
return Trace{}
324+
}
325+
return sp.i.GetFullTraceRecording(recType)
326+
}
327+
318328
// GetConfiguredRecording is like GetRecording, except the type of recording it
319329
// returns is the one that the span has been previously configured with.
320330
//

pkg/util/tracing/span_inner.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ func (s *spanInner) GetTraceRecording(recType tracingpb.RecordingType, finishing
6565
return s.crdb.GetRecording(recType, finishing)
6666
}
6767

68+
// GetFullTraceRecording returns the span's full recording, including detached
69+
// children, as a Trace. See GetTraceRecording and WithDetachedRecording.
70+
func (s *spanInner) GetFullTraceRecording(recType tracingpb.RecordingType) Trace {
71+
return s.crdb.GetFullRecording(recType)
72+
}
73+
6874
// GetRecording returns the span's recording.
6975
//
7076
// finishing indicates whether s is in the process of finishing. If it isn't,

0 commit comments

Comments
 (0)