Skip to content

Commit 5b598bc

Browse files
smalyshevquux00
authored andcommitted
Collecting CCS usage telemetry stats (elastic#111905)
* This creates the use CCSUsage and CCSUsageTelemetry classes and wires them up to the UsageService. An initial set of telemetry metrics are now being gathered in TransportSearchAction. Many more will be added later to meet all the requirements for the CCS Telemetry epic of work. Co-authored-by: Michael Peterson <[email protected]>
1 parent 5f089eb commit 5b598bc

File tree

15 files changed

+3092
-60
lines changed

15 files changed

+3092
-60
lines changed

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java

Lines changed: 708 additions & 0 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java

Lines changed: 404 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.admin.cluster.stats;
10+
11+
import org.elasticsearch.ElasticsearchSecurityException;
12+
import org.elasticsearch.ExceptionsHelper;
13+
import org.elasticsearch.ResourceNotFoundException;
14+
import org.elasticsearch.action.ShardOperationFailedException;
15+
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.Result;
16+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
17+
import org.elasticsearch.action.search.ShardSearchFailure;
18+
import org.elasticsearch.action.search.TransportSearchAction;
19+
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.search.SearchShardTarget;
21+
import org.elasticsearch.search.query.SearchTimeoutException;
22+
import org.elasticsearch.tasks.TaskCancelledException;
23+
import org.elasticsearch.transport.ConnectTransportException;
24+
import org.elasticsearch.transport.NoSeedNodeLeftException;
25+
import org.elasticsearch.transport.NoSuchRemoteClusterException;
26+
27+
import java.util.Arrays;
28+
import java.util.HashMap;
29+
import java.util.HashSet;
30+
import java.util.Map;
31+
import java.util.Set;
32+
33+
import static org.elasticsearch.transport.RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
34+
35+
/**
36+
* This is a container for telemetry data from an individual cross-cluster search for _search or _async_search (or
37+
* other search endpoints that use the {@link TransportSearchAction} such as _msearch).
38+
*/
39+
public class CCSUsage {
40+
private final long took;
41+
private final Result status;
42+
private final Set<String> features;
43+
private final int remotesCount;
44+
45+
private final String client;
46+
47+
private final Set<String> skippedRemotes;
48+
private final Map<String, PerClusterUsage> perClusterUsage;
49+
50+
public static class Builder {
51+
private long took;
52+
private final Set<String> features;
53+
private Result status = Result.SUCCESS;
54+
private int remotesCount;
55+
private String client;
56+
private final Set<String> skippedRemotes;
57+
private final Map<String, PerClusterUsage> perClusterUsage;
58+
59+
public Builder() {
60+
features = new HashSet<>();
61+
skippedRemotes = new HashSet<>();
62+
perClusterUsage = new HashMap<>();
63+
}
64+
65+
public Builder took(long took) {
66+
this.took = took;
67+
return this;
68+
}
69+
70+
public Builder setFailure(Result failureType) {
71+
this.status = failureType;
72+
return this;
73+
}
74+
75+
public Builder setFailure(Exception e) {
76+
return setFailure(getFailureType(e));
77+
}
78+
79+
public Builder setFeature(String feature) {
80+
this.features.add(feature);
81+
return this;
82+
}
83+
84+
public Builder setClient(String client) {
85+
this.client = client;
86+
return this;
87+
}
88+
89+
public Builder skippedRemote(String remote) {
90+
this.skippedRemotes.add(remote);
91+
return this;
92+
}
93+
94+
public Builder perClusterUsage(String remote, TimeValue took) {
95+
this.perClusterUsage.put(remote, new PerClusterUsage(took));
96+
return this;
97+
}
98+
99+
public CCSUsage build() {
100+
return new CCSUsage(took, status, remotesCount, skippedRemotes, features, client, perClusterUsage);
101+
}
102+
103+
public Builder setRemotesCount(int remotesCount) {
104+
this.remotesCount = remotesCount;
105+
return this;
106+
}
107+
108+
public int getRemotesCount() {
109+
return remotesCount;
110+
}
111+
112+
/**
113+
* Get failure type as {@link Result} from the search failure exception.
114+
*/
115+
public static Result getFailureType(Exception e) {
116+
var unwrapped = ExceptionsHelper.unwrapCause(e);
117+
if (unwrapped instanceof Exception) {
118+
e = (Exception) unwrapped;
119+
}
120+
if (isRemoteUnavailable(e)) {
121+
return Result.REMOTES_UNAVAILABLE;
122+
}
123+
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
124+
return Result.NOT_FOUND;
125+
}
126+
if (e instanceof TaskCancelledException || (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null)) {
127+
return Result.CANCELED;
128+
}
129+
if (ExceptionsHelper.unwrap(e, SearchTimeoutException.class) != null) {
130+
return Result.TIMEOUT;
131+
}
132+
if (ExceptionsHelper.unwrap(e, ElasticsearchSecurityException.class) != null) {
133+
return Result.SECURITY;
134+
}
135+
if (ExceptionsHelper.unwrapCorruption(e) != null) {
136+
return Result.CORRUPTION;
137+
}
138+
// This is kind of last resort check - if we still don't know the reason but all shard failures are remote,
139+
// we assume it's remote's fault somehow.
140+
if (e instanceof SearchPhaseExecutionException spe) {
141+
// If this is a failure that happened because of remote failures only
142+
var groupedFails = ExceptionsHelper.groupBy(spe.shardFailures());
143+
if (Arrays.stream(groupedFails).allMatch(Builder::isRemoteFailure)) {
144+
return Result.REMOTES_UNAVAILABLE;
145+
}
146+
}
147+
// OK we don't know what happened
148+
return Result.UNKNOWN;
149+
}
150+
151+
/**
152+
* Is this failure exception because remote was unavailable?
153+
* See also: TransportResolveClusterAction#notConnectedError
154+
*/
155+
static boolean isRemoteUnavailable(Exception e) {
156+
if (ExceptionsHelper.unwrap(
157+
e,
158+
ConnectTransportException.class,
159+
NoSuchRemoteClusterException.class,
160+
NoSeedNodeLeftException.class
161+
) != null) {
162+
return true;
163+
}
164+
Throwable ill = ExceptionsHelper.unwrap(e, IllegalStateException.class, IllegalArgumentException.class);
165+
if (ill != null && (ill.getMessage().contains("Unable to open any connections") || ill.getMessage().contains("unknown host"))) {
166+
return true;
167+
}
168+
// Ok doesn't look like any of the known remote exceptions
169+
return false;
170+
}
171+
172+
/**
173+
* Is this failure coming from a remote cluster?
174+
*/
175+
static boolean isRemoteFailure(ShardOperationFailedException failure) {
176+
if (failure instanceof ShardSearchFailure shardFailure) {
177+
SearchShardTarget shard = shardFailure.shard();
178+
return shard != null && shard.getClusterAlias() != null && LOCAL_CLUSTER_GROUP_KEY.equals(shard.getClusterAlias()) == false;
179+
}
180+
return false;
181+
}
182+
}
183+
184+
private CCSUsage(
185+
long took,
186+
Result status,
187+
int remotesCount,
188+
Set<String> skippedRemotes,
189+
Set<String> features,
190+
String client,
191+
Map<String, PerClusterUsage> perClusterUsage
192+
) {
193+
this.status = status;
194+
this.remotesCount = remotesCount;
195+
this.features = features;
196+
this.client = client;
197+
this.took = took;
198+
this.skippedRemotes = skippedRemotes;
199+
this.perClusterUsage = perClusterUsage;
200+
}
201+
202+
public Map<String, PerClusterUsage> getPerClusterUsage() {
203+
return perClusterUsage;
204+
}
205+
206+
public Result getStatus() {
207+
return status;
208+
}
209+
210+
public Set<String> getFeatures() {
211+
return features;
212+
}
213+
214+
public long getRemotesCount() {
215+
return remotesCount;
216+
}
217+
218+
public String getClient() {
219+
return client;
220+
}
221+
222+
public long getTook() {
223+
return took;
224+
}
225+
226+
public Set<String> getSkippedRemotes() {
227+
return skippedRemotes;
228+
}
229+
230+
public static class PerClusterUsage {
231+
232+
// if MRT=true, the took time on the remote cluster (if MRT=true), otherwise the overall took time
233+
private long took;
234+
235+
public PerClusterUsage(TimeValue took) {
236+
if (took != null) {
237+
this.took = took.millis();
238+
}
239+
}
240+
241+
public long getTook() {
242+
return took;
243+
}
244+
}
245+
246+
}

0 commit comments

Comments
 (0)