Skip to content

Commit 60935e8

Browse files
authored
ESQL: Implement a MetricsAware interface (#121074)
* ESQL: Implement a MetricsAware interface (#120527) This implements an interface that export the names of the plan nodes and functions that need to be counted in the metrics. Also, the metrics are now counted from within the parser. This should allow correct accounting for the cases where some nodes can appear both standalone or part other nodes' children (like Aggregate being a child of INLINESTATS, so no STATS counting should occur). The functions counting now also validates that behind a name there is actually a function registered. Closes #115992. (cherry picked from commit a4482d4) * Drop the HashSet gating when counting commands The telemetry accounting is no longer done in just one place in the parser, but split, so that no HashSet is required to discard duplicate accounting of the same node. This lowers the memory requirements.
1 parent 15b93fe commit 60935e8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+332
-266
lines changed

docs/changelog/121074.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121074
2+
summary: Implement a `MetricsAware` interface
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@
7474
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
7575
import org.elasticsearch.xpack.esql.session.Configuration;
7676
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
77-
import org.elasticsearch.xpack.esql.stats.Metrics;
7877
import org.elasticsearch.xpack.esql.stats.SearchStats;
78+
import org.elasticsearch.xpack.esql.telemetry.Metrics;
7979
import org.elasticsearch.xpack.versionfield.Version;
8080
import org.junit.Assert;
8181

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.elasticsearch.plugins.PluginsService;
2121
import org.elasticsearch.telemetry.Measurement;
2222
import org.elasticsearch.telemetry.TestTelemetryPlugin;
23-
import org.elasticsearch.xpack.esql.stats.PlanningMetricsManager;
23+
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager;
2424
import org.junit.Before;
2525

2626
import java.util.Collection;
@@ -113,6 +113,41 @@ public static Iterable<Object[]> parameters() {
113113
Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2)),
114114
true
115115
) },
116+
new Object[] {
117+
new Test(
118+
// Using the `::` cast operator and a function alias
119+
"""
120+
ROW host = "1.1.1.1"
121+
| EVAL ip = host::ip::string, y = to_str(host)
122+
""",
123+
Map.ofEntries(Map.entry("ROW", 1), Map.entry("EVAL", 1)),
124+
Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2)),
125+
true
126+
) },
127+
new Object[] {
128+
new Test(
129+
// Using the `::` cast operator and a function alias
130+
"""
131+
FROM idx
132+
| EVAL ip = host::ip::string, y = to_str(host)
133+
""",
134+
Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 1)),
135+
Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2)),
136+
true
137+
) },
138+
new Object[] {
139+
new Test(
140+
"""
141+
FROM idx
142+
| EVAL y = to_str(host)
143+
| LOOKUP JOIN lookup_idx ON host
144+
""",
145+
Build.current().isSnapshot()
146+
? Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 1), Map.entry("LOOKUP JOIN", 1))
147+
: Collections.emptyMap(),
148+
Build.current().isSnapshot() ? Map.ofEntries(Map.entry("TO_STRING", 1)) : Collections.emptyMap(),
149+
Build.current().isSnapshot()
150+
) },
116151
new Object[] {
117152
new Test(
118153
"METRICS idx | LIMIT 10",
@@ -123,9 +158,7 @@ public static Iterable<Object[]> parameters() {
123158
new Object[] {
124159
new Test(
125160
"METRICS idx max(id) BY host | LIMIT 10",
126-
Build.current().isSnapshot()
127-
? Map.ofEntries(Map.entry("METRICS", 1), Map.entry("LIMIT", 1), Map.entry("FROM TS", 1))
128-
: Collections.emptyMap(),
161+
Build.current().isSnapshot() ? Map.ofEntries(Map.entry("METRICS", 1), Map.entry("LIMIT", 1)) : Collections.emptyMap(),
129162
Build.current().isSnapshot() ? Map.ofEntries(Map.entry("MAX", 1)) : Collections.emptyMap(),
130163
Build.current().isSnapshot()
131164
) }
@@ -138,7 +171,7 @@ public static Iterable<Object[]> parameters() {
138171
// | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
139172
// | INLINESTATS max(id)
140173
// """,
141-
// Build.current().isSnapshot() ? Map.of("FROM", 1, "EVAL", 1, "INLINESTATS", 1, "STATS", 1) : Collections.emptyMap(),
174+
// Build.current().isSnapshot() ? Map.of("FROM", 1, "EVAL", 1, "INLINESTATS", 1) : Collections.emptyMap(),
142175
// Build.current().isSnapshot()
143176
// ? Map.ofEntries(Map.entry("MAX", 1), Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2))
144177
// : Collections.emptyMap(),
@@ -186,19 +219,19 @@ private static void testQuery(
186219
client(dataNode.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.running(() -> {
187220
try {
188221
// test total commands used
189-
final List<Measurement> commandMeasurementsAll = measurements(plugin, PlanningMetricsManager.FEATURE_METRICS_ALL);
222+
final List<Measurement> commandMeasurementsAll = measurements(plugin, PlanTelemetryManager.FEATURE_METRICS_ALL);
190223
assertAllUsages(expectedCommands, commandMeasurementsAll, iteration, success);
191224

192225
// test num of queries using a command
193-
final List<Measurement> commandMeasurements = measurements(plugin, PlanningMetricsManager.FEATURE_METRICS);
226+
final List<Measurement> commandMeasurements = measurements(plugin, PlanTelemetryManager.FEATURE_METRICS);
194227
assertUsageInQuery(expectedCommands, commandMeasurements, iteration, success);
195228

196229
// test total functions used
197-
final List<Measurement> functionMeasurementsAll = measurements(plugin, PlanningMetricsManager.FUNCTION_METRICS_ALL);
230+
final List<Measurement> functionMeasurementsAll = measurements(plugin, PlanTelemetryManager.FUNCTION_METRICS_ALL);
198231
assertAllUsages(expectedFunctions, functionMeasurementsAll, iteration, success);
199232

200233
// test number of queries using a function
201-
final List<Measurement> functionMeasurements = measurements(plugin, PlanningMetricsManager.FUNCTION_METRICS);
234+
final List<Measurement> functionMeasurements = measurements(plugin, PlanTelemetryManager.FUNCTION_METRICS);
202235
assertUsageInQuery(expectedFunctions, functionMeasurements, iteration, success);
203236
} finally {
204237
latch.countDown();
@@ -216,8 +249,8 @@ private static void assertAllUsages(Map<String, Integer> expected, List<Measurem
216249
Set<String> found = featureNames(metrics);
217250
assertThat(found, is(expected.keySet()));
218251
for (Measurement metric : metrics) {
219-
assertThat(metric.attributes().get(PlanningMetricsManager.SUCCESS), is(success));
220-
String featureName = (String) metric.attributes().get(PlanningMetricsManager.FEATURE_NAME);
252+
assertThat(metric.attributes().get(PlanTelemetryManager.SUCCESS), is(success));
253+
String featureName = (String) metric.attributes().get(PlanTelemetryManager.FEATURE_NAME);
221254
assertThat(metric.getLong(), is(iteration * expected.get(featureName)));
222255
}
223256
}
@@ -227,7 +260,7 @@ private static void assertUsageInQuery(Map<String, Integer> expected, List<Measu
227260
functionsFound = featureNames(found);
228261
assertThat(functionsFound, is(expected.keySet()));
229262
for (Measurement measurement : found) {
230-
assertThat(measurement.attributes().get(PlanningMetricsManager.SUCCESS), is(success));
263+
assertThat(measurement.attributes().get(PlanTelemetryManager.SUCCESS), is(success));
231264
assertThat(measurement.getLong(), is(iteration));
232265
}
233266
}
@@ -238,7 +271,7 @@ private static List<Measurement> measurements(TestTelemetryPlugin plugin, String
238271

239272
private static Set<String> featureNames(List<Measurement> functionMeasurements) {
240273
return functionMeasurements.stream()
241-
.map(x -> x.attributes().get(PlanningMetricsManager.FEATURE_NAME))
274+
.map(x -> x.attributes().get(PlanTelemetryManager.FEATURE_NAME))
242275
.map(String.class::cast)
243276
.collect(Collectors.toSet());
244277
}
@@ -268,6 +301,19 @@ private static void loadData(String nodeName) {
268301
}
269302

270303
client().admin().indices().prepareRefresh("idx").get();
304+
305+
assertAcked(
306+
client().admin()
307+
.indices()
308+
.prepareCreate("lookup_idx")
309+
.setSettings(
310+
Settings.builder()
311+
.put("index.routing.allocation.require._name", nodeName)
312+
.put("index.mode", "lookup")
313+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
314+
)
315+
.setMapping("ip", "type=ip", "host", "type=keyword")
316+
);
271317
}
272318

273319
private DiscoveryNode randomDataNode() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
import org.elasticsearch.xpack.esql.rule.Rule;
9393
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
9494
import org.elasticsearch.xpack.esql.session.Configuration;
95-
import org.elasticsearch.xpack.esql.stats.FeatureMetric;
95+
import org.elasticsearch.xpack.esql.telemetry.FeatureMetric;
9696
import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
9797

9898
import java.time.Duration;
@@ -133,7 +133,7 @@
133133
import static org.elasticsearch.xpack.esql.core.type.DataType.TIME_DURATION;
134134
import static org.elasticsearch.xpack.esql.core.type.DataType.VERSION;
135135
import static org.elasticsearch.xpack.esql.core.type.DataType.isTemporalAmount;
136-
import static org.elasticsearch.xpack.esql.stats.FeatureMetric.LIMIT;
136+
import static org.elasticsearch.xpack.esql.telemetry.FeatureMetric.LIMIT;
137137
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.maybeParseTemporalAmount;
138138

139139
/**
@@ -220,7 +220,7 @@ private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexR
220220
plan.metadataFields(),
221221
plan.indexMode(),
222222
indexResolutionMessage,
223-
plan.commandName()
223+
plan.telemetryLabel()
224224
);
225225
}
226226
IndexPattern table = plan.indexPattern();
@@ -233,7 +233,7 @@ private LogicalPlan resolveIndex(UnresolvedRelation plan, IndexResolution indexR
233233
plan.metadataFields(),
234234
plan.indexMode(),
235235
"invalid [" + table + "] resolution to [" + indexResolution + "]",
236-
plan.commandName()
236+
plan.telemetryLabel()
237237
);
238238
}
239239

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3333
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
3434
import org.elasticsearch.xpack.esql.plan.logical.Project;
35-
import org.elasticsearch.xpack.esql.stats.FeatureMetric;
36-
import org.elasticsearch.xpack.esql.stats.Metrics;
35+
import org.elasticsearch.xpack.esql.telemetry.FeatureMetric;
36+
import org.elasticsearch.xpack.esql.telemetry.Metrics;
3737

3838
import java.util.ArrayList;
3939
import java.util.BitSet;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.capabilities;
9+
10+
import java.util.Locale;
11+
12+
/**
13+
* Interface for plan nodes that need to be accounted in the statistics
14+
*/
15+
public interface TelemetryAware {
16+
17+
/**
18+
* @return the label reported in the telemetry data. Only needs to be overwritten if the label doesn't match the class name.
19+
*/
20+
default String telemetryLabel() {
21+
return getClass().getSimpleName().toUpperCase(Locale.ROOT);
22+
}
23+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import org.elasticsearch.xpack.esql.session.IndexResolver;
2727
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
2828
import org.elasticsearch.xpack.esql.session.Result;
29-
import org.elasticsearch.xpack.esql.stats.Metrics;
30-
import org.elasticsearch.xpack.esql.stats.PlanningMetrics;
31-
import org.elasticsearch.xpack.esql.stats.PlanningMetricsManager;
32-
import org.elasticsearch.xpack.esql.stats.QueryMetric;
29+
import org.elasticsearch.xpack.esql.telemetry.Metrics;
30+
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
31+
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager;
32+
import org.elasticsearch.xpack.esql.telemetry.QueryMetric;
3333

3434
import static org.elasticsearch.action.ActionListener.wrap;
3535

@@ -41,7 +41,7 @@ public class PlanExecutor {
4141
private final Mapper mapper;
4242
private final Metrics metrics;
4343
private final Verifier verifier;
44-
private final PlanningMetricsManager planningMetricsManager;
44+
private final PlanTelemetryManager planTelemetryManager;
4545

4646
public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState) {
4747
this.indexResolver = indexResolver;
@@ -50,7 +50,7 @@ public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XP
5050
this.mapper = new Mapper();
5151
this.metrics = new Metrics(functionRegistry);
5252
this.verifier = new Verifier(metrics, licenseState);
53-
this.planningMetricsManager = new PlanningMetricsManager(meterRegistry);
53+
this.planTelemetryManager = new PlanTelemetryManager(meterRegistry);
5454
}
5555

5656
public void esql(
@@ -65,7 +65,7 @@ public void esql(
6565
QueryBuilderResolver queryBuilderResolver,
6666
ActionListener<Result> listener
6767
) {
68-
final PlanningMetrics planningMetrics = new PlanningMetrics();
68+
final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
6969
final var session = new EsqlSession(
7070
sessionId,
7171
cfg,
@@ -76,20 +76,20 @@ public void esql(
7676
new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)),
7777
mapper,
7878
verifier,
79-
planningMetrics,
79+
planTelemetry,
8080
indicesExpressionGrouper,
8181
queryBuilderResolver
8282
);
8383
QueryMetric clientId = QueryMetric.fromString("rest");
8484
metrics.total(clientId);
8585

8686
ActionListener<Result> executeListener = wrap(x -> {
87-
planningMetricsManager.publish(planningMetrics, true);
87+
planTelemetryManager.publish(planTelemetry, true);
8888
listener.onResponse(x);
8989
}, ex -> {
9090
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
9191
metrics.failed(clientId);
92-
planningMetricsManager.publish(planningMetrics, false);
92+
planTelemetryManager.publish(planTelemetry, false);
9393
listener.onFailure(ex);
9494
});
9595
// Wrap it in a listener so that if we have any exceptions during execution, the listener picks it up

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ public class EsqlFunctionRegistry {
224224
// it has with the alias name associated to the FunctionDefinition instance
225225
private final Map<String, FunctionDefinition> defs = new LinkedHashMap<>();
226226
private final Map<String, String> aliases = new HashMap<>();
227+
private final Map<Class<? extends Function>, String> names = new HashMap<>();
227228

228229
private SnapshotFunctionRegistry snapshotRegistry = null;
229230

@@ -258,6 +259,12 @@ public boolean functionExists(String functionName) {
258259
return defs.containsKey(functionName);
259260
}
260261

262+
public String functionName(Class<? extends Function> clazz) {
263+
String name = names.get(clazz);
264+
Check.notNull(name, "Cannot find function by class {}", clazz);
265+
return name;
266+
}
267+
261268
public Collection<FunctionDefinition> listFunctions() {
262269
// It is worth double checking if we need this copy. These are immutable anyway.
263270
return defs.values();
@@ -758,6 +765,14 @@ void register(FunctionDefinition... functions) {
758765
}
759766
aliases.put(alias, f.name());
760767
}
768+
Check.isTrue(
769+
names.containsKey(f.clazz()) == false,
770+
"function type [{}} is registered twice with names [{}] and [{}]",
771+
f.clazz(),
772+
names.get(f.clazz()),
773+
f.name()
774+
);
775+
names.put(f.clazz(), f.name());
761776
}
762777
// sort the temporary map by key name and add it to the global map of functions
763778
defs.putAll(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/AstBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
package org.elasticsearch.xpack.esql.parser;
99

1010
public class AstBuilder extends LogicalPlanBuilder {
11-
public AstBuilder(QueryParams params) {
12-
super(params);
11+
public AstBuilder(ParsingContext context) {
12+
super(context);
1313
}
1414
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlParser.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import org.elasticsearch.logging.LogManager;
1919
import org.elasticsearch.logging.Logger;
2020
import org.elasticsearch.xpack.esql.core.util.StringUtils;
21+
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
2122
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
23+
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
2224

2325
import java.util.BitSet;
2426
import java.util.function.BiFunction;
@@ -52,20 +54,27 @@ public void setEsqlConfig(EsqlConfig config) {
5254
this.config = config;
5355
}
5456

57+
// testing utility
5558
public LogicalPlan createStatement(String query) {
5659
return createStatement(query, new QueryParams());
5760
}
5861

62+
// testing utility
5963
public LogicalPlan createStatement(String query, QueryParams params) {
64+
return createStatement(query, params, new PlanTelemetry(new EsqlFunctionRegistry()));
65+
}
66+
67+
public LogicalPlan createStatement(String query, QueryParams params, PlanTelemetry metrics) {
6068
if (log.isDebugEnabled()) {
6169
log.debug("Parsing as statement: {}", query);
6270
}
63-
return invokeParser(query, params, EsqlBaseParser::singleStatement, AstBuilder::plan);
71+
return invokeParser(query, params, metrics, EsqlBaseParser::singleStatement, AstBuilder::plan);
6472
}
6573

6674
private <T> T invokeParser(
6775
String query,
6876
QueryParams params,
77+
PlanTelemetry metrics,
6978
Function<EsqlBaseParser, ParserRuleContext> parseFunction,
7079
BiFunction<AstBuilder, ParserRuleContext, T> result
7180
) {
@@ -99,7 +108,7 @@ private <T> T invokeParser(
99108
log.trace("Parse tree: {}", tree.toStringTree());
100109
}
101110

102-
return result.apply(new AstBuilder(params), tree);
111+
return result.apply(new AstBuilder(new ExpressionBuilder.ParsingContext(params, metrics)), tree);
103112
} catch (StackOverflowError e) {
104113
throw new ParsingException("ESQL statement is too large, causing stack overflow when generating the parsing tree: [{}]", query);
105114
}

0 commit comments

Comments
 (0)