Skip to content

Commit bfb3aee

Browse files
authored
[Profiling] Manually downsample to 20k events if required (#131196)
* [Profiling] Manually downsample to 20k events if required. Since the random sampler aggregation is limited to p values of 0..0.5, add explicit downsampling for p values of 0.5..1.0. With this, the latency of fetching stacktraces and stackframes can be further reduced.
* Fix number of reported samples.
1 parent e4ff3f8 commit bfb3aee

File tree

4 files changed

+98
-36
lines changed

4 files changed

+98
-36
lines changed

x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/GetStackTracesResponse.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class GetStackTracesResponse extends ActionResponse implements ChunkedToX
3232
private final Map<TraceEventID, TraceEvent> stackTraceEvents;
3333
private final int totalFrames;
3434
private final double samplingRate;
35+
private final double samplingFrequency;
3536
private final long totalSamples;
3637

3738
public GetStackTracesResponse(
@@ -42,6 +43,28 @@ public GetStackTracesResponse(
4243
int totalFrames,
4344
double samplingRate,
4445
long totalSamples
46+
) {
47+
this(
48+
stackTraces,
49+
stackFrames,
50+
executables,
51+
stackTraceEvents,
52+
totalFrames,
53+
samplingRate,
54+
totalSamples,
55+
TransportGetStackTracesAction.DEFAULT_SAMPLING_FREQUENCY
56+
);
57+
}
58+
59+
public GetStackTracesResponse(
60+
Map<String, StackTrace> stackTraces,
61+
Map<String, StackFrame> stackFrames,
62+
Map<String, String> executables,
63+
Map<TraceEventID, TraceEvent> stackTraceEvents,
64+
int totalFrames,
65+
double samplingRate,
66+
long totalSamples,
67+
double samplingFrequency
4568
) {
4669
this.stackTraces = stackTraces;
4770
this.stackFrames = stackFrames;
@@ -50,6 +73,7 @@ public GetStackTracesResponse(
5073
this.totalFrames = totalFrames;
5174
this.samplingRate = samplingRate;
5275
this.totalSamples = totalSamples;
76+
this.samplingFrequency = samplingFrequency;
5377
}
5478

5579
@Override
@@ -101,7 +125,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
101125
Iterators.map(v.entrySet().iterator(), e -> (b, p) -> b.field(e.getKey().stacktraceID(), e.getValue().count))
102126
)
103127
),
104-
Iterators.single((b, p) -> b.field("sampling_rate", samplingRate).endObject())
128+
Iterators.single((b, p) -> b.field("sampling_rate", samplingRate).field("sampling_frequency", samplingFrequency).endObject())
105129
// the following fields are intentionally not written to the XContent representation (only needed on the transport layer):
106130
//
107131
// * start
@@ -129,6 +153,7 @@ public boolean equals(Object o) {
129153
GetStackTracesResponse response = (GetStackTracesResponse) o;
130154
return totalFrames == response.totalFrames
131155
&& samplingRate == response.samplingRate
156+
&& samplingFrequency == response.samplingFrequency
132157
&& Objects.equals(stackTraces, response.stackTraces)
133158
&& Objects.equals(stackFrames, response.stackFrames)
134159
&& Objects.equals(executables, response.executables)
@@ -137,6 +162,6 @@ public boolean equals(Object o) {
137162

138163
@Override
139164
public int hashCode() {
140-
return Objects.hash(stackTraces, stackFrames, executables, stackTraceEvents, totalFrames, samplingRate);
165+
return Objects.hash(stackTraces, stackFrames, executables, stackTraceEvents, totalFrames, samplingRate, samplingFrequency);
141166
}
142167
}

x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/GetStackTracesResponseBuilder.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ public double getSamplingRate() {
8282
return samplingRate;
8383
}
8484

85+
/**
 * Sets the sampling frequency held by this builder; {@code build()} forwards it to the response.
 * <p>
 * NOTE(review): the builder handles this value as a {@code long}, while the
 * {@code GetStackTracesResponse} constructor declares the corresponding parameter as
 * {@code double}, so the value is widened when the response is built.
 *
 * @param samplingFrequency the sampling frequency to store
 */
public void setSamplingFrequency(long samplingFrequency) {
86+
this.samplingFrequency = samplingFrequency;
87+
}
88+
89+
/**
 * Returns the sampling frequency currently held by this builder.
 *
 * @return the stored sampling frequency
 */
public long getSamplingFrequency() {
90+
return samplingFrequency;
91+
}
92+
8593
/**
 * Stores the requested duration on this builder.
 * <p>
 * NOTE(review): the parameter is a boxed {@code Double}; presumably {@code null} means
 * "not specified" - confirm against the callers.
 *
 * @param requestedDuration the requested duration to store
 */
public void setRequestedDuration(Double requestedDuration) {
8694
this.requestedDuration = requestedDuration;
8795
}
@@ -154,14 +162,15 @@ public GetStackTracesResponse build() {
154162
}
155163
}
156164
}
157-
return new GetStackTracesResponse(stackTraces, stackFrames, executables, stackTraceEvents, totalFrames, samplingRate, totalSamples);
158-
}
159-
160-
public void setSamplingFrequency(long samplingFrequency) {
161-
this.samplingFrequency = samplingFrequency;
162-
}
163-
164-
public long getSamplingFrequency() {
165-
return samplingFrequency;
165+
return new GetStackTracesResponse(
166+
stackTraces,
167+
stackFrames,
168+
executables,
169+
stackTraceEvents,
170+
totalFrames,
171+
samplingRate,
172+
totalSamples,
173+
samplingFrequency
174+
);
166175
}
167176
}

