Skip to content

Commit 3b2d47b

Browse files
carlosdelestsmalyshev
authored andcommitted
Add Search Phase APM metrics (#113194)
1 parent 0ac82d2 commit 3b2d47b

File tree

17 files changed

+370
-312
lines changed

17 files changed

+370
-312
lines changed

docs/changelog/113194.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 113194
2+
summary: Add Search Phase APM metrics
3+
area: Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/SearchTransportAPMMetrics.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 26 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,6 @@
6767
import java.util.concurrent.Executor;
6868
import java.util.function.BiFunction;
6969

70-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.ACTION_ATTRIBUTE_NAME;
71-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.CLEAR_SCROLL_CONTEXTS_ACTION_METRIC;
72-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.DFS_ACTION_METRIC;
73-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_ACTION_METRIC;
74-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_SCROLL_ACTION_METRIC;
75-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_ACTION_METRIC;
76-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_SCROLL_ACTION_METRIC;
77-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ACTION_METRIC;
78-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_CAN_MATCH_NODE_METRIC;
79-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_FETCH_SCROLL_ACTION_METRIC;
80-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ID_ACTION_METRIC;
81-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_SCROLL_ACTION_METRIC;
82-
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.RANK_SHARD_FEATURE_ACTION_METRIC;
83-
8470
/**
8571
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
8672
* transport.
@@ -450,11 +436,7 @@ public void writeTo(StreamOutput out) throws IOException {
450436
}
451437
}
452438

453-
public static void registerRequestHandler(
454-
TransportService transportService,
455-
SearchService searchService,
456-
SearchTransportAPMMetrics searchTransportMetrics
457-
) {
439+
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
458440
final TransportRequestHandler<ScrollFreeContextRequest> freeContextHandler = (request, channel, task) -> {
459441
logger.trace("releasing search context [{}]", request.id());
460442
boolean freed = searchService.freeReaderContext(request.id());
@@ -465,7 +447,7 @@ public static void registerRequestHandler(
465447
FREE_CONTEXT_SCROLL_ACTION_NAME,
466448
freeContextExecutor,
467449
ScrollFreeContextRequest::new,
468-
instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
450+
freeContextHandler
469451
);
470452
TransportActionProxy.registerProxyAction(
471453
transportService,
@@ -478,18 +460,18 @@ public static void registerRequestHandler(
478460
FREE_CONTEXT_ACTION_NAME,
479461
freeContextExecutor,
480462
SearchFreeContextRequest::new,
481-
instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
463+
freeContextHandler
482464
);
483465
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom);
484466

485467
transportService.registerRequestHandler(
486468
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
487469
freeContextExecutor,
488470
ClearScrollContextsRequest::new,
489-
instrumentedHandler(CLEAR_SCROLL_CONTEXTS_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> {
471+
(request, channel, task) -> {
490472
searchService.freeAllScrollContexts();
491473
channel.sendResponse(TransportResponse.Empty.INSTANCE);
492-
})
474+
}
493475
);
494476
TransportActionProxy.registerProxyAction(
495477
transportService,
@@ -502,32 +484,18 @@ public static void registerRequestHandler(
502484
DFS_ACTION_NAME,
503485
EsExecutors.DIRECT_EXECUTOR_SERVICE,
504486
ShardSearchRequest::new,
505-
instrumentedHandler(
506-
DFS_ACTION_METRIC,
507-
transportService,
508-
searchTransportMetrics,
509-
(request, channel, task) -> searchService.executeDfsPhase(
510-
request,
511-
(SearchShardTask) task,
512-
new ChannelActionListener<>(channel)
513-
)
514-
)
487+
(request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel))
515488
);
516489
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new);
517490

518491
transportService.registerRequestHandler(
519492
QUERY_ACTION_NAME,
520493
EsExecutors.DIRECT_EXECUTOR_SERVICE,
521494
ShardSearchRequest::new,
522-
instrumentedHandler(
523-
QUERY_ACTION_METRIC,
524-
transportService,
525-
searchTransportMetrics,
526-
(request, channel, task) -> searchService.executeQueryPhase(
527-
request,
528-
(SearchShardTask) task,
529-
new ChannelActionListener<>(channel)
530-
)
495+
(request, channel, task) -> searchService.executeQueryPhase(
496+
request,
497+
(SearchShardTask) task,
498+
new ChannelActionListener<>(channel)
531499
)
532500
);
533501
TransportActionProxy.registerProxyActionWithDynamicResponseType(
@@ -541,15 +509,10 @@ public static void registerRequestHandler(
541509
QUERY_ID_ACTION_NAME,
542510
EsExecutors.DIRECT_EXECUTOR_SERVICE,
543511
QuerySearchRequest::new,
544-
instrumentedHandler(
545-
QUERY_ID_ACTION_METRIC,
546-
transportService,
547-
searchTransportMetrics,
548-
(request, channel, task) -> searchService.executeQueryPhase(
549-
request,
550-
(SearchShardTask) task,
551-
new ChannelActionListener<>(channel)
552-
)
512+
(request, channel, task) -> searchService.executeQueryPhase(
513+
request,
514+
(SearchShardTask) task,
515+
new ChannelActionListener<>(channel)
553516
)
554517
);
555518
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new);
@@ -558,15 +521,10 @@ public static void registerRequestHandler(
558521
QUERY_SCROLL_ACTION_NAME,
559522
EsExecutors.DIRECT_EXECUTOR_SERVICE,
560523
InternalScrollSearchRequest::new,
561-
instrumentedHandler(
562-
QUERY_SCROLL_ACTION_METRIC,
563-
transportService,
564-
searchTransportMetrics,
565-
(request, channel, task) -> searchService.executeQueryPhase(
566-
request,
567-
(SearchShardTask) task,
568-
new ChannelActionListener<>(channel)
569-
)
524+
(request, channel, task) -> searchService.executeQueryPhase(
525+
request,
526+
(SearchShardTask) task,
527+
new ChannelActionListener<>(channel)
570528
)
571529
);
572530
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new);
@@ -575,15 +533,10 @@ public static void registerRequestHandler(
575533
QUERY_FETCH_SCROLL_ACTION_NAME,
576534
EsExecutors.DIRECT_EXECUTOR_SERVICE,
577535
InternalScrollSearchRequest::new,
578-
instrumentedHandler(
579-
QUERY_FETCH_SCROLL_ACTION_METRIC,
580-
transportService,
581-
searchTransportMetrics,
582-
(request, channel, task) -> searchService.executeFetchPhase(
583-
request,
584-
(SearchShardTask) task,
585-
new ChannelActionListener<>(channel)
586-
)
536+
(request, channel, task) -> searchService.executeFetchPhase(
537+
request,
538+
(SearchShardTask) task,
539+
new ChannelActionListener<>(channel)
587540
)
588541
);
589542
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new);
@@ -594,7 +547,7 @@ public static void registerRequestHandler(
594547
RANK_FEATURE_SHARD_ACTION_NAME,
595548
EsExecutors.DIRECT_EXECUTOR_SERVICE,
596549
RankFeatureShardRequest::new,
597-
instrumentedHandler(RANK_SHARD_FEATURE_ACTION_METRIC, transportService, searchTransportMetrics, rankShardFeatureRequest)
550+
rankShardFeatureRequest
598551
);
599552
TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new);
600553

@@ -604,7 +557,7 @@ public static void registerRequestHandler(
604557
FETCH_ID_SCROLL_ACTION_NAME,
605558
EsExecutors.DIRECT_EXECUTOR_SERVICE,
606559
ShardFetchRequest::new,
607-
instrumentedHandler(FETCH_ID_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
560+
shardFetchRequestHandler
608561
);
609562
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new);
610563

@@ -614,20 +567,15 @@ public static void registerRequestHandler(
614567
true,
615568
true,
616569
ShardFetchSearchRequest::new,
617-
instrumentedHandler(FETCH_ID_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
570+
shardFetchRequestHandler
618571
);
619572
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new);
620573

621574
transportService.registerRequestHandler(
622575
QUERY_CAN_MATCH_NODE_NAME,
623576
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION),
624577
CanMatchNodeRequest::new,
625-
instrumentedHandler(
626-
QUERY_CAN_MATCH_NODE_METRIC,
627-
transportService,
628-
searchTransportMetrics,
629-
(request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
630-
)
578+
(request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
631579
);
632580
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new);
633581
}
@@ -658,26 +606,6 @@ public void onFailure(Exception e) {
658606
});
659607
}
660608

661-
private static <Request extends TransportRequest> TransportRequestHandler<Request> instrumentedHandler(
662-
String actionQualifier,
663-
TransportService transportService,
664-
SearchTransportAPMMetrics searchTransportMetrics,
665-
TransportRequestHandler<Request> transportRequestHandler
666-
) {
667-
var threadPool = transportService.getThreadPool();
668-
var latencies = searchTransportMetrics.getActionLatencies();
669-
Map<String, Object> attributes = Map.of(ACTION_ATTRIBUTE_NAME, actionQualifier);
670-
return (request, channel, task) -> {
671-
var startTime = threadPool.relativeTimeInMillis();
672-
try {
673-
transportRequestHandler.messageReceived(request, channel, task);
674-
} finally {
675-
var elapsedTime = threadPool.relativeTimeInMillis() - startTime;
676-
latencies.record(elapsedTime, attributes);
677-
}
678-
};
679-
}
680-
681609
/**
682610
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
683611
* against the local cluster.

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ public TransportSearchAction(
175175
IndexNameExpressionResolver indexNameExpressionResolver,
176176
NamedWriteableRegistry namedWriteableRegistry,
177177
ExecutorSelector executorSelector,
178-
SearchTransportAPMMetrics searchTransportMetrics,
179178
SearchResponseMetrics searchResponseMetrics,
180179
Client client,
181180
UsageService usageService
@@ -186,7 +185,7 @@ public TransportSearchAction(
186185
this.searchPhaseController = searchPhaseController;
187186
this.searchTransportService = searchTransportService;
188187
this.remoteClusterService = searchTransportService.getRemoteClusterService();
189-
SearchTransportService.registerRequestHandler(transportService, searchService, searchTransportMetrics);
188+
SearchTransportService.registerRequestHandler(transportService, searchService);
190189
this.clusterService = clusterService;
191190
this.transportService = transportService;
192191
this.searchService = searchService;

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public interface DirectoryWrapper {
167167
private final Map<String, TriFunction<Settings, IndexVersion, ScriptService, Similarity>> similarities = new HashMap<>();
168168
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
169169
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
170-
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
170+
private final List<SearchOperationListener> searchOperationListeners;
171171
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
172172
private final IndexNameExpressionResolver expressionResolver;
173173
private final AtomicBoolean frozen = new AtomicBoolean(false);
@@ -194,11 +194,14 @@ public IndexModule(
194194
final IndexNameExpressionResolver expressionResolver,
195195
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
196196
final SlowLogFieldProvider slowLogFieldProvider,
197-
final MapperMetrics mapperMetrics
197+
final MapperMetrics mapperMetrics,
198+
final List<SearchOperationListener> searchOperationListeners
198199
) {
199200
this.indexSettings = indexSettings;
200201
this.analysisRegistry = analysisRegistry;
201202
this.engineFactory = Objects.requireNonNull(engineFactory);
203+
// Need to have a mutable arraylist for plugins to add listeners to it
204+
this.searchOperationListeners = new ArrayList<>(searchOperationListeners);
202205
this.searchOperationListeners.add(new SearchSlowLog(indexSettings, slowLogFieldProvider));
203206
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings, slowLogFieldProvider));
204207
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.search.stats;
11+
12+
import org.elasticsearch.common.util.concurrent.EsExecutors;
13+
import org.elasticsearch.index.shard.SearchOperationListener;
14+
import org.elasticsearch.search.internal.SearchContext;
15+
import org.elasticsearch.telemetry.metric.LongHistogram;
16+
import org.elasticsearch.telemetry.metric.MeterRegistry;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import java.util.concurrent.TimeUnit;
21+
22+
public final class ShardSearchPhaseAPMMetrics implements SearchOperationListener {
23+
24+
public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.shards.phases.query.duration.histogram";
25+
public static final String FETCH_SEARCH_PHASE_METRIC = "es.search.shards.phases.fetch.duration.histogram";
26+
27+
public static final String SYSTEM_THREAD_ATTRIBUTE_NAME = "system_thread";
28+
29+
private final LongHistogram queryPhaseMetric;
30+
private final LongHistogram fetchPhaseMetric;
31+
32+
// Avoid allocating objects in the search path and multithreading clashes
33+
private static final ThreadLocal<Map<String, Object>> THREAD_LOCAL_ATTRS = ThreadLocal.withInitial(() -> new HashMap<>(1));
34+
35+
public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
36+
this.queryPhaseMetric = meterRegistry.registerLongHistogram(
37+
QUERY_SEARCH_PHASE_METRIC,
38+
"Query search phase execution times at the shard level, expressed as a histogram",
39+
"ms"
40+
);
41+
this.fetchPhaseMetric = meterRegistry.registerLongHistogram(
42+
FETCH_SEARCH_PHASE_METRIC,
43+
"Fetch search phase execution times at the shard level, expressed as a histogram",
44+
"ms"
45+
);
46+
}
47+
48+
@Override
49+
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
50+
recordPhaseLatency(queryPhaseMetric, tookInNanos);
51+
}
52+
53+
@Override
54+
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
55+
recordPhaseLatency(fetchPhaseMetric, tookInNanos);
56+
}
57+
58+
private static void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
59+
Map<String, Object> attrs = ShardSearchPhaseAPMMetrics.THREAD_LOCAL_ATTRS.get();
60+
boolean isSystem = ((EsExecutors.EsThread) Thread.currentThread()).isSystem();
61+
attrs.put(SYSTEM_THREAD_ATTRIBUTE_NAME, isSystem);
62+
histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos), attrs);
63+
}
64+
}

0 commit comments

Comments
 (0)