@@ -115,6 +115,12 @@ public class TransportGetStackTracesAction extends TransportAction<GetStackTrace
115115 */
116116 private static final String CUSTOM_EVENT_SUB_AGGREGATION_NAME = "custom_event_group" ;
117117
118+ /**
119+ * This is the default sampling frequency for profiling events that we use if no sampling
120+ * frequency is stored in the backend (backwards compatibility).
121+ */
122+ public static final double DEFAULT_SAMPLING_FREQUENCY = 19.0d ;
123+
118124 private final NodeClient nodeClient ;
119125 private final ProfilingLicenseChecker licenseChecker ;
120126 private final ClusterService clusterService ;
@@ -249,7 +255,6 @@ private void searchGenericEventGroupedByStackTrace(
249255 ActionListener <GetStackTracesResponse > submitListener ,
250256 GetStackTracesResponseBuilder responseBuilder
251257 ) {
252-
253258 CountedTermsAggregationBuilder groupByStackTraceId = new CountedTermsAggregationBuilder ("group_by" ).size (
254259 MAX_TRACE_EVENTS_RESULT_SIZE
255260 ).field (request .getStackTraceIdsField ());
@@ -286,7 +291,7 @@ private void searchGenericEventGroupedByStackTrace(
286291
287292 String stackTraceID = stacktraceBucket .getKeyAsString ();
288293
289- TraceEventID eventID = new TraceEventID ("" , "" , "" , stackTraceID );
294+ TraceEventID eventID = new TraceEventID ("" , "" , "" , stackTraceID , DEFAULT_SAMPLING_FREQUENCY );
290295 TraceEvent event = stackTraceEvents .computeIfAbsent (eventID , k -> new TraceEvent ());
291296 event .count += count ;
292297 subGroups .collectResults (stacktraceBucket , event );
@@ -337,6 +342,16 @@ private void searchEventGroupedByStackTrace(
337342 // Especially with high cardinality fields, this makes aggregations really slow.
338343 .executionHint ("map" )
339344 .subAggregation (groupByHostId );
345+ TermsAggregationBuilder groupByExecutableName = new TermsAggregationBuilder ("group_by" )
346+ // 'size' specifies the max number of executable names we support per request.
347+ .size (MAX_TRACE_EVENTS_RESULT_SIZE )
348+ .field ("process.executable.name" )
349+ // missing("") is used to include documents where the field is missing.
350+ .missing ("" )
351+ // 'execution_hint: map' skips the slow building of ordinals that we don't need.
352+ // Especially with high cardinality fields, this makes aggregations really slow.
353+ .executionHint ("map" )
354+ .subAggregation (groupByThreadName );
340355 SubGroupCollector subGroups = SubGroupCollector .attach (groupByStackTraceId , request .getAggregationFields ());
341356 client .prepareSearch (eventsIndex .getName ())
342357 .setTrackTotalHits (false )
@@ -351,53 +366,89 @@ private void searchEventGroupedByStackTrace(
351366 new TermsAggregationBuilder ("group_by" )
352367 // 'size' specifies the max number of sampling frequencies we support per request.
353368 .size (MAX_TRACE_EVENTS_RESULT_SIZE )
354- .field ("process.executable.name " )
355- // missing("" ) is used to include documents where the field is missing.
356- .missing ("" )
369+ .field ("Stacktrace.sampling_frequency " )
370+ // missing(DEFAULT_SAMPLING_FREQUENCY) is used to include documents where the field is missing.
371+ .missing (( long ) DEFAULT_SAMPLING_FREQUENCY )
357372 // 'execution_hint: map' skips the slow building of ordinals that we don't need.
358373 // Especially with high cardinality fields, this makes aggregations really slow.
359374 .executionHint ("map" )
360- .subAggregation (groupByThreadName )
375+ .subAggregation (groupByExecutableName )
376+ .subAggregation (new SumAggregationBuilder ("total_count" ).field ("Stacktrace.count" ))
361377 )
362378 .addAggregation (new SumAggregationBuilder ("total_count" ).field ("Stacktrace.count" ))
363379 .execute (handleEventsGroupedByStackTrace (submitTask , client , responseBuilder , submitListener , searchResponse -> {
364- long totalCount = getAggValueAsLong (searchResponse , "total_count" );
380+ // The count values for events are scaled up to the highest sampling frequency.
381+ // For example, if the highest sampling frequency is 100, an event with frequency=20 and count=1
382+ // will be upscaled to count=5 (100/20 * count).
383+ // For this, we need to find the highest frequency in the result set.
384+ long maxSamplingFrequency = 0 ;
385+ Terms samplingFrequencies = searchResponse .getAggregations ().get ("group_by" );
386+ for (Terms .Bucket samplingFrequencyBucket : samplingFrequencies .getBuckets ()) {
387+ final double samplingFrequency = samplingFrequencyBucket .getKeyAsNumber ().doubleValue ();
388+ if (samplingFrequency > maxSamplingFrequency ) {
389+ maxSamplingFrequency = (long ) samplingFrequency ;
390+ }
391+ }
392+
393+ // Calculate a scaled-up total count (scaled up to the highest sampling frequency).
394+ long totalCount = 0 ;
395+ for (Terms .Bucket samplingFrequencyBucket : samplingFrequencies .getBuckets ()) {
396+ InternalNumericMetricsAggregation .SingleValue count = samplingFrequencyBucket .getAggregations ().get ("total_count" );
397+ final double samplingFrequency = samplingFrequencyBucket .getKeyAsNumber ().doubleValue ();
398+ final double samplingFactor = maxSamplingFrequency / samplingFrequency ;
399+ totalCount += Math .round (count .value () * samplingFactor );
400+ }
365401
366402 Resampler resampler = new Resampler (request , responseBuilder .getSamplingRate (), totalCount );
367403
368404 // Sort items lexicographically to access Lucene's term dictionary more efficiently when issuing an mget request.
369- // The term dictionary is lexicographically sorted and using the same order reduces the number of page faults
405+ // The term dictionary is lexicographically sorted, and using the same order reduces the number of page faults
370406 // needed to load it.
371407 long totalFinalCount = 0 ;
372408 Map <TraceEventID , TraceEvent > stackTraceEvents = new HashMap <>(MAX_TRACE_EVENTS_RESULT_SIZE );
373409
374- Terms executableNames = searchResponse .getAggregations ().get ("group_by" );
375- for (Terms .Bucket executableBucket : executableNames .getBuckets ()) {
376- String executableName = executableBucket .getKeyAsString ();
377-
378- Terms threads = executableBucket .getAggregations ().get ("group_by" );
379- for (Terms .Bucket threadBucket : threads .getBuckets ()) {
380- String threadName = threadBucket .getKeyAsString ();
381-
382- Terms hosts = threadBucket .getAggregations ().get ("group_by" );
383- for (Terms .Bucket hostBucket : hosts .getBuckets ()) {
384- String hostID = hostBucket .getKeyAsString ();
385-
386- Terms stacktraces = hostBucket .getAggregations ().get ("group_by" );
387- for (Terms .Bucket stacktraceBucket : stacktraces .getBuckets ()) {
388- Sum count = stacktraceBucket .getAggregations ().get ("count" );
389- int finalCount = resampler .adjustSampleCount ((int ) count .value ());
390- if (finalCount <= 0 ) {
391- continue ;
410+ // Walk over all nested aggregations.
411+ // The outermost aggregation is the sampling frequency.
412+ // The next level is the executable name, followed by the thread name, host ID and stacktrace ID.
413+ // The innermost aggregation contains the count of samples for each stacktrace ID.
414+ for (Terms .Bucket samplingFrequencyBucket : samplingFrequencies .getBuckets ()) {
415+ final double samplingFrequency = samplingFrequencyBucket .getKeyAsNumber ().doubleValue ();
416+ final double samplingFactor = maxSamplingFrequency / samplingFrequency ;
417+
418+ Terms executableNames = samplingFrequencyBucket .getAggregations ().get ("group_by" );
419+ for (Terms .Bucket executableBucket : executableNames .getBuckets ()) {
420+ String executableName = executableBucket .getKeyAsString ();
421+
422+ Terms threads = executableBucket .getAggregations ().get ("group_by" );
423+ for (Terms .Bucket threadBucket : threads .getBuckets ()) {
424+ String threadName = threadBucket .getKeyAsString ();
425+
426+ Terms hosts = threadBucket .getAggregations ().get ("group_by" );
427+ for (Terms .Bucket hostBucket : hosts .getBuckets ()) {
428+ String hostID = hostBucket .getKeyAsString ();
429+
430+ Terms stacktraces = hostBucket .getAggregations ().get ("group_by" );
431+ for (Terms .Bucket stacktraceBucket : stacktraces .getBuckets ()) {
432+ Sum count = stacktraceBucket .getAggregations ().get ("count" );
433+ int finalCount = resampler .adjustSampleCount ((int ) Math .round (count .value () * samplingFactor ));
434+ if (finalCount <= 0 ) {
435+ continue ;
436+ }
437+
438+ totalFinalCount += finalCount ;
439+
440+ String stackTraceID = stacktraceBucket .getKeyAsString ();
441+ TraceEventID eventID = new TraceEventID (
442+ executableName ,
443+ threadName ,
444+ hostID ,
445+ stackTraceID ,
446+ maxSamplingFrequency
447+ );
448+ TraceEvent event = stackTraceEvents .computeIfAbsent (eventID , k -> new TraceEvent ());
449+ event .count += finalCount ;
450+ subGroups .collectResults (stacktraceBucket , event );
392451 }
393- totalFinalCount += finalCount ;
394-
395- String stackTraceID = stacktraceBucket .getKeyAsString ();
396-
397- TraceEventID eventID = new TraceEventID (executableName , threadName , hostID , stackTraceID );
398- TraceEvent event = stackTraceEvents .computeIfAbsent (eventID , k -> new TraceEvent ());
399- event .count += finalCount ;
400- subGroups .collectResults (stacktraceBucket , event );
401452 }
402453 }
403454 }
@@ -629,8 +680,8 @@ public void calculateCO2AndCosts() {
629680 );
630681
631682 responseBuilder .getStackTraceEvents ().forEach ((eventId , event ) -> {
632- event .annualCO2Tons += co2Calculator .getAnnualCO2Tons (eventId .hostID (), event .count );
633- event .annualCostsUSD += costCalculator .annualCostsUSD (eventId .hostID (), event .count );
683+ event .annualCO2Tons += co2Calculator .getAnnualCO2Tons (eventId .hostID (), event .count , eventId . samplingFrequency () );
684+ event .annualCostsUSD += costCalculator .annualCostsUSD (eventId .hostID (), event .count , eventId . samplingFrequency () );
634685 });
635686
636687 log .debug (watch ::report );
0 commit comments