|
9 | 9 | "bytes" |
10 | 10 | "context" |
11 | 11 | "fmt" |
| 12 | + "sort" |
12 | 13 |
|
13 | 14 | "github.com/cockroachdb/cockroach/pkg/base" |
14 | 15 | "github.com/cockroachdb/cockroach/pkg/jobs" |
@@ -95,69 +96,76 @@ func FlushTracingAggregatorStats( |
95 | 96 | db isql.DB, |
96 | 97 | perNodeAggregatorStats ComponentAggregatorStats, |
97 | 98 | ) error { |
98 | | - return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { |
99 | | - clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent) |
100 | | - clusterWideOpMetadata := make(map[string]tracingpb.OperationMetadata) |
101 | | - asOf := timeutil.Now().Format("20060102_150405.00") |
| 99 | + clusterWideSpanStats := make(map[string]tracingpb.OperationMetadata) |
| 100 | + clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent) |
| 101 | + ids := make([]execinfrapb.ComponentID, 0, len(perNodeAggregatorStats)) |
102 | 102 |
|
103 | | - var clusterWideSummary bytes.Buffer |
104 | | - for component, nameToEvent := range perNodeAggregatorStats { |
105 | | - clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n", |
106 | | - component.SQLInstanceID.String(), component.FlowID.String())) |
107 | | - |
108 | | - clusterWideSummary.WriteString("### aggregated events\n\n") |
109 | | - |
110 | | - for name, event := range nameToEvent.Events { |
111 | | - // Write a proto file per tag. This machine-readable file can be consumed |
112 | | - // by other places we want to display this information egs: annotated |
113 | | - // DistSQL diagrams, DBConsole etc. |
114 | | - filename := fmt.Sprintf("%s/%s", |
115 | | - component.SQLInstanceID.String(), asOf) |
116 | | - msg, err := protoreflect.DecodeMessage(name, event) |
117 | | - if err != nil { |
118 | | - clusterWideSummary.WriteString(fmt.Sprintf("invalid protocol message: %v", err)) |
119 | | - // If we failed to decode the event write the error to the file and |
120 | | - // carry on. |
121 | | - continue |
122 | | - } |
123 | | - |
124 | | - if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, txn, jobID); err != nil { |
125 | | - return err |
126 | | - } |
127 | | - |
128 | | - // Construct a single text file that contains information on a per-node |
129 | | - // basis as well as a cluster-wide aggregate. |
130 | | - clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", name)) |
131 | | - |
132 | | - aggEvent := msg.(tracing.AggregatorEvent) |
133 | | - clusterWideSummary.WriteString(aggEvent.String()) |
134 | | - clusterWideSummary.WriteString("\n") |
135 | | - |
136 | | - if _, ok := clusterWideAggregatorStats[name]; ok { |
137 | | - clusterWideAggregatorStats[name].Combine(aggEvent) |
138 | | - } else { |
139 | | - clusterWideAggregatorStats[name] = aggEvent |
140 | | - } |
141 | | - } |
| 103 | + for component := range perNodeAggregatorStats { |
| 104 | + ids = append(ids, component) |
| 105 | + } |
| 106 | + sort.Slice(ids, func(i, j int) bool { return ids[i].SQLInstanceID < ids[j].SQLInstanceID }) |
| 107 | + |
| 108 | + // Write a summary for each node to a buffer. While doing so, accumulate a |
| 109 | + // cluster-wide summary as well to be written to a second buffer below. |
| 110 | + var perNode bytes.Buffer |
| 111 | + fmt.Fprintf(&perNode, "# Per-component Details (%d)\n", len(perNodeAggregatorStats)) |
| 112 | + for _, component := range ids { |
| 113 | + nodeStats := perNodeAggregatorStats[component] |
| 114 | + fmt.Fprintf(&perNode, "# SQL Instance ID: %s (%s); Flow/proc ID: %s/%d\n\n", |
| 115 | + component.SQLInstanceID, component.Region, component.FlowID, component.ID) |
| 116 | + |
| 117 | + // Print span stats. |
| 118 | + perNode.WriteString("## Span Totals\n\n") |
| 119 | + for name, stats := range nodeStats.SpanTotals { |
| 120 | + fmt.Fprintf(&perNode, "- %-40s (%d):\t%s\n", name, stats.Count, stats.Duration) |
| 121 | + } |
| 122 | + perNode.WriteString("\n") |
142 | 123 |
|
143 | | - clusterWideSummary.WriteString("### span metadata\n\n") |
| 124 | + // Add span stats to the cluster-wide span stats. |
| 125 | + for spanName, totals := range nodeStats.SpanTotals { |
| 126 | + clusterWideSpanStats[spanName] = clusterWideSpanStats[spanName].Combine(totals) |
| 127 | + } |
144 | 128 |
|
145 | | - for name, metadata := range nameToEvent.SpanTotals { |
146 | | - fmt.Fprintf(&clusterWideSummary, " - %s (%d): %s\n", name, metadata.Count, metadata.Duration) |
147 | | - clusterWideOpMetadata[name] = clusterWideOpMetadata[name].Combine(metadata) |
| 129 | + perNode.WriteString("## Aggregate Stats\n\n") |
| 130 | + for name, event := range nodeStats.Events { |
| 131 | + msg, err := protoreflect.DecodeMessage(name, event) |
| 132 | + if err != nil { |
| 133 | + continue |
| 134 | + } |
| 135 | + aggEvent := msg.(tracing.AggregatorEvent) |
| 136 | + fmt.Fprintf(&perNode, "- %s:\n%s\n\n", name, aggEvent) |
| 137 | + |
| 138 | + // Populate the cluster-wide aggregator stats. |
| 139 | + if _, ok := clusterWideAggregatorStats[name]; ok { |
| 140 | + clusterWideAggregatorStats[name].Combine(aggEvent) |
| 141 | + } else { |
| 142 | + clusterWideAggregatorStats[name] = aggEvent |
148 | 143 | } |
149 | 144 | } |
| 145 | + perNode.WriteString("\n") |
| 146 | + } |
150 | 147 |
|
151 | | - for tag, event := range clusterWideAggregatorStats { |
152 | | - clusterWideSummary.WriteString("## Cluster-wide\n\n") |
153 | | - clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", tag)) |
154 | | - clusterWideSummary.WriteString(event.String()) |
155 | | - } |
| 148 | + // Write the cluster-wide summary. |
| 149 | + var combined bytes.Buffer |
| 150 | + combined.WriteString("# Cluster-wide\n\n") |
| 151 | + combined.WriteString("## Span Totals\n\n") |
| 152 | + for name, stats := range clusterWideSpanStats { |
| 153 | + fmt.Fprintf(&combined, " - %-40s (%d):\t%s\n", name, stats.Count, stats.Duration) |
| 154 | + } |
| 155 | + combined.WriteString("\n") |
| 156 | + combined.WriteString("## Aggregate Stats\n\n") |
| 157 | + for name, ev := range clusterWideAggregatorStats { |
| 158 | + fmt.Fprintf(&combined, " - %s:\n%s\n", name, ev) |
| 159 | + } |
| 160 | + combined.WriteString("\n") |
156 | 161 |
|
157 | | - // Ensure the file always has a trailing newline, regardless of whether or |
158 | | - // not the loops above wrote anything. |
159 | | - clusterWideSummary.WriteString("\n") |
160 | | - filename := fmt.Sprintf("aggregatorstats.%s.txt", asOf) |
161 | | - return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), txn, jobID) |
| 162 | + return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { |
| 163 | + asOf := timeutil.Now().Format("20060102_150405.00") |
| 164 | + combinedFilename := fmt.Sprintf("%s/trace-stats-cluster-wide.txt", asOf) |
| 165 | + perNodeFilename := fmt.Sprintf("%s/trace-stats-by-node.txt", asOf) |
| 166 | + if err := jobs.WriteExecutionDetailFile(ctx, combinedFilename, combined.Bytes(), txn, jobID); err != nil { |
| 167 | + return err |
| 168 | + } |
| 169 | + return jobs.WriteExecutionDetailFile(ctx, perNodeFilename, perNode.Bytes(), txn, jobID) |
162 | 170 | }) |
163 | 171 | } |
0 commit comments