Skip to content

Commit fad3c42

Browse files
committed
Add time range bucketing attribute to APM took time latency metrics
This is similar to elastic#135524, but adding the attribute to the took time latency metric. That requires a bit of ceremony as the took time metric is recorded on the coordinating node, while the time range filter is parsed on each shard. We don't have mappings available on the coord node, which are needed to parse dates on the coord node. Thus we need to rely on date parsing done on the data nodes, which requires sending back the parsed value to the coord node, performing some simple reduction on it, and adding it back to the search response.
1 parent b3124f7 commit fad3c42

File tree

25 files changed

+270
-67
lines changed

25 files changed

+270
-67
lines changed

server/src/main/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
8686
1,
8787
0,
8888
0,
89-
results.isEmpty()
89+
results.isEmpty(),
90+
null
9091
);
9192
if (progressListener != SearchProgressListener.NOOP) {
9293
progressListener.notifyFinalReduce(

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ private SearchPhaseController.ReducedQueryPhase newReducedQueryPhaseResults(
251251
reducedQueryPhase.numReducePhases(),
252252
reducedQueryPhase.size(),
253253
reducedQueryPhase.from(),
254-
reducedQueryPhase.isEmptyResult()
254+
reducedQueryPhase.isEmptyResult(),
255+
reducedQueryPhase.rangeTimestampFrom()
255256
);
256257
}
257258

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,8 @@ static ReducedQueryPhase reducedQueryPhase(
487487
numReducePhases,
488488
0,
489489
0,
490-
true
490+
true,
491+
null
491492
);
492493
}
493494
final List<QuerySearchResult> nonNullResults = new ArrayList<>();
@@ -516,6 +517,7 @@ static ReducedQueryPhase reducedQueryPhase(
516517
: Collections.emptyMap();
517518
int from = 0;
518519
int size = 0;
520+
Long rangeTimestampFrom = null;
519521
DocValueFormat[] sortValueFormats = null;
520522
for (QuerySearchResult result : nonNullResults) {
521523
from = result.from();
@@ -525,6 +527,10 @@ static ReducedQueryPhase reducedQueryPhase(
525527
sortValueFormats = result.sortValueFormats();
526528
}
527529

530+
if (rangeTimestampFrom == null) {
531+
rangeTimestampFrom = result.getRangeTimestampFrom();
532+
}
533+
528534
if (hasSuggest) {
529535
assert result.suggest() != null;
530536
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -579,7 +585,8 @@ static ReducedQueryPhase reducedQueryPhase(
579585
numReducePhases,
580586
size,
581587
from,
582-
false
588+
false,
589+
rangeTimestampFrom
583590
);
584591
}
585592

@@ -662,7 +669,8 @@ public record ReducedQueryPhase(
662669
// the offset into the merged top hits
663670
int from,
664671
// <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
665-
boolean isEmptyResult
672+
boolean isEmptyResult,
673+
Long rangeTimestampFrom
666674
) {
667675

668676
public ReducedQueryPhase {
@@ -683,7 +691,8 @@ public SearchResponseSections buildResponse(SearchHits hits, Collection<? extend
683691
timedOut,
684692
terminatedEarly,
685693
buildSearchProfileResults(fetchResults),
686-
numReducePhases
694+
numReducePhases,
695+
rangeTimestampFrom
687696
);
688697
}
689698

server/src/main/java/org/elasticsearch/action/search/SearchRequestAttributesExtractor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,13 @@ private enum TimeRangeBucket {
343343
}
344344
}
345345

346+
public static void addTimeRangeAttribute(Long timeRangeFrom, long nowInMillis, Map<String, Object> attributes) {
347+
if (timeRangeFrom != null) {
348+
String timestampRangeFilter = introspectTimeRange(timeRangeFrom, nowInMillis);
349+
attributes.put(TIMESTAMP_RANGE_FILTER_ATTRIBUTE, timestampRangeFilter);
350+
}
351+
}
352+
346353
static String introspectTimeRange(long timeRangeFrom, long nowInMillis) {
347354
for (TimeRangeBucket value : TimeRangeBucket.values()) {
348355
if (timeRangeFrom >= nowInMillis - value.millis) {

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.search;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.TransportVersions;
1314
import org.elasticsearch.action.ActionResponse;
1415
import org.elasticsearch.action.OriginalIndices;
@@ -61,6 +62,8 @@
6162
*/
6263
public class SearchResponse extends ActionResponse implements ChunkedToXContentObject {
6364

65+
public static final TransportVersion TIMESTAMP_RANGE_TELEMETRY = TransportVersion.fromName("timestamp_range_telemetry");
66+
6467
// for cross-cluster scenarios where cluster names are shown in API responses, use this string
6568
// rather than empty string (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) we use internally
6669
public static final String LOCAL_CLUSTER_NAME_REPRESENTATION = "(local)";
@@ -87,6 +90,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
8790
private final ShardSearchFailure[] shardFailures;
8891
private final Clusters clusters;
8992
private final long tookInMillis;
93+
private final Long rangeTimestampFrom;
9094

9195
private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted());
9296

@@ -122,6 +126,11 @@ public SearchResponse(StreamInput in) throws IOException {
122126
tookInMillis = in.readVLong();
123127
skippedShards = in.readVInt();
124128
pointInTimeId = in.readOptionalBytesReference();
129+
if (in.getTransportVersion().onOrAfter(TIMESTAMP_RANGE_TELEMETRY)) {
130+
rangeTimestampFrom = in.readOptionalLong();
131+
} else {
132+
rangeTimestampFrom = null;
133+
}
125134
}
126135

127136
public SearchResponse(
@@ -155,6 +164,7 @@ public SearchResponse(
155164
tookInMillis,
156165
shardFailures,
157166
clusters,
167+
null,
158168
null
159169
);
160170
}
@@ -185,7 +195,8 @@ public SearchResponse(
185195
tookInMillis,
186196
shardFailures,
187197
clusters,
188-
pointInTimeId
198+
pointInTimeId,
199+
searchResponseSections.rangeTimestampFrom
189200
);
190201
}
191202

@@ -204,7 +215,8 @@ public SearchResponse(
204215
long tookInMillis,
205216
ShardSearchFailure[] shardFailures,
206217
Clusters clusters,
207-
BytesReference pointInTimeId
218+
BytesReference pointInTimeId,
219+
Long rangeTimestampFrom
208220
) {
209221
this.hits = hits;
210222
hits.incRef();
@@ -225,6 +237,7 @@ public SearchResponse(
225237
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
226238
assert scrollId == null || pointInTimeId == null
227239
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
240+
this.rangeTimestampFrom = rangeTimestampFrom;
228241
}
229242

230243
@Override
@@ -462,6 +475,13 @@ public void writeTo(StreamOutput out) throws IOException {
462475
out.writeVLong(tookInMillis);
463476
out.writeVInt(skippedShards);
464477
out.writeOptionalBytesReference(pointInTimeId);
478+
if (out.getTransportVersion().onOrAfter(TIMESTAMP_RANGE_TELEMETRY)) {
479+
out.writeOptionalLong(rangeTimestampFrom);
480+
}
481+
}
482+
483+
public Long getRangeTimestampFrom() {
484+
return rangeTimestampFrom;
465485
}
466486

467487
@Override
@@ -1152,6 +1172,7 @@ public static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters
11521172
tookInMillisSupplier.get(),
11531173
ShardSearchFailure.EMPTY_ARRAY,
11541174
clusters,
1175+
null,
11551176
null
11561177
);
11571178
}

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
232232
tookInMillis,
233233
shardFailures,
234234
clusters,
235+
null,
235236
null
236237
);
237238
} finally {

server/src/main/java/org/elasticsearch/action/search/SearchResponseSections.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class SearchResponseSections implements Releasable {
3232
false,
3333
null,
3434
null,
35-
1
35+
1,
36+
null
3637
);
3738
public static final SearchResponseSections EMPTY_WITHOUT_TOTAL_HITS = new SearchResponseSections(
3839
SearchHits.EMPTY_WITHOUT_TOTAL_HITS,
@@ -41,7 +42,8 @@ public class SearchResponseSections implements Releasable {
4142
false,
4243
null,
4344
null,
44-
1
45+
1,
46+
null
4547
);
4648
protected final SearchHits hits;
4749
protected final InternalAggregations aggregations;
@@ -50,6 +52,7 @@ public class SearchResponseSections implements Releasable {
5052
protected final boolean timedOut;
5153
protected final Boolean terminatedEarly;
5254
protected final int numReducePhases;
55+
protected final Long rangeTimestampFrom;
5356

5457
public SearchResponseSections(
5558
SearchHits hits,
@@ -58,7 +61,8 @@ public SearchResponseSections(
5861
boolean timedOut,
5962
Boolean terminatedEarly,
6063
SearchProfileResults profileResults,
61-
int numReducePhases
64+
int numReducePhases,
65+
Long rangeTimestampFrom
6266
) {
6367
this.hits = hits;
6468
this.aggregations = aggregations;
@@ -67,6 +71,7 @@ public SearchResponseSections(
6771
this.timedOut = timedOut;
6872
this.terminatedEarly = terminatedEarly;
6973
this.numReducePhases = numReducePhases;
74+
this.rangeTimestampFrom = rangeTimestampFrom;
7075
}
7176

7277
public final SearchHits hits() {

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,12 @@ public void onFailure(Exception e) {
430430
Arrays.stream(resolvedIndices.getConcreteLocalIndices()).map(Index::getName).toArray(String[]::new)
431431
);
432432
if (collectCCSTelemetry == false || resolvedIndices.getRemoteClusterIndices().isEmpty()) {
433-
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics, searchRequestAttributes);
433+
searchResponseActionListener = new SearchTelemetryListener(
434+
delegate,
435+
searchResponseMetrics,
436+
searchRequestAttributes,
437+
timeProvider.absoluteStartMillis()
438+
);
434439
} else {
435440
CCSUsage.Builder usageBuilder = new CCSUsage.Builder();
436441
usageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size());
@@ -459,6 +464,7 @@ public void onFailure(Exception e) {
459464
delegate,
460465
searchResponseMetrics,
461466
searchRequestAttributes,
467+
timeProvider.absoluteStartMillis(),
462468
usageService,
463469
usageBuilder
464470
);
@@ -776,7 +782,8 @@ public void onResponse(SearchResponse searchResponse) {
776782
timeProvider.buildTookInMillis(),
777783
searchResponse.getShardFailures(),
778784
clusters,
779-
searchResponse.pointInTimeId()
785+
searchResponse.pointInTimeId(),
786+
searchResponse.getRangeTimestampFrom()
780787
)
781788
);
782789
}
@@ -2046,6 +2053,7 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret
20462053
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> {
20472054
private final CCSUsage.Builder usageBuilder;
20482055
private final SearchResponseMetrics searchResponseMetrics;
2056+
private final long nowInMillis;
20492057
private final UsageService usageService;
20502058
private final boolean collectCCSTelemetry;
20512059
private final Map<String, Object> searchRequestAttributes;
@@ -2054,12 +2062,14 @@ private static class SearchTelemetryListener extends DelegatingActionListener<Se
20542062
ActionListener<SearchResponse> listener,
20552063
SearchResponseMetrics searchResponseMetrics,
20562064
Map<String, Object> searchRequestAttributes,
2065+
long nowInMillis,
20572066
UsageService usageService,
20582067
CCSUsage.Builder usageBuilder
20592068
) {
20602069
super(listener);
20612070
this.searchResponseMetrics = searchResponseMetrics;
20622071
this.searchRequestAttributes = searchRequestAttributes;
2072+
this.nowInMillis = nowInMillis;
20632073
this.collectCCSTelemetry = true;
20642074
this.usageService = usageService;
20652075
this.usageBuilder = usageBuilder;
@@ -2068,11 +2078,13 @@ private static class SearchTelemetryListener extends DelegatingActionListener<Se
20682078
SearchTelemetryListener(
20692079
ActionListener<SearchResponse> listener,
20702080
SearchResponseMetrics searchResponseMetrics,
2071-
Map<String, Object> searchRequestAttributes
2081+
Map<String, Object> searchRequestAttributes,
2082+
long nowInMillis
20722083
) {
20732084
super(listener);
20742085
this.searchResponseMetrics = searchResponseMetrics;
20752086
this.searchRequestAttributes = searchRequestAttributes;
2087+
this.nowInMillis = nowInMillis;
20762088
this.collectCCSTelemetry = false;
20772089
this.usageService = null;
20782090
this.usageBuilder = null;
@@ -2081,7 +2093,12 @@ private static class SearchTelemetryListener extends DelegatingActionListener<Se
20812093
@Override
20822094
public void onResponse(SearchResponse searchResponse) {
20832095
try {
2084-
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis(), searchRequestAttributes);
2096+
searchResponseMetrics.recordTookTime(
2097+
searchResponse.getTookInMillis(),
2098+
searchResponse.getRangeTimestampFrom(),
2099+
nowInMillis,
2100+
searchRequestAttributes
2101+
);
20852102
SearchResponseMetrics.ResponseCountTotalStatus responseCountTotalStatus =
20862103
SearchResponseMetrics.ResponseCountTotalStatus.SUCCESS;
20872104
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {

server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -944,7 +944,7 @@ public Relation isFieldWithinQuery(
944944
minValue = Long.min(minValue, skipper.minValue());
945945
maxValue = Long.max(maxValue, skipper.maxValue());
946946
}
947-
return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context);
947+
return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context, name());
948948
}
949949
byte[] minPackedValue = PointValues.getMinPackedValue(reader, name());
950950
if (minPackedValue == null) {
@@ -954,7 +954,7 @@ public Relation isFieldWithinQuery(
954954
long minValue = LongPoint.decodeDimension(minPackedValue, 0);
955955
long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0);
956956

957-
return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context);
957+
return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context, name());
958958
}
959959

960960
public DateMathParser resolveDateMathParser(DateMathParser dateParser, Object from, Object to) {
@@ -977,7 +977,8 @@ public Relation isFieldWithinQuery(
977977
boolean includeUpper,
978978
ZoneId timeZone,
979979
DateMathParser dateParser,
980-
QueryRewriteContext context
980+
QueryRewriteContext context,
981+
String fieldName
981982
) {
982983
dateParser = resolveDateMathParser(dateParser, from, to);
983984

@@ -990,6 +991,9 @@ public Relation isFieldWithinQuery(
990991
}
991992
++fromInclusive;
992993
}
994+
if (fieldName.equals(DataStream.TIMESTAMP_FIELD_NAME)) {
995+
context.setRangeTimestampFrom(fromInclusive);
996+
}
993997
}
994998

995999
long toInclusive = Long.MAX_VALUE;

server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class QueryRewriteContext {
7878
private QueryRewriteInterceptor queryRewriteInterceptor;
7979
private final Boolean ccsMinimizeRoundTrips;
8080
private final boolean isExplain;
81+
private Long rangeTimestampFrom;
8182

8283
public QueryRewriteContext(
8384
final XContentParserConfiguration parserConfiguration,
@@ -520,4 +521,21 @@ public void setQueryRewriteInterceptor(QueryRewriteInterceptor queryRewriteInter
520521
this.queryRewriteInterceptor = queryRewriteInterceptor;
521522
}
522523

524+
/**
525+
* Returns the minimum lower bound across the time ranges filters against the @timestamp field included in the query
526+
*/
527+
public Long getRangeTimestampFrom() {
528+
return rangeTimestampFrom;
529+
}
530+
531+
/**
532+
* Records the lower bound of a time range filter against the @timestamp field included in the query. For telemetry purposes.
533+
*/
534+
public void setRangeTimestampFrom(long rangeTimestampFrom) {
535+
if (this.rangeTimestampFrom == null) {
536+
this.rangeTimestampFrom = rangeTimestampFrom;
537+
} else {
538+
this.rangeTimestampFrom = Math.min(rangeTimestampFrom, this.rangeTimestampFrom);
539+
}
540+
}
523541
}

0 commit comments

Comments
 (0)