Skip to content

Commit 0cc854d

Browse files
authored
Add time range bucketing attribute to APM took time latency metrics (#135549)
This is similar to #135524, but adding the time range bucketing attribute to the took time latency metric. That requires a bit of ceremony as the took time metric is recorded on the coordinating node, while the time range filter is parsed on each shard. We don't have mappings available on the coord node, which are needed to parse dates. Thus we need to rely on date parsing done on the data nodes, which requires sending back the parsed value to the coord node, performing some simple reduction on it, and adding it back to the search response. This also includes two improvements to the existing mechanism: - take date precision into account - track the time range as part of query rewrite, additionally to query execution. This way queries that are rewritten to a match_all still see their original range reported
1 parent 414972d commit 0cc854d

28 files changed

+738
-154
lines changed

docs/changelog/135549.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135549
2+
summary: Add time range bucketing attribute to APM took time latency metrics
3+
area: Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
8686
1,
8787
0,
8888
0,
89-
results.isEmpty()
89+
results.isEmpty(),
90+
null
9091
);
9192
if (progressListener != SearchProgressListener.NOOP) {
9293
progressListener.notifyFinalReduce(

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ private SearchPhaseController.ReducedQueryPhase newReducedQueryPhaseResults(
251251
reducedQueryPhase.numReducePhases(),
252252
reducedQueryPhase.size(),
253253
reducedQueryPhase.from(),
254-
reducedQueryPhase.isEmptyResult()
254+
reducedQueryPhase.isEmptyResult(),
255+
reducedQueryPhase.timeRangeFilterFromMillis()
255256
);
256257
}
257258

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,8 @@ static ReducedQueryPhase reducedQueryPhase(
487487
numReducePhases,
488488
0,
489489
0,
490-
true
490+
true,
491+
null
491492
);
492493
}
493494
final List<QuerySearchResult> nonNullResults = new ArrayList<>();
@@ -516,6 +517,7 @@ static ReducedQueryPhase reducedQueryPhase(
516517
: Collections.emptyMap();
517518
int from = 0;
518519
int size = 0;
520+
Long timeRangeFilterFromMillis = null;
519521
DocValueFormat[] sortValueFormats = null;
520522
for (QuerySearchResult result : nonNullResults) {
521523
from = result.from();
@@ -525,6 +527,16 @@ static ReducedQueryPhase reducedQueryPhase(
525527
sortValueFormats = result.sortValueFormats();
526528
}
527529

530+
if (result.getTimeRangeFilterFromMillis() != null) {
531+
if (timeRangeFilterFromMillis == null) {
532+
timeRangeFilterFromMillis = result.getTimeRangeFilterFromMillis();
533+
} else {
534+
// all shards should hold the same value, besides edge cases like different mappings
535+
// for event.ingested and @timestamp across indices being searched
536+
timeRangeFilterFromMillis = Math.min(result.getTimeRangeFilterFromMillis(), timeRangeFilterFromMillis);
537+
}
538+
}
539+
528540
if (hasSuggest) {
529541
assert result.suggest() != null;
530542
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -579,7 +591,8 @@ static ReducedQueryPhase reducedQueryPhase(
579591
numReducePhases,
580592
size,
581593
from,
582-
false
594+
false,
595+
timeRangeFilterFromMillis
583596
);
584597
}
585598

@@ -662,7 +675,8 @@ public record ReducedQueryPhase(
662675
// the offset into the merged top hits
663676
int from,
664677
// <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
665-
boolean isEmptyResult
678+
boolean isEmptyResult,
679+
Long timeRangeFilterFromMillis
666680
) {
667681

668682
public ReducedQueryPhase {
@@ -683,7 +697,8 @@ public SearchResponseSections buildResponse(SearchHits hits, Collection<? extend
683697
timedOut,
684698
terminatedEarly,
685699
buildSearchProfileResults(fetchResults),
686-
numReducePhases
700+
numReducePhases,
701+
timeRangeFilterFromMillis
687702
);
688703
}
689704

server/src/main/java/org/elasticsearch/action/search/SearchRequestAttributesExtractor.java

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.cluster.metadata.DataStream;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1416
import org.elasticsearch.common.regex.Regex;
1517
import org.elasticsearch.common.util.concurrent.EsExecutors;
1618
import org.elasticsearch.core.TimeValue;
@@ -53,11 +55,15 @@ public static Map<String, Object> extractAttributes(SearchRequest searchRequest,
5355
/**
5456
* Introspects the provided shard search request and extracts metadata from it about some of its characteristics.
5557
*/
56-
public static Map<String, Object> extractAttributes(ShardSearchRequest shardSearchRequest, Long rangeTimestampFrom, long nowInMillis) {
58+
public static Map<String, Object> extractAttributes(
59+
ShardSearchRequest shardSearchRequest,
60+
Long timeRangeFilterFromMillis,
61+
long nowInMillis
62+
) {
5763
Map<String, Object> attributes = extractAttributes(
5864
shardSearchRequest.source(),
5965
shardSearchRequest.scroll(),
60-
rangeTimestampFrom,
66+
timeRangeFilterFromMillis,
6167
nowInMillis,
6268
shardSearchRequest.shardId().getIndexName()
6369
);
@@ -69,7 +75,7 @@ public static Map<String, Object> extractAttributes(ShardSearchRequest shardSear
6975
private static Map<String, Object> extractAttributes(
7076
SearchSourceBuilder searchSourceBuilder,
7177
TimeValue scroll,
72-
Long rangeTimestampFrom,
78+
Long timeRangeFilterFromMillis,
7379
long nowInMillis,
7480
String... localIndices
7581
) {
@@ -107,9 +113,9 @@ private static Map<String, Object> extractAttributes(
107113
}
108114

109115
final boolean hasKnn = searchSourceBuilder.knnSearch().isEmpty() == false || queryMetadataBuilder.knnQuery;
110-
String timestampRangeFilter = null;
111-
if (rangeTimestampFrom != null) {
112-
timestampRangeFilter = introspectTimeRange(rangeTimestampFrom, nowInMillis);
116+
String timeRangeFilterFrom = null;
117+
if (timeRangeFilterFromMillis != null) {
118+
timeRangeFilterFrom = introspectTimeRange(timeRangeFilterFromMillis, nowInMillis);
113119
}
114120
return buildAttributesMap(
115121
target,
@@ -119,7 +125,7 @@ private static Map<String, Object> extractAttributes(
119125
queryMetadataBuilder.rangeOnTimestamp,
120126
queryMetadataBuilder.rangeOnEventIngested,
121127
pitOrScroll,
122-
timestampRangeFilter
128+
timeRangeFilterFrom
123129
);
124130
}
125131

@@ -131,7 +137,7 @@ private static Map<String, Object> buildAttributesMap(
131137
boolean rangeOnTimestamp,
132138
boolean rangeOnEventIngested,
133139
String pitOrScroll,
134-
String timestampRangeFilter
140+
String timeRangeFilterFrom
135141
) {
136142
Map<String, Object> attributes = new HashMap<>(5, 1.0f);
137143
attributes.put(TARGET_ATTRIBUTE, target);
@@ -143,14 +149,18 @@ private static Map<String, Object> buildAttributesMap(
143149
if (knn) {
144150
attributes.put(KNN_ATTRIBUTE, knn);
145151
}
146-
if (rangeOnTimestamp) {
147-
attributes.put(RANGE_TIMESTAMP_ATTRIBUTE, rangeOnTimestamp);
148-
}
149-
if (rangeOnEventIngested) {
150-
attributes.put(RANGE_EVENT_INGESTED_ATTRIBUTE, rangeOnEventIngested);
152+
if (rangeOnTimestamp && rangeOnEventIngested) {
153+
attributes.put(
154+
TIME_RANGE_FILTER_FIELD_ATTRIBUTE,
155+
DataStream.TIMESTAMP_FIELD_NAME + "_AND_" + IndexMetadata.EVENT_INGESTED_FIELD_NAME
156+
);
157+
} else if (rangeOnEventIngested) {
158+
attributes.put(TIME_RANGE_FILTER_FIELD_ATTRIBUTE, IndexMetadata.EVENT_INGESTED_FIELD_NAME);
159+
} else if (rangeOnTimestamp) {
160+
attributes.put(TIME_RANGE_FILTER_FIELD_ATTRIBUTE, DataStream.TIMESTAMP_FIELD_NAME);
151161
}
152-
if (timestampRangeFilter != null) {
153-
attributes.put(TIMESTAMP_RANGE_FILTER_ATTRIBUTE, timestampRangeFilter);
162+
if (timeRangeFilterFrom != null) {
163+
attributes.put(TIME_RANGE_FILTER_FROM_ATTRIBUTE, timeRangeFilterFrom);
154164
}
155165
return attributes;
156166
}
@@ -166,9 +176,8 @@ private static final class QueryMetadataBuilder {
166176
static final String QUERY_TYPE_ATTRIBUTE = "query_type";
167177
static final String PIT_SCROLL_ATTRIBUTE = "pit_scroll";
168178
static final String KNN_ATTRIBUTE = "knn";
169-
static final String RANGE_TIMESTAMP_ATTRIBUTE = "range_timestamp";
170-
static final String RANGE_EVENT_INGESTED_ATTRIBUTE = "range_event_ingested";
171-
static final String TIMESTAMP_RANGE_FILTER_ATTRIBUTE = "timestamp_range_filter";
179+
static final String TIME_RANGE_FILTER_FIELD_ATTRIBUTE = "time_range_filter_field";
180+
static final String TIME_RANGE_FILTER_FROM_ATTRIBUTE = "time_range_filter_from";
172181

173182
private static final String TARGET_KIBANA = ".kibana";
174183
private static final String TARGET_ML = ".ml";
@@ -303,6 +312,10 @@ private static void introspectQueryBuilder(QueryBuilder queryBuilder, QueryMetad
303312
introspectQueryBuilder(nested.query(), queryMetadataBuilder, ++level);
304313
break;
305314
case RangeQueryBuilder range:
315+
// Note that the outcome of this switch differs depending on whether it is executed on the coord node, or data node.
316+
// Data nodes perform query rewrite on each shard. That means that a query that reports a certain time range filter at the
317+
// coordinator, may not report the same for all the shards it targets, but rather only for those that do end up executing
318+
// a true range query at the shard level.
306319
switch (range.fieldName()) {
307320
// don't track unbounded ranges, they translate to either match_none if the field does not exist
308321
// or match_all if the field is mapped
@@ -343,9 +356,16 @@ private enum TimeRangeBucket {
343356
}
344357
}
345358

346-
static String introspectTimeRange(long timeRangeFrom, long nowInMillis) {
359+
public static void addTimeRangeAttribute(Long timeRangeFrom, long nowInMillis, Map<String, Object> attributes) {
360+
if (timeRangeFrom != null) {
361+
String timestampRangeFilter = introspectTimeRange(timeRangeFrom, nowInMillis);
362+
attributes.put(TIME_RANGE_FILTER_FROM_ATTRIBUTE, timestampRangeFilter);
363+
}
364+
}
365+
366+
static String introspectTimeRange(long timeRangeFromMillis, long nowInMillis) {
347367
for (TimeRangeBucket value : TimeRangeBucket.values()) {
348-
if (timeRangeFrom >= nowInMillis - value.millis) {
368+
if (timeRangeFromMillis >= nowInMillis - value.millis) {
349369
return value.bucketName;
350370
}
351371
}

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
8787
private final ShardSearchFailure[] shardFailures;
8888
private final Clusters clusters;
8989
private final long tookInMillis;
90+
// only used for telemetry purposes on the coordinating node, where the search response gets created
91+
private transient Long timeRangeFilterFromMillis;
9092

9193
private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted());
9294

@@ -187,6 +189,7 @@ public SearchResponse(
187189
clusters,
188190
pointInTimeId
189191
);
192+
this.timeRangeFilterFromMillis = searchResponseSections.timeRangeFilterFromMillis;
190193
}
191194

192195
public SearchResponse(
@@ -464,6 +467,10 @@ public void writeTo(StreamOutput out) throws IOException {
464467
out.writeOptionalBytesReference(pointInTimeId);
465468
}
466469

470+
public Long getTimeRangeFilterFromMillis() {
471+
return timeRangeFilterFromMillis;
472+
}
473+
467474
@Override
468475
public String toString() {
469476
return hasReferences() == false ? "SearchResponse[released]" : Strings.toString(this);

server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class SearchResponseSections implements Releasable {
3232
false,
3333
null,
3434
null,
35-
1
35+
1,
36+
null
3637
);
3738
public static final SearchResponseSections EMPTY_WITHOUT_TOTAL_HITS = new SearchResponseSections(
3839
SearchHits.EMPTY_WITHOUT_TOTAL_HITS,
@@ -41,7 +42,8 @@ public class SearchResponseSections implements Releasable {
4142
false,
4243
null,
4344
null,
44-
1
45+
1,
46+
null
4547
);
4648
protected final SearchHits hits;
4749
protected final InternalAggregations aggregations;
@@ -50,6 +52,7 @@ public class SearchResponseSections implements Releasable {
5052
protected final boolean timedOut;
5153
protected final Boolean terminatedEarly;
5254
protected final int numReducePhases;
55+
protected final Long timeRangeFilterFromMillis;
5356

5457
public SearchResponseSections(
5558
SearchHits hits,
@@ -58,7 +61,8 @@ public SearchResponseSections(
5861
boolean timedOut,
5962
Boolean terminatedEarly,
6063
SearchProfileResults profileResults,
61-
int numReducePhases
64+
int numReducePhases,
65+
Long timeRangeFilterFromMillis
6266
) {
6367
this.hits = hits;
6468
this.aggregations = aggregations;
@@ -67,6 +71,7 @@ public SearchResponseSections(
6771
this.timedOut = timedOut;
6872
this.terminatedEarly = terminatedEarly;
6973
this.numReducePhases = numReducePhases;
74+
this.timeRangeFilterFromMillis = timeRangeFilterFromMillis;
7075
}
7176

7277
public final SearchHits hits() {

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,12 @@ public void onFailure(Exception e) {
430430
Arrays.stream(resolvedIndices.getConcreteLocalIndices()).map(Index::getName).toArray(String[]::new)
431431
);
432432
if (collectCCSTelemetry == false || resolvedIndices.getRemoteClusterIndices().isEmpty()) {
433-
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics, searchRequestAttributes);
433+
searchResponseActionListener = new SearchTelemetryListener(
434+
delegate,
435+
searchResponseMetrics,
436+
searchRequestAttributes,
437+
timeProvider.absoluteStartMillis()
438+
);
434439
} else {
435440
CCSUsage.Builder usageBuilder = new CCSUsage.Builder();
436441
usageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size());
@@ -459,6 +464,7 @@ public void onFailure(Exception e) {
459464
delegate,
460465
searchResponseMetrics,
461466
searchRequestAttributes,
467+
timeProvider.absoluteStartMillis(),
462468
usageService,
463469
usageBuilder
464470
);
@@ -2046,6 +2052,7 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret
20462052
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> {
20472053
private final CCSUsage.Builder usageBuilder;
20482054
private final SearchResponseMetrics searchResponseMetrics;
2055+
private final long nowInMillis;
20492056
private final UsageService usageService;
20502057
private final boolean collectCCSTelemetry;
20512058
private final Map<String, Object> searchRequestAttributes;
@@ -2054,12 +2061,14 @@ private static class SearchTelemetryListener extends DelegatingActionListener<Se
20542061
ActionListener<SearchResponse> listener,
20552062
SearchResponseMetrics searchResponseMetrics,
20562063
Map<String, Object> searchRequestAttributes,
2064+
long nowInMillis,
20572065
UsageService usageService,
20582066
CCSUsage.Builder usageBuilder
20592067
) {
20602068
super(listener);
20612069
this.searchResponseMetrics = searchResponseMetrics;
20622070
this.searchRequestAttributes = searchRequestAttributes;
2071+
this.nowInMillis = nowInMillis;
20632072
this.collectCCSTelemetry = true;
20642073
this.usageService = usageService;
20652074
this.usageBuilder = usageBuilder;
@@ -2068,11 +2077,13 @@ private static class SearchTelemetryListener extends DelegatingActionListener<Se
20682077
SearchTelemetryListener(
20692078
ActionListener<SearchResponse> listener,
20702079
SearchResponseMetrics searchResponseMetrics,
2071-
Map<String, Object> searchRequestAttributes
2080+
Map<String, Object> searchRequestAttributes,
2081+
long nowInMillis
20722082
) {
20732083
super(listener);
20742084
this.searchResponseMetrics = searchResponseMetrics;
20752085
this.searchRequestAttributes = searchRequestAttributes;
2086+
this.nowInMillis = nowInMillis;
20762087
this.collectCCSTelemetry = false;
20772088
this.usageService = null;
20782089
this.usageBuilder = null;
@@ -2081,7 +2092,12 @@ private static class SearchTelemetryListener extends DelegatingActionListener<Se
20812092
@Override
20822093
public void onResponse(SearchResponse searchResponse) {
20832094
try {
2084-
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis(), searchRequestAttributes);
2095+
searchResponseMetrics.recordTookTime(
2096+
searchResponse.getTookInMillis(),
2097+
searchResponse.getTimeRangeFilterFromMillis(),
2098+
nowInMillis,
2099+
searchRequestAttributes
2100+
);
20852101
SearchResponseMetrics.ResponseCountTotalStatus responseCountTotalStatus =
20862102
SearchResponseMetrics.ResponseCountTotalStatus.SUCCESS;
20872103
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {

0 commit comments

Comments
 (0)