x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesAction.java

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,10 @@ private void searchEventGroupedByStackTrace(
400400
StopWatch watch = new StopWatch("createStackTraceEvents");
401401
SingleBucketAggregation sample = searchResponse.getAggregations().get("sample");
402402
InternalComposite stacktraces = sample.getAggregations().get("group_by");
403+
RandomGenerator rng = new Random(rngSeed);
403404
long indexSamplingFactor = Math.round(1 / eventsIndex.getSampleRate()); // for example, 5^2 = 25 for profiling-events-5pow02
404405
int bucketCount = stacktraces.getBuckets().size();
405406
long eventCount = sample.getDocCount();
406-
AtomicLong upscaledEventCount = new AtomicLong(eventCount * indexSamplingFactor);
407407
long maxSamplingFrequency = getAggValueAsLong(searchResponse, "max_freq") <= 0
408408
? (long) DEFAULT_SAMPLING_FREQUENCY
409409
: getAggValueAsLong(searchResponse, "max_freq");
@@ -413,13 +413,35 @@ private void searchEventGroupedByStackTrace(
413413
eventCount,
414414
bucketCount,
415415
indexSamplingFactor,
416-
upscaledEventCount.get()
416+
eventCount * indexSamplingFactor
417417
);
418418

419+
// Since the random sampler aggregation does not support sampling rates between 0.5 and 1.0,
420+
// we can have up to 2x more events in the response than requested by the user.
421+
// In order to reduce latency for stacktrace and stackframe lookups, we add another sampling factor
422+
// to reduce the number of events to match the user request (which reduces the number of unique stacktrace ids).
423+
boolean needAdditionalDownsampling = eventCount > request.getSampleSize();
424+
double downSamplingRate = needAdditionalDownsampling
425+
? (double) request.getSampleSize() / eventCount
426+
: responseBuilder.getSamplingRate();
427+
428+
eventCount = 0;
419429
boolean mixedFrequency = false;
420430
Map<TraceEventID, TraceEvent> stackTraceEvents = new HashMap<>(bucketCount);
421431
for (InternalComposite.InternalBucket stacktraceBucket : stacktraces.getBuckets()) {
422-
long count = stacktraceBucket.getDocCount() * indexSamplingFactor;
432+
long sampledCount;
433+
if (needAdditionalDownsampling) {
434+
sampledCount = downsampleEvents(rng, downSamplingRate, stacktraceBucket.getDocCount());
435+
if (sampledCount <= 0) {
436+
bucketCount--;
437+
continue; // skip bucket
438+
}
439+
} else {
440+
sampledCount = stacktraceBucket.getDocCount();
441+
}
442+
443+
long count = roundWithRandom((sampledCount * indexSamplingFactor) / downSamplingRate, rng);
444+
eventCount += sampledCount;
423445

424446
TraceEventID eventID = getTraceEventID(stacktraceBucket);
425447
stackTraceEvents.compute(eventID, (k, event) -> {
@@ -450,46 +472,52 @@ private void searchEventGroupedByStackTrace(
450472
}
451473
}
452474

475+
AtomicLong upscaledEventCount = new AtomicLong(eventCount * indexSamplingFactor);
453476
if (mixedFrequency) {
454-
RandomGenerator r = new Random(rngSeed);
455477
upscaledEventCount.set(0);
456478

457479
// Events have different frequencies.
458-
// Now upscale the count values to the max sampling frequency,
459-
// also taking into account the stratified downsampling factor (5, 25, 125, etc.).
480+
// Scale the count up to the maximum sampling frequency.
460481
stackTraceEvents.forEach((eventID, event) -> {
461-
if (eventID.samplingFrequency() == maxSamplingFrequency) {
462-
upscaledEventCount.addAndGet(event.count);
463-
return; // no need to upscale
482+
if (eventID.samplingFrequency() != maxSamplingFrequency) {
483+
double samplingFactor = maxSamplingFrequency / eventID.samplingFrequency();
484+
event.count = roundWithRandom(event.count * samplingFactor, rng);
464485
}
465-
466-
// Use randomization, to avoid a systematic rounding issue that would happen
467-
// if we naively do `event.count = Math.round(event.count * samplingFactor)`.
468-
// For example, think of event.count = 1 and samplingFactor = 1.4: the naive approach would not change anything.
469-
double samplingFactor = maxSamplingFrequency / eventID.samplingFrequency();
470-
double newCount = event.count * samplingFactor;
471-
long integerPart = (long) newCount;
472-
double fractionalPart = newCount - integerPart;
473-
event.count = integerPart + (r.nextDouble() < fractionalPart ? 1 : 0);
474486
upscaledEventCount.addAndGet(event.count);
475487
});
476488
}
477489

478490
log.debug(watch::report);
479491

492+
responseBuilder.setSamplingRate(downSamplingRate);
480493
responseBuilder.setSamplingFrequency(maxSamplingFrequency);
481494
responseBuilder.setTotalSamples(upscaledEventCount.get());
482-
log.debug(
483-
"Found [{}] events in [{}] buckets, upscaled to [{}] events).",
484-
eventCount,
485-
bucketCount,
486-
upscaledEventCount.get()
487-
);
495+
log.debug("Use [{}] events in [{}] buckets, upscaled to [{}] events).", eventCount, bucketCount, upscaledEventCount.get());
488496

489497
return stackTraceEvents;
490498
}));
491499
}
492500

