
Commit 72657a7

craig[bot] committed, with aa-joshi and wenyihu6
149315: cli(debug.zip): upload profiles from node subdirectories r=aa-joshi a=aa-joshi

Previously, debug zip upload to Datadog would only consider and upload the CPU and heap profiles sitting directly in the node directories (`nodes/*/*.pprof`). This was inadequate because we also have CPU and heap profiles in the `nodes/*/cpuprof` and `nodes/*/heapprof` directories, and these were ignored during upload. To address this, this patch also considers profiles from the `cpuprof` and `heapprof` directories during the Datadog upload. It also updates the profile explorer link generated after upload.

Epic: [CRDB-52093](https://cockroachlabs.atlassian.net/browse/CRDB-52093)
Part of: CRDB-51119
Release note: None

-----

PFA screenshot after changes:

<img width="1330" alt="Screenshot 2025-07-09 at 11 09 58 AM" src="https://github.com/user-attachments/assets/c220e519-d55c-4a7b-9c7d-499199317e76" />

150379: kvserver: record follower write bytes in replica load r=tbg a=wenyihu6

**kvserver: takes write and ingested bytes for recordRequestWriteBytes**

Previously, recordRequestWriteBytes took a `*kvadmission.StoreWriteBytes`, which was unnecessary since we're only recording write and ingested bytes, not anything admission-control specific. This commit changes the function to accept the raw byte counts directly. This also sets up a future change where follower replicas would not pass in StoreWriteBytes during recording.

Epic: none
Release note: none

---

**kvserver: record follower write bytes in replica load**

Previously, write bytes for range load were only tracked on leaseholder replicas during proposal, leaving write bytes written to follower replicas unaccounted for. This commit updates the tracking to include write bytes on follower replicas as well.

Epic: none
Release note: none

---

**kvserver: record follower write bytes during recordStatsOnCommit**

Previously, we recorded write bytes for follower replicas in runPostAddTriggersReplicaOnly, right before apply, which could be inaccurate since stats should be recorded at commit time. This commit moves the recording to recordStatsOnCommit by including the write bytes in the batch stats.

Epic: none
Release note: none

---

**kvserver: rename numWriteBytes to numWriteAndIngestedBytes**

This commit renames numWriteBytes in appBatchStats to numWriteAndIngestedBytes for accuracy, as it accounts for both bytes written to the replica and bytes ingested via SSTables.

Epic: none
Release note: none

Co-authored-by: Akshay Joshi <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
3 parents: acd7bb6 + 2b659c8 + 3af978d

8 files changed: 242 additions, 93 deletions
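PR 149315 hinges on attributing each profile to its node across two directory layouts: `nodes/<id>/*.pprof` and `nodes/<id>/{cpuprof,heapprof}/*.pprof`. A minimal standalone sketch of that path handling, using the same regex and `filepath` logic as the `zip_upload.go` diff below (the sample file names and `main` harness are illustrative):

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
)

// reProfileDirs matches profiles inside cpuprof/heapprof subdirectories;
// this is the regex from the zip_upload.go diff below.
var reProfileDirs = regexp.MustCompile(`.*(heapprof|cpuprof).*\.pprof$`)

// nodeIDForProfile mirrors the lookup in uploadZipProfiles: for
// nodes/1/cpuprof/x.pprof the node ID is two directories up; for
// nodes/1/x.pprof it is one directory up.
func nodeIDForProfile(path string) string {
	if reProfileDirs.MatchString(path) {
		return filepath.Base(filepath.Dir(filepath.Dir(path)))
	}
	return filepath.Base(filepath.Dir(path))
}

func main() {
	for _, p := range []string{
		"debugDir/nodes/1/cpu.pprof",
		"debugDir/nodes/1/cpuprof/cpuprof.2025-07-09.pprof",
		"debugDir/nodes/2/heapprof/memprof.2025-07-09.pprof",
	} {
		fmt.Printf("%-52s -> node %s\n", p, nodeIDForProfile(p))
	}
}
```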


pkg/cli/testdata/upload/profiles

Lines changed: 11 additions & 8 deletions
```diff
@@ -14,7 +14,8 @@ upload-profiles
 === uploading profiles
 Upload ID: abc-20241114000000
 debug zip upload debugDir --dd-api-key=dd-api-key --dd-app-key=dd-app-key --cluster=ABC --include=profiles
-{"start":"","end":"","attachments":["cpu.pprof","heap.pprof"],"tags_profiler":"cluster:ABC,env:debug,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["cpu.pprof"],"tags_profiler":"cluster:ABC,env:debug,file_name:cpu.pprof,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["heap.pprof"],"tags_profiler":"cluster:ABC,env:debug,file_name:heap.pprof,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
 
 
 # Multi-node - both profiles
@@ -39,9 +40,10 @@ upload-profiles tags=foo:bar
 === uploading profiles
 Upload ID: abc-20241114000000
 debug zip upload debugDir --dd-api-key=dd-api-key --dd-app-key=dd-app-key --tags=foo:bar --cluster=ABC --include=profiles
-{"start":"","end":"","attachments":["cpu.pprof","heap.pprof"],"tags_profiler":"cluster:ABC,env:debug,foo:bar,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
-{"start":"","end":"","attachments":["cpu.pprof","heap.pprof"],"tags_profiler":"cluster:ABC,env:debug,foo:bar,node_id:2,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
-
+{"start":"","end":"","attachments":["cpu.pprof"],"tags_profiler":"cluster:ABC,env:debug,file_name:cpu.pprof,foo:bar,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["cpu.pprof"],"tags_profiler":"cluster:ABC,env:debug,file_name:cpu.pprof,foo:bar,node_id:2,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["heap.pprof"],"tags_profiler":"cluster:ABC,env:debug,file_name:heap.pprof,foo:bar,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["heap.pprof"],"tags_profiler":"cluster:ABC,env:debug,file_name:heap.pprof,foo:bar,node_id:2,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
 
 # Single-node - only CPU profile
 upload-profiles tags=customer:user-given-name,cluster:XYZ
@@ -58,7 +60,7 @@ upload-profiles tags=customer:user-given-name,cluster:XYZ
 === uploading profiles
 Upload ID: abc-20241114000000
 debug zip upload debugDir --dd-api-key=dd-api-key --dd-app-key=dd-app-key --tags=customer:user-given-name,cluster:XYZ --cluster=ABC --include=profiles
-{"start":"","end":"","attachments":["cpu.pprof"],"tags_profiler":"cluster:XYZ,customer:user-given-name,env:debug,foo:bar,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["cpu.pprof"],"tags_profiler":"cluster:XYZ,customer:user-given-name,env:debug,file_name:cpu.pprof,foo:bar,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
 
 
 # Single-node - no profiles found
@@ -90,10 +92,11 @@ upload-profiles tags=env:SH
 === uploading profiles
 Upload ID: abc-20241114000000
 debug zip upload debugDir --dd-api-key=dd-api-key --dd-app-key=dd-app-key --tags=env:SH --cluster=ABC --include=profiles
-{"start":"","end":"","attachments":["cpu.pprof","heap.pprof"],"tags_profiler":"cluster:ABC,env:SH,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["cpu.pprof"],"tags_profiler":"cluster:ABC,env:SH,file_name:cpu.pprof,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
+{"start":"","end":"","attachments":["heap.pprof"],"tags_profiler":"cluster:ABC,env:SH,file_name:heap.pprof,node_id:1,service:CRDB-SH,source:cockroachdb,upload_id:abc-20241114000000","family":"go","version":"4"}
 
 
-# Single-node - both profiles
+# Single-node - both profiles error
 upload-profiles tags=ERR
 {
   "nodes": {
@@ -107,7 +110,7 @@ upload-profiles tags=ERR
 }
 ----
 === uploading profiles
-Failed to upload profiles: failed to upload profiles of node 1: status: 400, body: 'runtime' is a required field
+Failed to upload profiles: failed to upload profiles to Datadog
 Upload ID: abc-20241114000000
 debug zip upload debugDir --dd-api-key=dd-api-key --dd-app-key=dd-app-key --tags=ERR --cluster=ABC --include=profiles
 
```
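The expected `tags_profiler` strings above are a sorted, comma-joined merge of system-generated tags — now including the per-file `file_name` tag — with any user-supplied `--tags`. A rough sketch of that merge, assuming sort-then-join semantics inferred from the testdata (`mergeTags` is a hypothetical stand-in for the real `appendUserTags` helper):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mergeTags is a hypothetical stand-in for zip_upload.go's appendUserTags:
// user tags are appended to the system tags and the result is sorted so
// test expectations like the ones above are deterministic.
func mergeTags(systemTags []string, userTags ...string) string {
	all := append(append([]string{}, systemTags...), userTags...)
	sort.Strings(all)
	return strings.Join(all, ",")
}

func main() {
	system := []string{
		"service:CRDB-SH", "source:cockroachdb", "env:debug",
		"cluster:ABC", "node_id:1", "upload_id:abc-20241114000000",
		"file_name:cpu.pprof", // the newly added per-file tag
	}
	fmt.Println(mergeTags(system, "foo:bar"))
	// cluster:ABC,env:debug,file_name:cpu.pprof,foo:bar,node_id:1,...
}
```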

pkg/cli/zip_upload.go

Lines changed: 144 additions & 45 deletions
```diff
@@ -71,6 +71,8 @@ const (
 
 	// the path pattern to search for specific artifacts in the debug zip directory
 	zippedProfilePattern = "nodes/*/*.pprof"
+	zippedCPUProfilePattern = "nodes/*/cpuprof/*.pprof"
+	zippedHeapProfilePattern = "nodes/*/heapprof/*.pprof"
 	zippedLogsPattern = "nodes/*/logs/*"
 	zippedNodeTableDumpsPattern = "nodes/*/*.txt"
 
@@ -85,6 +87,7 @@ const (
 	clusterTag = "cluster"
 	ddTagsTag = "ddtags"
 	tableTag = "table"
+	fileNameTag = "file_name"
 
 	// datadog endpoint URLs
 	datadogProfileUploadURLTmpl = "https://intake.profile.%s/api/v2/profile"
@@ -399,61 +402,147 @@ func validateZipUploadReadiness() error {
 	return nil
 }
 
+// profilePathInfo holds the information about a profile file to be uploaded
+// in Datadog. This is used to pass the information to the upload workers
+// through upload channel.
+type profilePathInfo struct {
+	nodeID   string
+	filepath string
+}
+
 func uploadZipProfiles(ctx context.Context, uploadID string, debugDirPath string) error {
-	paths, err := expandPatterns([]string{path.Join(debugDirPath, zippedProfilePattern)})
+
+	paths, err := expandPatterns([]string{
+		path.Join(debugDirPath, zippedProfilePattern),
+		path.Join(debugDirPath, zippedCPUProfilePattern),
+		path.Join(debugDirPath, zippedHeapProfilePattern)})
+
 	if err != nil {
 		return err
 	}
 
+	if len(paths) == 0 {
+		return nil
+	}
+
+	var (
+		noOfWorkers = min(debugZipUploadOpts.maxConcurrentUploads, len(paths))
+		uploadChan  = make(chan profilePathInfo, noOfWorkers*2) // 2x the number of workers to keep them busy
+		uploadWG    = sync.WaitGroup{}
+		profileUploadState struct {
+			syncutil.Mutex
+			isSingleUploadSucceeded bool
+		}
+		// regex to match the profile directories. This is used to extract the node ID.
+		reProfileDirectories = regexp.MustCompile(`.*(heapprof|cpuprof).*\.pprof$`)
+	)
+
+	markSuccessOnce := sync.OnceFunc(func() {
+		profileUploadState.isSingleUploadSucceeded = true
+	})
+
 	pathsByNode := make(map[string][]string)
+	maxProfilesOfNode := 0
 	for _, path := range paths {
-		nodeID := filepath.Base(filepath.Dir(path))
+		// extract the node ID from the zippedProfilePattern. If it does not match the
+		// nodeID (integer) then we assume the path is from zippedCPUProfilePattern
+		// and zippedHeapProfilePattern and try to extract the node ID from the suffix.
+		var nodeID = ""
+		if reProfileDirectories.MatchString(path) {
+			nodeID = filepath.Base(filepath.Dir(filepath.Dir(path)))
+		} else {
+			nodeID = filepath.Base(filepath.Dir(path))
+		}
+
 		if _, ok := pathsByNode[nodeID]; !ok {
 			pathsByNode[nodeID] = []string{}
 		}
 
 		pathsByNode[nodeID] = append(pathsByNode[nodeID], path)
+		maxProfilesOfNode = max(maxProfilesOfNode, len(pathsByNode[nodeID]))
 	}
 
-	retryOpts := base.DefaultRetryOptions()
-	retryOpts.MaxRetries = zipUploadRetries
-	var req *http.Request
-	for nodeID, paths := range pathsByNode {
-		for retry := retry.Start(retryOpts); retry.Next(); {
-			req, err = newProfileUploadReq(
-				ctx, paths, appendUserTags(
-					append(
-						defaultDDTags, makeDDTag(nodeIDTag, nodeID), makeDDTag(uploadIDTag, uploadID),
-						makeDDTag(clusterTag, debugZipUploadOpts.clusterName),
-					), // system generated tags
-					debugZipUploadOpts.tags..., // user provided tags
-				),
-			)
-			if err != nil {
-				continue
-			}
+	// start the upload pool
+	noOfWorkers = min(noOfWorkers, maxProfilesOfNode)
+	for i := 0; i < noOfWorkers; i++ {
+		go func() {
+			for pathInfo := range uploadChan {
+				profilePath := pathInfo.filepath
+				nodeID := pathInfo.nodeID
 
-			if _, err = doUploadReq(req); err == nil {
-				break
+				func() {
+					defer uploadWG.Done()
+					fileName, err := uploadProfile(profilePath, ctx, nodeID, uploadID)
+					if err != nil {
+						fmt.Fprintf(os.Stderr, "failed to upload profile %s of node %s: %s\n", fileName, nodeID, err)
+						return
+					}
+					markSuccessOnce()
+				}()
 			}
-		}
+		}()
+	}
 
-		if err != nil {
-			return fmt.Errorf("failed to upload profiles of node %s: %w", nodeID, err)
+	for nodeID, paths := range pathsByNode {
+		for _, path := range paths {
+			uploadWG.Add(1)
+			uploadChan <- profilePathInfo{nodeID: nodeID, filepath: path}
 		}
 
-		fmt.Fprintf(os.Stderr, "Uploaded profiles of node %s to datadog (%s)\n", nodeID, strings.Join(paths, ", "))
-		fmt.Fprintf(os.Stderr, "Explore the profiles on datadog: "+
-			"https://%s/profiling/explorer?query=%s:%s\n", ddSiteToHostMap[debugZipUploadOpts.ddSite],
-			uploadIDTag, uploadID,
-		)
+		uploadWG.Wait()
+		fmt.Fprintf(os.Stderr, "Uploaded profiles of node %s to datadog\n", nodeID)
+	}
+
+	uploadWG.Wait()
+	close(uploadChan)
+
+	if !profileUploadState.isSingleUploadSucceeded {
+		return errors.Newf("failed to upload profiles to Datadog")
 	}
 
+	toUnixTimestamp := getCurrentTime().UnixMilli()
+	//create timestamp for T-30 days.
+	fromUnixTimestamp := toUnixTimestamp - (30 * 24 * 60 * 60 * 1000)
+
+	fmt.Fprintf(os.Stderr, "Explore the profiles on datadog: "+
+		"https://%s/profiling/explorer?query=%s:%s&viz=stream&from_ts=%d&to_ts=%d&live=false\n", ddSiteToHostMap[debugZipUploadOpts.ddSite],
+		uploadIDTag, uploadID, fromUnixTimestamp, toUnixTimestamp,
+	)
+
 	return nil
 }
 
+func uploadProfile(
+	profilePath string, ctx context.Context, nodeID string, uploadID string,
+) (string, error) {
+	fileName := filepath.Base(profilePath)
+
+	req, err := newProfileUploadReq(
+		ctx, profilePath, appendUserTags(
+			append(
+				defaultDDTags, makeDDTag(nodeIDTag, nodeID), makeDDTag(uploadIDTag, uploadID),
+				makeDDTag(clusterTag, debugZipUploadOpts.clusterName), makeDDTag(fileNameTag, fileName),
+			), // system generated tags
+			debugZipUploadOpts.tags..., // user provided tags
+		),
+	)
+
+	retryOpts := base.DefaultRetryOptions()
+	retryOpts.MaxRetries = zipUploadRetries
+	for retry := retry.Start(retryOpts); retry.Next(); {
+		if err != nil {
+			continue
+		}
+
+		if _, err = doUploadReq(req); err == nil {
+			break
+		}
+	}
+	return fileName, err
+}
+
 func newProfileUploadReq(
-	ctx context.Context, profilePaths []string, tags []string,
+	ctx context.Context, profilePath string, tags []string,
 ) (*http.Request, error) {
 	var (
 		body bytes.Buffer
@@ -473,26 +562,36 @@ func newProfileUploadReq(
 		}
 	)
 
-	for _, profilePath := range profilePaths {
-		fileName := filepath.Base(profilePath)
-		event.Attachments = append(event.Attachments, fileName)
+	fileName := filepath.Base(profilePath)
 
-		f, err := mw.CreateFormFile(fileName, fileName)
-		if err != nil {
-			return nil, err
-		}
+	// Datadog only accepts CPU and heap profiles with filename as "cpu.pprof" or "heap.pprof".
+	// The cpu profile files has "cpu" in the filename prefix and heap profile files
+	// has "memprof/heap" in the filename prefix. Hence we are renaming the files accordingly
+	// so that Datadog can recognize and accept them correctly.
+	if strings.HasPrefix(fileName, "cpu") {
+		fileName = "cpu.pprof"
+	} else {
+		// If the file is not a CPU profile, we assume it is a heap/memory profile.
+		fileName = "heap.pprof"
+	}
 
-		data, err := os.ReadFile(profilePath)
-		if err != nil {
-			return nil, err
-		}
+	event.Attachments = append(event.Attachments, fileName)
 
-		if _, err := f.Write(data); err != nil {
-			return nil, err
-		}
+	f, err := mw.CreateFormFile(fileName, fileName)
+	if err != nil {
+		return nil, err
+	}
+
+	data, err := os.ReadFile(profilePath)
+	if err != nil {
+		return nil, err
+	}
+
+	if _, err := f.Write(data); err != nil {
+		return nil, err
 	}
 
-	f, err := mw.CreatePart(textproto.MIMEHeader{
+	f, err = mw.CreatePart(textproto.MIMEHeader{
 		httputil.ContentDispositionHeader: []string{`form-data; name="event"; filename="event.json"`},
 		httputil.ContentTypeHeader:        []string{httputil.JSONContentType},
 	})
```
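The rewritten `uploadZipProfiles` swaps the per-node retry loop for a bounded worker pool: a buffered channel feeds goroutines, a `sync.WaitGroup` gates completion, and `sync.OnceFunc` latches whether at least one upload succeeded so a fully failed run can return an error. A stripped-down sketch of the pattern (Go 1.21+ for `min` and `sync.OnceFunc`; `upload` here is a placeholder for the real `uploadProfile`):

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// upload is a placeholder for the real uploadProfile call.
func upload(path string) error { return nil }

func uploadAll(paths []string, maxWorkers int) error {
	workers := min(maxWorkers, len(paths))
	jobs := make(chan string, workers*2) // 2x buffer keeps workers busy
	var wg sync.WaitGroup

	var anySucceeded bool
	// sync.OnceFunc latches success exactly once; the write is ordered
	// before wg.Done, so reading after wg.Wait is race-free.
	markSuccessOnce := sync.OnceFunc(func() { anySucceeded = true })

	for i := 0; i < workers; i++ {
		go func() {
			for p := range jobs {
				if err := upload(p); err != nil {
					fmt.Fprintf(os.Stderr, "failed to upload %s: %s\n", p, err)
				} else {
					markSuccessOnce()
				}
				wg.Done()
			}
		}()
	}

	for _, p := range paths {
		wg.Add(1)
		jobs <- p
	}
	wg.Wait()
	close(jobs)

	if !anySucceeded {
		return fmt.Errorf("failed to upload profiles")
	}
	return nil
}

func main() {
	fmt.Println(uploadAll([]string{"cpu.pprof", "heap.pprof"}, 4))
}
```

The real code additionally calls `uploadWG.Wait()` per node so the "Uploaded profiles of node N" progress line still prints node by node, even though individual files upload concurrently.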

pkg/kv/kvserver/app_batch.go

Lines changed: 4 additions & 0 deletions
```diff
@@ -33,6 +33,9 @@ type appBatchStats struct {
 	numEntriesProcessedBytes int64
 	numEmptyEntries int
 	numAddSST, numAddSSTCopies int
+	// numWriteAndIngestedBytes is the sum of number of bytes written to the replica and size
+	// of the sstable to ingest.
+	numWriteAndIngestedBytes int64
 
 	// NB: update `merge` when adding a new field.
 }
@@ -42,6 +45,7 @@ func (s *appBatchStats) merge(ss appBatchStats) {
 	s.numEntriesProcessed += ss.numEntriesProcessed
 	s.numEntriesProcessedBytes += ss.numEntriesProcessedBytes
 	ss.numEmptyEntries += ss.numEmptyEntries
+	s.numWriteAndIngestedBytes += ss.numWriteAndIngestedBytes
 }
 
 // appBatch is the in-progress foundation for standalone log entry
```
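The `// NB: update merge when adding a new field` comment is load-bearing: a counter added to `appBatchStats` but not to `merge` silently vanishes when per-batch stats roll up. (The untouched context line `ss.numEmptyEntries += ss.numEmptyEntries`, which accumulates into the argument rather than the receiver, shows how easy this is to get wrong.) A self-contained illustration of the roll-up pattern, with invented values:

```go
package main

import "fmt"

// batchStats mirrors the shape of appBatchStats: per-batch counters
// that are rolled up into an aggregate via merge.
type batchStats struct {
	numEntriesProcessed      int64
	numWriteAndIngestedBytes int64 // new field: must also be added to merge
}

func (s *batchStats) merge(ss batchStats) {
	s.numEntriesProcessed += ss.numEntriesProcessed
	s.numWriteAndIngestedBytes += ss.numWriteAndIngestedBytes
}

func main() {
	var total batchStats
	for _, b := range []batchStats{
		{numEntriesProcessed: 3, numWriteAndIngestedBytes: 4096},
		{numEntriesProcessed: 1, numWriteAndIngestedBytes: 512},
	} {
		total.merge(b)
	}
	fmt.Printf("%+v\n", total) // {numEntriesProcessed:4 numWriteAndIngestedBytes:4608}
}
```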

pkg/kv/kvserver/load/replica_load.go

Lines changed: 5 additions & 1 deletion
```diff
@@ -65,7 +65,11 @@ type ReplicaLoadStats struct {
 	// follower and leaseholder reads.
 	ReadKeysPerSecond float64
 	// WriteBytesPerSecond is the replica's average bytes written per second. A
-	// "Write" is as described in WritesPerSecond.
+	// "Write" is as described in WritesPerSecond. If the replica is a leaseholder,
+	// this is recorded as the bytes that will be written by the replica during the
+	// application of the Raft command including write bytes and ingested bytes for
+	// AddSSTable requests. If the replica is a follower, this is recorded right
+	// before a command is applied to the state machine.
 	WriteBytesPerSecond float64
 	// ReadBytesPerSecond is the replica's average bytes read per second. A "Read" is as
 	// described in ReadsPerSecond.
```

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 14 additions & 9 deletions
```diff
@@ -253,16 +253,20 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
 	// We don't track these stats in standalone log application since they depend
 	// on whether the proposer is still waiting locally, and this concept does not
 	// apply in a standalone context.
-	//
-	// TODO(irfansharif): This code block can be removed once below-raft
-	// admission control is the only form of IO admission control. It pre-dates
-	// it -- these stats were previously used to deduct IO tokens for follower
-	// writes/ingests without waiting.
-	if !cmd.IsLocal() && !cmd.ApplyAdmissionControl() {
+	if !cmd.IsLocal() {
 		writeBytes, ingestedBytes := cmd.getStoreWriteByteSizes()
-		b.followerStoreWriteBytes.NumEntries++
-		b.followerStoreWriteBytes.WriteBytes += writeBytes
-		b.followerStoreWriteBytes.IngestedBytes += ingestedBytes
+		if writeBytes > 0 || ingestedBytes > 0 {
+			b.ab.numWriteAndIngestedBytes += writeBytes + ingestedBytes
+		}
+		// TODO(irfansharif): This code block can be removed once below-raft
+		// admission control is the only form of IO admission control. It pre-dates
+		// it -- these stats were previously used to deduct IO tokens for follower
+		// writes/ingests without waiting.
+		if !cmd.ApplyAdmissionControl() {
+			b.followerStoreWriteBytes.NumEntries++
+			b.followerStoreWriteBytes.WriteBytes += writeBytes
+			b.followerStoreWriteBytes.IngestedBytes += ingestedBytes
+		}
 	}
 
 	// MVCC history mutations violate the closed timestamp, modifying data that
@@ -721,6 +725,7 @@ func (b *replicaAppBatch) recordStatsOnCommit() {
 	b.applyStats.appBatchStats.merge(b.ab.appBatchStats)
 	b.applyStats.numBatchesProcessed++
 	b.applyStats.followerStoreWriteBytes.Merge(b.followerStoreWriteBytes)
+	b.r.recordRequestWriteBytes(b.ab.numWriteAndIngestedBytes)
 
 	if n := b.ab.numAddSST; n > 0 {
 		b.r.store.metrics.AddSSTableApplications.Inc(int64(n))
```
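The move from `runPostAddTriggersReplicaOnly` to `recordStatsOnCommit` follows a general pattern: accumulate write and ingested bytes in the batch while commands are staged, and record them against replica load only once the batch commits. A hedged sketch of that shape — the types and method names here are illustrative, not the kvserver API:

```go
package main

import "fmt"

// replicaLoad is a stand-in for the real load.ReplicaLoad recorder.
type replicaLoad struct{ writeBytes int64 }

func (l *replicaLoad) recordWriteBytes(n int64) { l.writeBytes += n }

// appBatch accumulates stats while commands are staged; nothing is
// recorded against replica load until the batch commits.
type appBatch struct {
	load                     *replicaLoad
	numWriteAndIngestedBytes int64
}

func (b *appBatch) stage(writeBytes, ingestedBytes int64) {
	if writeBytes > 0 || ingestedBytes > 0 {
		b.numWriteAndIngestedBytes += writeBytes + ingestedBytes
	}
}

// recordStatsOnCommit is where the accumulated bytes finally hit the
// load tracker, mirroring the hook added in the diff above.
func (b *appBatch) recordStatsOnCommit() {
	b.load.recordWriteBytes(b.numWriteAndIngestedBytes)
}

func main() {
	b := appBatch{load: &replicaLoad{}}
	b.stage(1024, 0)
	b.stage(0, 4096) // e.g. an AddSSTable ingest
	b.recordStatsOnCommit()
	fmt.Println(b.load.writeBytes) // 5120
}
```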

pkg/kv/kvserver/replica_raft.go

Lines changed: 5 additions & 0 deletions
```diff
@@ -96,6 +96,8 @@ var ReplicaLeaderlessUnavailableThreshold = settings.RegisterDurationSettingWith
 // terminate execution, although it is given no guarantee that the proposal
 // won't still go on to commit and apply at some later time.
 // - the proposal's ID.
+// - the bytes that will be written by the replica during the application of
+//   the Raft command.
 // - any error obtained during the creation or proposal of the command, in
 //   which case the other returned values are zero.
 func (r *Replica) evalAndPropose(
@@ -821,6 +823,9 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) {
 			p.Printf(" (copies=%d)", c)
 		}
 	}
+	if b := s.apply.numWriteAndIngestedBytes; b > 0 {
+		p.Printf(", apply-write-bytes=%s", humanizeutil.IBytes(b))
+	}
 	p.SafeString("]")
 
 	if n := s.apply.assertionsRequested; n > 0 {
```
