@@ -69,6 +69,8 @@ const (
69
69
70
70
// the path pattern to search for specific artifacts in the debug zip directory
71
71
zippedProfilePattern = "nodes/*/*.pprof"
72
+ zippedCPUProfilePattern = "nodes/*/cpuprof/*.pprof"
73
+ zippedHeapProfilePattern = "nodes/*/heapprof/*.pprof"
72
74
zippedLogsPattern = "nodes/*/logs/*"
73
75
zippedNodeTableDumpsPattern = "nodes/*/*.txt"
74
76
@@ -83,6 +85,7 @@ const (
83
85
clusterTag = "cluster"
84
86
ddTagsTag = "ddtags"
85
87
tableTag = "table"
88
+ fileNameTag = "file_name"
86
89
87
90
// datadog endpoint URLs
88
91
datadogProfileUploadURLTmpl = "https://intake.profile.%s/api/v2/profile"
@@ -296,61 +299,147 @@ func validateZipUploadReadiness() error {
296
299
return nil
297
300
}
298
301
302
// profilePathInfo is the unit of work sent over the upload channel to the
// profile upload workers: one profile file together with the node it
// belongs to.
type profilePathInfo struct {
	// nodeID is the name of the node directory the profile was found under;
	// filepath is the location of the profile file on disk.
	nodeID, filepath string
}
309
+
299
310
func uploadZipProfiles (ctx context.Context , uploadID string , debugDirPath string ) error {
300
- paths , err := expandPatterns ([]string {path .Join (debugDirPath , zippedProfilePattern )})
311
+
312
+ paths , err := expandPatterns ([]string {
313
+ path .Join (debugDirPath , zippedProfilePattern ),
314
+ path .Join (debugDirPath , zippedCPUProfilePattern ),
315
+ path .Join (debugDirPath , zippedHeapProfilePattern )})
316
+
301
317
if err != nil {
302
318
return err
303
319
}
304
320
321
+ if len (paths ) == 0 {
322
+ return nil
323
+ }
324
+
325
+ var (
326
+ noOfWorkers = min (debugZipUploadOpts .maxConcurrentUploads , len (paths ))
327
+ uploadChan = make (chan profilePathInfo , noOfWorkers * 2 ) // 2x the number of workers to keep them busy
328
+ uploadWG = sync.WaitGroup {}
329
+ profileUploadState struct {
330
+ syncutil.Mutex
331
+ isSingleUploadSucceeded bool
332
+ }
333
+ // regex to match the profile directories. This is used to extract the node ID.
334
+ reProfileDirectories = regexp .MustCompile (`.*(heapprof|cpuprof).*\.pprof$` )
335
+ )
336
+
337
+ markSuccessOnce := sync .OnceFunc (func () {
338
+ profileUploadState .isSingleUploadSucceeded = true
339
+ })
340
+
305
341
pathsByNode := make (map [string ][]string )
342
+ maxProfilesOfNode := 0
306
343
for _ , path := range paths {
307
- nodeID := filepath .Base (filepath .Dir (path ))
344
+ // extract the node ID from the zippedProfilePattern. If it does not match the
345
+ // nodeID (integer) then we assume the path is from zippedCPUProfilePattern
346
+ // and zippedHeapProfilePattern and try to extract the node ID from the suffix.
347
+ var nodeID = ""
348
+ if reProfileDirectories .MatchString (path ) {
349
+ nodeID = filepath .Base (filepath .Dir (filepath .Dir (path )))
350
+ } else {
351
+ nodeID = filepath .Base (filepath .Dir (path ))
352
+ }
353
+
308
354
if _ , ok := pathsByNode [nodeID ]; ! ok {
309
355
pathsByNode [nodeID ] = []string {}
310
356
}
311
357
312
358
pathsByNode [nodeID ] = append (pathsByNode [nodeID ], path )
359
+ maxProfilesOfNode = max (maxProfilesOfNode , len (pathsByNode [nodeID ]))
313
360
}
314
361
315
- retryOpts := base .DefaultRetryOptions ()
316
- retryOpts .MaxRetries = zipUploadRetries
317
- var req * http.Request
318
- for nodeID , paths := range pathsByNode {
319
- for retry := retry .Start (retryOpts ); retry .Next (); {
320
- req , err = newProfileUploadReq (
321
- ctx , paths , appendUserTags (
322
- append (
323
- defaultDDTags , makeDDTag (nodeIDTag , nodeID ), makeDDTag (uploadIDTag , uploadID ),
324
- makeDDTag (clusterTag , debugZipUploadOpts .clusterName ),
325
- ), // system generated tags
326
- debugZipUploadOpts .tags ... , // user provided tags
327
- ),
328
- )
329
- if err != nil {
330
- continue
331
- }
362
+ // start the upload pool
363
+ noOfWorkers = min (noOfWorkers , maxProfilesOfNode )
364
+ for i := 0 ; i < noOfWorkers ; i ++ {
365
+ go func () {
366
+ for pathInfo := range uploadChan {
367
+ profilePath := pathInfo .filepath
368
+ nodeID := pathInfo .nodeID
332
369
333
- if _ , err = doUploadReq (req ); err == nil {
334
- break
370
+ func () {
371
+ defer uploadWG .Done ()
372
+ fileName , err := uploadProfile (profilePath , ctx , nodeID , uploadID )
373
+ if err != nil {
374
+ fmt .Fprintf (os .Stderr , "failed to upload profile %s of node %s: %s\n " , fileName , nodeID , err )
375
+ return
376
+ }
377
+ markSuccessOnce ()
378
+ }()
335
379
}
336
- }
380
+ }()
381
+ }
337
382
338
- if err != nil {
339
- return fmt .Errorf ("failed to upload profiles of node %s: %w" , nodeID , err )
383
+ for nodeID , paths := range pathsByNode {
384
+ for _ , path := range paths {
385
+ uploadWG .Add (1 )
386
+ uploadChan <- profilePathInfo {nodeID : nodeID , filepath : path }
340
387
}
341
388
342
- fmt .Fprintf (os .Stderr , "Uploaded profiles of node %s to datadog (%s)\n " , nodeID , strings .Join (paths , ", " ))
343
- fmt .Fprintf (os .Stderr , "Explore the profiles on datadog: " +
344
- "https://%s/profiling/explorer?query=%s:%s\n " , ddSiteToHostMap [debugZipUploadOpts .ddSite ],
345
- uploadIDTag , uploadID ,
346
- )
389
+ uploadWG .Wait ()
390
+ fmt .Fprintf (os .Stderr , "Uploaded profiles of node %s to datadog\n " , nodeID )
391
+ }
392
+
393
+ uploadWG .Wait ()
394
+ close (uploadChan )
395
+
396
+ if ! profileUploadState .isSingleUploadSucceeded {
397
+ return errors .Newf ("failed to upload profiles to Datadog" )
347
398
}
348
399
400
+ toUnixTimestamp := getCurrentTime ().UnixMilli ()
401
+ //create timestamp for T-30 days.
402
+ fromUnixTimestamp := toUnixTimestamp - (30 * 24 * 60 * 60 * 1000 )
403
+
404
+ fmt .Fprintf (os .Stderr , "Explore the profiles on datadog: " +
405
+ "https://%s/profiling/explorer?query=%s:%s&viz=stream&from_ts=%d&to_ts=%d&live=false\n " , ddSiteToHostMap [debugZipUploadOpts .ddSite ],
406
+ uploadIDTag , uploadID , fromUnixTimestamp , toUnixTimestamp ,
407
+ )
408
+
349
409
return nil
350
410
}
351
411
412
+ func uploadProfile (
413
+ profilePath string , ctx context.Context , nodeID string , uploadID string ,
414
+ ) (string , error ) {
415
+ fileName := filepath .Base (profilePath )
416
+
417
+ req , err := newProfileUploadReq (
418
+ ctx , profilePath , appendUserTags (
419
+ append (
420
+ defaultDDTags , makeDDTag (nodeIDTag , nodeID ), makeDDTag (uploadIDTag , uploadID ),
421
+ makeDDTag (clusterTag , debugZipUploadOpts .clusterName ), makeDDTag (fileNameTag , fileName ),
422
+ ), // system generated tags
423
+ debugZipUploadOpts .tags ... , // user provided tags
424
+ ),
425
+ )
426
+
427
+ retryOpts := base .DefaultRetryOptions ()
428
+ retryOpts .MaxRetries = zipUploadRetries
429
+ for retry := retry .Start (retryOpts ); retry .Next (); {
430
+ if err != nil {
431
+ continue
432
+ }
433
+
434
+ if _ , err = doUploadReq (req ); err == nil {
435
+ break
436
+ }
437
+ }
438
+ return fileName , err
439
+ }
440
+
352
441
func newProfileUploadReq (
353
- ctx context.Context , profilePaths [] string , tags []string ,
442
+ ctx context.Context , profilePath string , tags []string ,
354
443
) (* http.Request , error ) {
355
444
var (
356
445
body bytes.Buffer
@@ -370,26 +459,36 @@ func newProfileUploadReq(
370
459
}
371
460
)
372
461
373
- for _ , profilePath := range profilePaths {
374
- fileName := filepath .Base (profilePath )
375
- event .Attachments = append (event .Attachments , fileName )
462
+ fileName := filepath .Base (profilePath )
376
463
377
- f , err := mw .CreateFormFile (fileName , fileName )
378
- if err != nil {
379
- return nil , err
380
- }
464
+ // Datadog only accepts CPU and heap profiles with filename as "cpu.pprof" or "heap.pprof".
465
+ // The cpu profile files has "cpu" in the filename prefix and heap profile files
466
+ // has "memprof/heap" in the filename prefix. Hence we are renaming the files accordingly
467
+ // so that Datadog can recognize and accept them correctly.
468
+ if strings .HasPrefix (fileName , "cpu" ) {
469
+ fileName = "cpu.pprof"
470
+ } else {
471
+ // If the file is not a CPU profile, we assume it is a heap/memory profile.
472
+ fileName = "heap.pprof"
473
+ }
381
474
382
- data , err := os .ReadFile (profilePath )
383
- if err != nil {
384
- return nil , err
385
- }
475
+ event .Attachments = append (event .Attachments , fileName )
386
476
387
- if _ , err := f .Write (data ); err != nil {
388
- return nil , err
389
- }
477
+ f , err := mw .CreateFormFile (fileName , fileName )
478
+ if err != nil {
479
+ return nil , err
480
+ }
481
+
482
+ data , err := os .ReadFile (profilePath )
483
+ if err != nil {
484
+ return nil , err
485
+ }
486
+
487
+ if _ , err := f .Write (data ); err != nil {
488
+ return nil , err
390
489
}
391
490
392
- f , err : = mw .CreatePart (textproto.MIMEHeader {
491
+ f , err = mw .CreatePart (textproto.MIMEHeader {
393
492
httputil .ContentDispositionHeader : []string {`form-data; name="event"; filename="event.json"` },
394
493
httputil .ContentTypeHeader : []string {httputil .JSONContentType },
395
494
})
0 commit comments