501+
/**
 * Rounds {@code value} to one of its two neighbouring whole numbers, picking between them at random.
 * <p>
 * The value is first truncated to its integer part. One double is then drawn from {@code r}, and 1 is
 * added to the integer part when that draw is smaller than the remaining fractional part. Exactly one
 * double is drawn per call, even when {@code value} is already a whole number.
 * <p>
 * NOTE(review): the {@code (long)} cast truncates toward zero, so for a negative {@code value} the
 * computed fractional part is not positive and the truncated value is always returned. The call sites
 * pass scaled event counts, which are presumably non-negative - confirm before reusing elsewhere.
 *
 * @param value the number to round
 * @param r     the random generator the single draw is taken from
 * @return the integer part of {@code value}, or that integer part plus one
 */
private static long roundWithRandom(double value, RandomGenerator r) {
502+
// Use randomization, to avoid a systematic rounding issue that would happen
503+
// if we naively do `Math.round(value)`.
504+
// For example, think of rounding value = 1.4: the naive approach would always drop 0.4 and return 1.
505+
long integerPart = (long) value;
506+
double fractionalPart = value - integerPart;
507+
return integerPart + (r.nextDouble() < fractionalPart ? 1 : 0);
508+
}
509+
510+
/**
 * Counts how many out of {@code count} events are kept when each event is sampled independently.
 * <p>
 * For every event one double is drawn from {@code r}; the event is kept when the draw is smaller than
 * {@code samplingRate}. The loop performs one draw per event, so the work grows linearly with
 * {@code count}. When {@code count} is zero or negative the loop does not run and 0 is returned.
 *
 * @param r            the random generator the per-event draws are taken from
 * @param samplingRate threshold each draw is compared against
 * @param count        number of events to sample
 * @return the number of events that were kept, between 0 and {@code count}
 */
private static long downsampleEvents(RandomGenerator r, double samplingRate, long count) {
511+
// Downsampling needs to be applied to each event individually.
512+
long sampledCount = 0;
513+
for (long i = 0; i < count; i++) {
514+
if (r.nextDouble() < samplingRate) {
515+
sampledCount++;
516+
}
517+
}
518+
return sampledCount;
519+
}
520+
493521
private static TraceEventID getTraceEventID(InternalComposite.InternalBucket stacktraceBucket) {
494522
Map<String, Object> key = stacktraceBucket.getKey();
495523
Object samplingFrequency = key.get("freq");

x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/action/GetStackTracesResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private GetStackTracesResponse createTestInstance() {
5858

5959
public void testChunking() {
6060
AbstractChunkedSerializingTestCase.assertChunkCount(createTestInstance(), instance -> {
61-
// start and {sampling_rate; end}; see GetStackTracesResponse.toXContentChunked()
61+
// start and {sampling_rate; sampling_frequency; end}; see GetStackTracesResponse.toXContentChunked()
6262
int chunks = 2;
6363
chunks += size(instance.getExecutables());
6464
chunks += size(instance.getStackFrames());

0 commit comments

Comments
 (0)