-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ES|QL query log #124094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ES|QL query log #124094
Changes from 2 commits
dca56b5
e725e07
4746cd4
d4ed58f
8957918
6e0b9c5
4d25aaa
45764f4
b2ae5ef
2eb589a
cbfcf2c
51820c9
d2bd672
b74ea91
30d07d4
4e1838b
af3366f
95c76d7
bbfa89b
bbc11c5
e390232
694ff2e
cec182e
44ea6fc
df788db
a970fc5
e4202af
01d4c66
fd85f06
03dbacd
6cdb1b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.action; | ||
|
||
import org.apache.logging.log4j.Level; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.logging.ESLogMessage; | ||
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; | ||
import org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog; | ||
import org.junit.AfterClass; | ||
import org.junit.BeforeClass; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
public class EsqlSlowLogIT extends AbstractEsqlIntegTestCase { | ||
static MockAppender appender; | ||
static Logger queryLog = LogManager.getLogger(EsqlSlowLog.SLOWLOG_PREFIX + ".query"); | ||
static Level origQueryLogLevel = queryLog.getLevel(); | ||
|
||
@BeforeClass | ||
public static void init() throws IllegalAccessException { | ||
appender = new MockAppender("trace_appender"); | ||
appender.start(); | ||
Loggers.addAppender(queryLog, appender); | ||
|
||
Loggers.setLevel(queryLog, Level.TRACE); | ||
} | ||
|
||
@AfterClass | ||
public static void cleanup() { | ||
Loggers.removeAppender(queryLog, appender); | ||
appender.stop(); | ||
Loggers.setLevel(queryLog, origQueryLogLevel); | ||
} | ||
|
||
public void testSetLevel() throws Exception { | ||
final String node1, node2; | ||
if (randomBoolean()) { | ||
internalCluster().ensureAtLeastNumDataNodes(2); | ||
node1 = randomDataNode().getName(); | ||
node2 = randomValueOtherThan(node1, () -> randomDataNode().getName()); | ||
} else { | ||
node1 = randomDataNode().getName(); | ||
node2 = randomDataNode().getName(); | ||
} | ||
|
||
int numDocs1 = randomIntBetween(1, 15); | ||
assertAcked(client().admin().indices().prepareCreate("index-1").setMapping("host", "type=keyword")); | ||
for (int i = 0; i < numDocs1; i++) { | ||
client().prepareIndex("index-1").setSource("host", "192." + i).get(); | ||
} | ||
int numDocs2 = randomIntBetween(1, 15); | ||
assertAcked(client().admin().indices().prepareCreate("index-2").setMapping("host", "type=keyword")); | ||
for (int i = 0; i < numDocs2; i++) { | ||
client().prepareIndex("index-2").setSource("host", "10." + i).get(); | ||
} | ||
|
||
DiscoveryNode coordinator = randomFrom(clusterService().state().nodes().stream().toList()); | ||
client().admin().indices().prepareRefresh("index-1", "index-2").get(); | ||
|
||
Map<Level, String> levels = Map.of( | ||
Level.WARN, | ||
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING.getKey(), | ||
Level.INFO, | ||
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING.getKey(), | ||
Level.DEBUG, | ||
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING.getKey(), | ||
Level.TRACE, | ||
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING.getKey() | ||
); | ||
testAllLevels(levels, coordinator, 0); | ||
for (int i = 0; i < 10; i++) { | ||
testAllLevels(levels, coordinator, randomIntBetween(0, 10_000)); | ||
} | ||
} | ||
|
||
private static void testAllLevels(Map<Level, String> levels, DiscoveryNode coordinator, long timeoutMillis) throws InterruptedException, | ||
ExecutionException { | ||
for (Map.Entry<Level, String> logLevel : levels.entrySet()) { | ||
|
||
client().execute( | ||
ClusterUpdateSettingsAction.INSTANCE, | ||
new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).persistentSettings( | ||
Settings.builder().put(logLevel.getValue(), timeoutMillis + "ms") | ||
) | ||
).get(); | ||
|
||
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); | ||
var querySource = "FROM index-* | EVAL ip = to_ip(host) | STATS s = COUNT(*) by ip | KEEP ip | LIMIT 100"; | ||
request.query(querySource); | ||
request.pragmas(randomPragmas()); | ||
CountDownLatch latch = new CountDownLatch(1); | ||
client(coordinator.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.running(() -> { | ||
try { | ||
if (appender.lastEvent() == null) { | ||
if (timeoutMillis == 0) { | ||
fail("Expected a slow log with timeout set to zero"); | ||
} | ||
return; | ||
} | ||
var msg = (ESLogMessage) appender.lastMessage(); | ||
long took = Long.valueOf(msg.get("elasticsearch.slowlog.took")); | ||
long tookMillisExpected = took / 1_000_000; | ||
long tookMillis = Long.valueOf(msg.get("elasticsearch.slowlog.took_millis")); | ||
assertThat(took, greaterThan(0L)); | ||
assertThat(tookMillis, greaterThanOrEqualTo(timeoutMillis)); | ||
assertThat(tookMillis, is(tookMillisExpected)); | ||
|
||
long planningTook = Long.valueOf(msg.get("elasticsearch.slowlog.planning.took")); | ||
long planningTookMillisExpected = planningTook / 1_000_000; | ||
long planningTookMillis = Long.valueOf(msg.get("elasticsearch.slowlog.planning.took_millis")); | ||
assertThat(planningTook, greaterThanOrEqualTo(0L)); | ||
assertThat(planningTookMillis, greaterThanOrEqualTo(timeoutMillis)); | ||
assertThat(planningTookMillis, is(planningTookMillisExpected)); | ||
|
||
assertThat(took, greaterThan(planningTook)); | ||
|
||
assertThat(msg.get("elasticsearch.slowlog.source"), is(querySource)); | ||
assertThat(appender.getLastEventAndReset().getLevel(), equalTo(logLevel.getKey())); | ||
} finally { | ||
latch.countDown(); | ||
} | ||
})); | ||
latch.await(30, TimeUnit.SECONDS); | ||
|
||
|
||
client().execute( | ||
ClusterUpdateSettingsAction.INSTANCE, | ||
new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).persistentSettings( | ||
Settings.builder().putNull(logLevel.getValue()) | ||
) | ||
).get(); | ||
} | ||
} | ||
|
||
private DiscoveryNode randomDataNode() { | ||
return randomFrom(clusterService().state().nodes().getDataNodes().values()); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.action; | ||
|
||
import org.apache.logging.log4j.core.LogEvent; | ||
import org.apache.logging.log4j.core.appender.AbstractAppender; | ||
import org.apache.logging.log4j.core.filter.RegexFilter; | ||
import org.apache.logging.log4j.message.Message; | ||
|
||
public class MockAppender extends AbstractAppender { | ||
public LogEvent lastEvent; | ||
|
||
public MockAppender(final String name) throws IllegalAccessException { | ||
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false); | ||
} | ||
|
||
@Override | ||
public void append(LogEvent event) { | ||
lastEvent = event.toImmutable(); | ||
} | ||
|
||
Message lastMessage() { | ||
return lastEvent.getMessage(); | ||
} | ||
|
||
LogEvent lastEvent() { | ||
return lastEvent; | ||
} | ||
|
||
public LogEvent getLastEventAndReset() { | ||
LogEvent toReturn = lastEvent; | ||
lastEvent = null; | ||
return toReturn; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import org.elasticsearch.xpack.esql.session.EsqlSession; | ||
import org.elasticsearch.xpack.esql.session.IndexResolver; | ||
import org.elasticsearch.xpack.esql.session.Result; | ||
import org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog; | ||
import org.elasticsearch.xpack.esql.telemetry.Metrics; | ||
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; | ||
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager; | ||
|
@@ -42,15 +43,17 @@ public class PlanExecutor { | |
private final Metrics metrics; | ||
private final Verifier verifier; | ||
private final PlanTelemetryManager planTelemetryManager; | ||
private final EsqlSlowLog slowLog; | ||
|
||
public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState) { | ||
public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState, EsqlSlowLog slowLog) { | ||
this.indexResolver = indexResolver; | ||
this.preAnalyzer = new PreAnalyzer(); | ||
this.functionRegistry = new EsqlFunctionRegistry(); | ||
this.mapper = new Mapper(); | ||
this.metrics = new Metrics(functionRegistry); | ||
this.verifier = new Verifier(metrics, licenseState); | ||
this.planTelemetryManager = new PlanTelemetryManager(meterRegistry); | ||
this.slowLog = slowLog; | ||
} | ||
|
||
public void esql( | ||
|
@@ -85,6 +88,7 @@ public void esql( | |
|
||
ActionListener<Result> executeListener = wrap(x -> { | ||
planTelemetryManager.publish(planTelemetry, true); | ||
slowLog.onQueryPhase(x, request.query()); | ||
|
||
listener.onResponse(x); | ||
}, ex -> { | ||
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; | ||
import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator; | ||
import org.elasticsearch.compute.operator.topn.TopNOperatorStatus; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.features.NodeFeature; | ||
import org.elasticsearch.license.XPackLicenseState; | ||
import org.elasticsearch.plugins.ActionPlugin; | ||
|
@@ -70,6 +71,7 @@ | |
import org.elasticsearch.xpack.esql.plan.PlanWritables; | ||
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; | ||
import org.elasticsearch.xpack.esql.session.IndexResolver; | ||
import org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog; | ||
|
||
import java.lang.invoke.MethodHandles; | ||
import java.util.ArrayList; | ||
|
@@ -102,6 +104,42 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { | |
Setting.Property.Dynamic | ||
); | ||
|
||
public static final Setting<TimeValue> ESQL_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING = Setting.timeSetting( | ||
"esql.slowlog.threshold.query.warn", | ||
TimeValue.timeValueNanos(-1), | ||
TimeValue.timeValueMillis(-1), | ||
|
||
TimeValue.timeValueMillis(Integer.MAX_VALUE), | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
public static final Setting<TimeValue> ESQL_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING = Setting.timeSetting( | ||
"esql.slowlog.threshold.query.info", | ||
TimeValue.timeValueNanos(-1), | ||
TimeValue.timeValueMillis(-1), | ||
TimeValue.timeValueMillis(Integer.MAX_VALUE), | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
public static final Setting<TimeValue> ESQL_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING = Setting.timeSetting( | ||
"esql.slowlog.threshold.query.debug", | ||
TimeValue.timeValueNanos(-1), | ||
TimeValue.timeValueMillis(-1), | ||
TimeValue.timeValueMillis(Integer.MAX_VALUE), | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
public static final Setting<TimeValue> ESQL_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING = Setting.timeSetting( | ||
"esql.slowlog.threshold.query.trace", | ||
TimeValue.timeValueNanos(-1), | ||
TimeValue.timeValueMillis(-1), | ||
TimeValue.timeValueMillis(Integer.MAX_VALUE), | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
@Override | ||
public Collection<?> createComponents(PluginServices services) { | ||
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request"); | ||
|
@@ -115,7 +153,12 @@ public Collection<?> createComponents(PluginServices services) { | |
var blockFactoryProvider = blockFactoryProvider(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize); | ||
setupSharedSecrets(); | ||
return List.of( | ||
new PlanExecutor(new IndexResolver(services.client()), services.telemetryProvider().getMeterRegistry(), getLicenseState()), | ||
new PlanExecutor( | ||
new IndexResolver(services.client()), | ||
services.telemetryProvider().getMeterRegistry(), | ||
getLicenseState(), | ||
new EsqlSlowLog(services.clusterService().getClusterSettings()) | ||
), | ||
new ExchangeService( | ||
services.clusterService().getSettings(), | ||
services.threadPool(), | ||
|
@@ -151,7 +194,14 @@ protected XPackLicenseState getLicenseState() { | |
*/ | ||
@Override | ||
public List<Setting<?>> getSettings() { | ||
return List.of(QUERY_RESULT_TRUNCATION_DEFAULT_SIZE, QUERY_RESULT_TRUNCATION_MAX_SIZE); | ||
return List.of( | ||
QUERY_RESULT_TRUNCATION_DEFAULT_SIZE, | ||
QUERY_RESULT_TRUNCATION_MAX_SIZE, | ||
ESQL_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING, | ||
ESQL_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING, | ||
ESQL_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING, | ||
ESQL_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING | ||
); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -269,8 +269,8 @@ static boolean concreteIndexRequested(String indexExpression) { | |
// visible for testing | ||
static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { | ||
// TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible | ||
execInfo.markEndPlanning(); | ||
if (execInfo.isCrossClusterSearch()) { | ||
execInfo.markEndPlanning(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this a bug? Was it working when not CCS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really. |
||
for (String clusterAlias : execInfo.clusterAliases()) { | ||
EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); | ||
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems unused