@@ -115,6 +115,12 @@ public class TransportGetStackTracesAction extends TransportAction<GetStackTrace
115115 */
116116 private static final String CUSTOM_EVENT_SUB_AGGREGATION_NAME = "custom_event_group" ;
117117
118+ /**
119+ * This is the default sampling frequency for profiling events that we use if no sampling frequency is
120+ * stored in the backend (backwards compatibility).
121+ */
122+ public static final double DEFAULT_SAMPLING_FREQUENCY = 19.0d ;
123+
118124 private final NodeClient nodeClient ;
119125 private final ProfilingLicenseChecker licenseChecker ;
120126 private final ClusterService clusterService ;
@@ -249,7 +255,6 @@ private void searchGenericEventGroupedByStackTrace(
249255 ActionListener <GetStackTracesResponse > submitListener ,
250256 GetStackTracesResponseBuilder responseBuilder
251257 ) {
252-
253258 CountedTermsAggregationBuilder groupByStackTraceId = new CountedTermsAggregationBuilder ("group_by" ).size (
254259 MAX_TRACE_EVENTS_RESULT_SIZE
255260 ).field (request .getStackTraceIdsField ());
@@ -286,7 +291,7 @@ private void searchGenericEventGroupedByStackTrace(
286291
287292 String stackTraceID = stacktraceBucket .getKeyAsString ();
288293
289- TraceEventID eventID = new TraceEventID ("" , "" , "" , stackTraceID );
294+ TraceEventID eventID = new TraceEventID ("" , "" , "" , stackTraceID , DEFAULT_SAMPLING_FREQUENCY );
290295 TraceEvent event = stackTraceEvents .computeIfAbsent (eventID , k -> new TraceEvent ());
291296 event .count += count ;
292297 subGroups .collectResults (stacktraceBucket , event );
@@ -337,6 +342,16 @@ private void searchEventGroupedByStackTrace(
337342 // Especially with high cardinality fields, this makes aggregations really slow.
338343 .executionHint ("map" )
339344 .subAggregation (groupByHostId );
345+ TermsAggregationBuilder groupByExecutableName = new TermsAggregationBuilder ("group_by" )
346+ // 'size' specifies the max number of executable names we support per request.
347+ .size (MAX_TRACE_EVENTS_RESULT_SIZE )
348+ .field ("process.executable.name" )
349+ // missing("") is used to include documents where the field is missing.
350+ .missing ("" )
351+ // 'execution_hint: map' skips the slow building of ordinals that we don't need.
352+ // Especially with high cardinality fields, building ordinals makes aggregations really slow.
353+ .executionHint ("map" )
354+ .subAggregation (groupByThreadName );
340355 SubGroupCollector subGroups = SubGroupCollector .attach (groupByStackTraceId , request .getAggregationFields ());
341356 client .prepareSearch (eventsIndex .getName ())
342357 .setTrackTotalHits (false )
@@ -351,17 +366,34 @@ private void searchEventGroupedByStackTrace(
351366 new TermsAggregationBuilder ("group_by" )
352367 // 'size' specifies the max number of sampling frequencies we support per request.
353368 .size (MAX_TRACE_EVENTS_RESULT_SIZE )
354- .field ("process.executable.name " )
355- // missing("" ) is used to include documents where the field is missing.
356- .missing ("" )
369+ .field ("Stacktrace.sampling_frequency " )
370+ // missing(DEFAULT_SAMPLING_FREQUENCY) is used to include documents where the field is missing.
371+ .missing (( long ) DEFAULT_SAMPLING_FREQUENCY )
357372 // 'execution_hint: map' skips the slow building of ordinals that we don't need.
358373 // Especially with high cardinality fields, building ordinals makes aggregations really slow.
359374 .executionHint ("map" )
360- .subAggregation (groupByThreadName )
375+ .subAggregation (groupByExecutableName )
376+ .subAggregation (new SumAggregationBuilder ("total_count" ).field ("Stacktrace.count" ))
361377 )
362378 .addAggregation (new SumAggregationBuilder ("total_count" ).field ("Stacktrace.count" ))
363379 .execute (handleEventsGroupedByStackTrace (submitTask , client , responseBuilder , submitListener , searchResponse -> {
364- long totalCount = getAggValueAsLong (searchResponse , "total_count" );
380+ long maxSamplingFrequency = 0 ;
381+
382+ Terms samplingFrequencies = searchResponse .getAggregations ().get ("group_by" );
383+ for (Terms .Bucket samplingFrequencyBucket : samplingFrequencies .getBuckets ()) {
384+ final double samplingFrequency = samplingFrequencyBucket .getKeyAsNumber ().doubleValue ();
385+ if (samplingFrequency > maxSamplingFrequency ) {
386+ maxSamplingFrequency = (long ) samplingFrequency ;
387+ }
388+ }
389+
390+ long totalCount = 0 ;
391+ for (Terms .Bucket samplingFrequencyBucket : samplingFrequencies .getBuckets ()) {
392+ InternalNumericMetricsAggregation .SingleValue count = samplingFrequencyBucket .getAggregations ().get ("total_count" );
393+ final double samplingFrequency = samplingFrequencyBucket .getKeyAsNumber ().doubleValue ();
394+ final double samplingFactor = maxSamplingFrequency / samplingFrequency ;
395+ totalCount += Math .round (count .value () * samplingFactor );
396+ }
365397
366398 Resampler resampler = new Resampler (request , responseBuilder .getSamplingRate (), totalCount );
367399
@@ -371,33 +403,49 @@ private void searchEventGroupedByStackTrace(
371403 long totalFinalCount = 0 ;
372404 Map <TraceEventID , TraceEvent > stackTraceEvents = new HashMap <>(MAX_TRACE_EVENTS_RESULT_SIZE );
373405
374- Terms executableNames = searchResponse .getAggregations ().get ("group_by" );
375- for (Terms .Bucket executableBucket : executableNames .getBuckets ()) {
376- String executableName = executableBucket .getKeyAsString ();
377-
378- Terms threads = executableBucket .getAggregations ().get ("group_by" );
379- for (Terms .Bucket threadBucket : threads .getBuckets ()) {
380- String threadName = threadBucket .getKeyAsString ();
381-
382- Terms hosts = threadBucket .getAggregations ().get ("group_by" );
383- for (Terms .Bucket hostBucket : hosts .getBuckets ()) {
384- String hostID = hostBucket .getKeyAsString ();
385-
386- Terms stacktraces = hostBucket .getAggregations ().get ("group_by" );
387- for (Terms .Bucket stacktraceBucket : stacktraces .getBuckets ()) {
388- Sum count = stacktraceBucket .getAggregations ().get ("count" );
389- int finalCount = resampler .adjustSampleCount ((int ) count .value ());
390- if (finalCount <= 0 ) {
391- continue ;
406+ for (Terms .Bucket samplingFrequencyBucket : samplingFrequencies .getBuckets ()) {
407+ log .debug (
408+ "Using sampling frequency [{}] for [{}] stacktrace events." ,
409+ samplingFrequencyBucket .getKeyAsString (),
410+ totalCount
411+ );
412+ final double samplingFrequency = samplingFrequencyBucket .getKeyAsNumber ().doubleValue ();
413+ final double samplingFactor = maxSamplingFrequency / samplingFrequency ;
414+
415+ Terms executableNames = samplingFrequencyBucket .getAggregations ().get ("group_by" );
416+ for (Terms .Bucket executableBucket : executableNames .getBuckets ()) {
417+ String executableName = executableBucket .getKeyAsString ();
418+
419+ Terms threads = executableBucket .getAggregations ().get ("group_by" );
420+ for (Terms .Bucket threadBucket : threads .getBuckets ()) {
421+ String threadName = threadBucket .getKeyAsString ();
422+
423+ Terms hosts = threadBucket .getAggregations ().get ("group_by" );
424+ for (Terms .Bucket hostBucket : hosts .getBuckets ()) {
425+ String hostID = hostBucket .getKeyAsString ();
426+
427+ Terms stacktraces = hostBucket .getAggregations ().get ("group_by" );
428+ for (Terms .Bucket stacktraceBucket : stacktraces .getBuckets ()) {
429+ Sum count = stacktraceBucket .getAggregations ().get ("count" );
430+ int finalCount = resampler .adjustSampleCount ((int ) Math .round (count .value () * samplingFactor ));
431+ if (finalCount <= 0 ) {
432+ continue ;
433+ }
434+
435+ totalFinalCount += finalCount ;
436+
437+ String stackTraceID = stacktraceBucket .getKeyAsString ();
438+ TraceEventID eventID = new TraceEventID (
439+ executableName ,
440+ threadName ,
441+ hostID ,
442+ stackTraceID ,
443+ maxSamplingFrequency
444+ );
445+ TraceEvent event = stackTraceEvents .computeIfAbsent (eventID , k -> new TraceEvent ());
446+ event .count += finalCount ;
447+ subGroups .collectResults (stacktraceBucket , event );
392448 }
393- totalFinalCount += finalCount ;
394-
395- String stackTraceID = stacktraceBucket .getKeyAsString ();
396-
397- TraceEventID eventID = new TraceEventID (executableName , threadName , hostID , stackTraceID );
398- TraceEvent event = stackTraceEvents .computeIfAbsent (eventID , k -> new TraceEvent ());
399- event .count += finalCount ;
400- subGroups .collectResults (stacktraceBucket , event );
401449 }
402450 }
403451 }
@@ -629,8 +677,8 @@ public void calculateCO2AndCosts() {
629677 );
630678
631679 responseBuilder .getStackTraceEvents ().forEach ((eventId , event ) -> {
632- event .annualCO2Tons += co2Calculator .getAnnualCO2Tons (eventId .hostID (), event .count );
633- event .annualCostsUSD += costCalculator .annualCostsUSD (eventId .hostID (), event .count );
680+ event .annualCO2Tons += co2Calculator .getAnnualCO2Tons (eventId .hostID (), event .count , eventId . samplingFrequency () );
681+ event .annualCostsUSD += costCalculator .annualCostsUSD (eventId .hostID (), event .count , eventId . samplingFrequency () );
634682 });
635683
636684 log .debug (watch ::report );
0 commit comments