Merged
Changes from 13 commits
Commits
31 commits
dca56b5
ES|QL slow log
luigidellaquila Mar 5, 2025
e725e07
[CI] Auto commit changes from spotless
Mar 5, 2025
4746cd4
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 7, 2025
d4ed58f
First round of reviews
luigidellaquila Mar 7, 2025
8957918
Fix test
luigidellaquila Mar 7, 2025
6e0b9c5
Add slowlog for failing queries
luigidellaquila Mar 7, 2025
4d25aaa
Cleanup
luigidellaquila Mar 7, 2025
45764f4
Cleanup
luigidellaquila Mar 7, 2025
b2ae5ef
Update docs/changelog/124094.yaml
luigidellaquila Mar 7, 2025
2eb589a
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 10, 2025
cbfcf2c
Add unit tests
luigidellaquila Mar 10, 2025
51820c9
Merge remote-tracking branch 'luigidellaquila/esql/slowlog' into esql…
luigidellaquila Mar 10, 2025
d2bd672
Add user name to slow log; some refactoring
luigidellaquila Mar 10, 2025
b74ea91
Implement review suggestions
luigidellaquila Mar 11, 2025
30d07d4
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 11, 2025
4e1838b
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 11, 2025
af3366f
Refactor to use SlowLogFieldProviders
luigidellaquila Mar 14, 2025
95c76d7
Merge remote-tracking branch 'luigidellaquila/esql/slowlog' into esql…
luigidellaquila Mar 14, 2025
bbfa89b
[CI] Auto commit changes from spotless
Mar 14, 2025
bbc11c5
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 14, 2025
e390232
Merge log field providers
luigidellaquila Mar 14, 2025
694ff2e
Merge remote-tracking branch 'luigidellaquila/esql/slowlog' into esql…
luigidellaquila Mar 14, 2025
cec182e
[CI] Auto commit changes from spotless
Mar 14, 2025
44ea6fc
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 14, 2025
df788db
Fix tests
luigidellaquila Mar 14, 2025
a970fc5
Merge remote-tracking branch 'luigidellaquila/esql/slowlog' into esql…
luigidellaquila Mar 14, 2025
e4202af
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 17, 2025
01d4c66
Rename as QueryLog
luigidellaquila Mar 17, 2025
fd85f06
Simplify cluster settings
luigidellaquila Mar 17, 2025
03dbacd
fix log4j config
luigidellaquila Mar 18, 2025
6cdb1b3
Merge branch 'main' into esql/slowlog
luigidellaquila Mar 18, 2025
23 changes: 23 additions & 0 deletions distribution/src/config/log4j2.properties
@@ -129,3 +129,26 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index
logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.additivity = false


######## ES|QL slowlog JSON ####################
appender.esql_slowlog_rolling.type = RollingFile
appender.esql_slowlog_rolling.name = esql_slowlog_rolling
appender.esql_slowlog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs\
.cluster_name}_esql_slowlog.json
appender.esql_slowlog_rolling.layout.type = ECSJsonLayout
appender.esql_slowlog_rolling.layout.dataset = elasticsearch.esql_slowlog

appender.esql_slowlog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs\
.cluster_name}_esql_slowlog-%i.json.gz
appender.esql_slowlog_rolling.policies.type = Policies
appender.esql_slowlog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.esql_slowlog_rolling.policies.size.size = 1GB
appender.esql_slowlog_rolling.strategy.type = DefaultRolloverStrategy
appender.esql_slowlog_rolling.strategy.max = 4
#################################################

logger.esql_slowlog_rolling.name = esql.slowlog
logger.esql_slowlog_rolling.level = trace
logger.esql_slowlog_rolling.appenderRef.esql_slowlog_rolling.ref = esql_slowlog_rolling
logger.esql_slowlog_rolling.additivity = false
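
The appender above only controls where the log lines go; whether a query is logged at all is driven by per-level threshold settings that are dynamic cluster settings (the EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_*_SETTING constants used in the integration test further down). A minimal sketch of enabling the WARN threshold at runtime, assuming an org.elasticsearch.client.internal.Client handle and an illustrative "2s" value:

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

// Sketch only: the setting constant and request/action types are taken from the
// integration test below; the Client handle and the "2s" value are assumptions.
static void enableEsqlSlowLogWarnThreshold(Client client) {
    ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
        .persistentSettings(Settings.builder().put(EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING.getKey(), "2s"));
    // Apply the setting; queries slower than the threshold are then logged at WARN.
    client.execute(ClusterUpdateSettingsAction.INSTANCE, request).actionGet();
}
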
5 changes: 5 additions & 0 deletions docs/changelog/124094.yaml
@@ -0,0 +1,5 @@
pr: 124094
summary: ES|QL slow log
area: ES|QL
type: enhancement
issues: []
@@ -15,6 +15,7 @@ public enum LogType {
AUDIT("%s_audit.json"),
SEARCH_SLOW("%s_index_search_slowlog.json"),
INDEXING_SLOW("%s_index_indexing_slowlog.json"),
ESQL_SLOW("%s_esql_slowlog.json"),
DEPRECATION("%s_deprecation.json");

private final String filenameFormat;
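
The new ESQL_SLOW constant follows the same per-cluster filename convention as the other log types, so it lines up with the fileName configured for esql_slowlog_rolling above. A tiny sketch of how the pattern expands (the formatting call is illustrative; the enum's real accessor is not shown in this hunk):

import java.util.Locale;

// For a cluster named "my-cluster", "%s_esql_slowlog.json" resolves to
// "my-cluster_esql_slowlog.json", matching appender.esql_slowlog_rolling.fileName above.
String esqlSlowLogFile = String.format(Locale.ROOT, "%s_esql_slowlog.json", "my-cluster");
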
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.logging.log4j.message.Message;

public class MockAppender extends AbstractAppender {
public LogEvent lastEvent;

public MockAppender(final String name) throws IllegalAccessException {
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
}

@Override
public void append(LogEvent event) {
lastEvent = event.toImmutable();
}

public Message lastMessage() {
return lastEvent.getMessage();
}

public LogEvent lastEvent() {
return lastEvent;
}

public LogEvent getLastEventAndReset() {
LogEvent toReturn = lastEvent;
lastEvent = null;
return toReturn;
}
}
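
In tests this appender is typically attached to the slow-log logger, the code under test is run, and the captured event is inspected. A minimal sketch of that pattern, mirroring the setup in EsqlSlowLogIT below (the logger name is built from the same EsqlSlowLog.SLOWLOG_PREFIX constant; everything else is illustrative):

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog;

// Sketch only: attach the MockAppender, capture one event, then detach and restore.
static void captureSlowLogEvent() throws IllegalAccessException {
    MockAppender appender = new MockAppender("capture");
    appender.start();
    Logger logger = LogManager.getLogger(EsqlSlowLog.SLOWLOG_PREFIX + ".query");
    Level originalLevel = logger.getLevel();
    Loggers.addAppender(logger, appender);
    Loggers.setLevel(logger, Level.TRACE);
    try {
        // ... run the code that is expected to emit a slow-log event ...
        assert appender.lastEvent() != null;
    } finally {
        Loggers.removeAppender(logger, appender);
        appender.stop();
        Loggers.setLevel(logger, originalLevel);
    }
}
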
@@ -0,0 +1,219 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.MockAppender;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class EsqlSlowLogIT extends AbstractEsqlIntegTestCase {
static MockAppender appender;
static Logger queryLog = LogManager.getLogger(EsqlSlowLog.SLOWLOG_PREFIX + ".query");
static Level origQueryLogLevel = queryLog.getLevel();

@BeforeClass
public static void init() throws IllegalAccessException {
appender = new MockAppender("trace_appender");
appender.start();
Loggers.addAppender(queryLog, appender);

Loggers.setLevel(queryLog, Level.TRACE);
}

@AfterClass
public static void cleanup() {
Loggers.removeAppender(queryLog, appender);
appender.stop();
Loggers.setLevel(queryLog, origQueryLogLevel);
}

public void testSetLevel() throws Exception {
int numDocs1 = randomIntBetween(1, 15);
assertAcked(client().admin().indices().prepareCreate("index-1").setMapping("host", "type=keyword"));
for (int i = 0; i < numDocs1; i++) {
client().prepareIndex("index-1").setSource("host", "192." + i).get();
}
int numDocs2 = randomIntBetween(1, 15);
assertAcked(client().admin().indices().prepareCreate("index-2").setMapping("host", "type=keyword"));
for (int i = 0; i < numDocs2; i++) {
client().prepareIndex("index-2").setSource("host", "10." + i).get();
}

DiscoveryNode coordinator = randomFrom(clusterService().state().nodes().stream().toList());
client().admin().indices().prepareRefresh("index-1", "index-2").get();

Map<Level, String> levels = Map.of(
Level.WARN,
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING.getKey(),
Level.INFO,
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING.getKey(),
Level.DEBUG,
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING.getKey(),
Level.TRACE,
EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING.getKey()
);
testAllLevels(
levels,
coordinator,
0,
"FROM index-* | EVAL ip = to_ip(host) | STATS s = COUNT(*) by ip | KEEP ip | LIMIT 100",
null,
null
);
for (int i = 0; i < 10; i++) {
testAllLevels(
levels,
coordinator,
randomIntBetween(0, 500),
"FROM index-* | EVAL ip = to_ip(host) | STATS s = COUNT(*) by ip | KEEP ip | LIMIT 100",
null,
null
);
}
testAllLevels(
levels,
coordinator,
600_000,
"FROM index-* | EVAL ip = to_ip(host) | STATS s = COUNT(*) by ip | KEEP ip | LIMIT 100",
null,
null
);

testAllLevels(
levels,
coordinator,
0,
"FROM index-* | EVAL a = count(*) | LIMIT 100",
"aggregate function [count(*)] not allowed outside STATS command",
VerificationException.class.getName()
);
for (int i = 0; i < 10; i++) {
testAllLevels(
levels,
coordinator,
randomIntBetween(0, 500),
"FROM index-* | EVAL a = count(*) | LIMIT 100",
"aggregate function [count(*)] not allowed outside STATS command",
VerificationException.class.getName()
);
}
testAllLevels(
levels,
coordinator,
600_000,
"FROM index-* | EVAL a = count(*) | LIMIT 100",
"aggregate function [count(*)] not allowed outside STATS command",
VerificationException.class.getName()
);
}

private static void testAllLevels(
Map<Level, String> levels,
DiscoveryNode coordinator,
long timeoutMillis,
String query,
String expectedErrorMsg,
String expectedException
) throws InterruptedException, ExecutionException {
for (Map.Entry<Level, String> logLevel : levels.entrySet()) {

client().execute(
ClusterUpdateSettingsAction.INSTANCE,
new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).persistentSettings(
Settings.builder().put(logLevel.getValue(), timeoutMillis + "ms")
)
).get();

EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query(query);
request.pragmas(randomPragmas());
CountDownLatch latch = new CountDownLatch(1);
client(coordinator.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.running(() -> {
try {
if (appender.lastEvent() == null) {
if (timeoutMillis == 0) {
fail("Expected a slow log with timeout set to zero");
}
return;
}
var msg = (ESLogMessage) appender.lastMessage();
long took = Long.valueOf(msg.get("elasticsearch.slowlog.took"));
long tookMillisExpected = took / 1_000_000;
long tookMillis = Long.valueOf(msg.get("elasticsearch.slowlog.took_millis"));
assertThat(took, greaterThan(0L));
assertThat(tookMillis, greaterThanOrEqualTo(timeoutMillis));
assertThat(tookMillis, is(tookMillisExpected));

if (expectedException == null) {
long planningTook = Long.valueOf(msg.get("elasticsearch.slowlog.planning.took"));
long planningTookMillisExpected = planningTook / 1_000_000;
long planningTookMillis = Long.valueOf(msg.get("elasticsearch.slowlog.planning.took_millis"));
assertThat(planningTook, greaterThanOrEqualTo(0L));
assertThat(planningTookMillis, is(planningTookMillisExpected));
assertThat(took, greaterThan(planningTook));
}

assertThat(msg.get("elasticsearch.slowlog.query"), is(query));
assertThat(appender.getLastEventAndReset().getLevel(), equalTo(logLevel.getKey()));

boolean success = Boolean.valueOf(msg.get("elasticsearch.slowlog.success"));
assertThat(success, is(expectedException == null));
if (expectedErrorMsg == null) {
assertThat(msg.get("elasticsearch.slowlog.error.message"), is(nullValue()));
} else {
assertThat(msg.get("elasticsearch.slowlog.error.message"), containsString(expectedErrorMsg));
}
if (expectedException == null) {
assertThat(msg.get("elasticsearch.slowlog.error.type"), is(nullValue()));
} else {
assertThat(msg.get("elasticsearch.slowlog.error.type"), is(expectedException));
}
} finally {
latch.countDown();
}
}));

safeAwait(latch);

assertEquals("All requests must respond", 0, latch.getCount());

client().execute(
ClusterUpdateSettingsAction.INSTANCE,
new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).persistentSettings(
Settings.builder().putNull(logLevel.getValue())
)
).get();
}
}
}
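
The assertions above fix the shape of the emitted record (elasticsearch.slowlog.took, took_millis, planning.took, planning.took_millis, query, success, error.type, error.message) and show that the event's level matches whichever threshold setting was tripped. A hedged sketch of that threshold-to-level resolution, modelled on the existing index search/indexing slow logs rather than on the EsqlSlowLog implementation (which is not shown in this diff):

import org.apache.logging.log4j.Level;

// Sketch only: a negative threshold means "disabled", and the most severe enabled
// threshold reached by the query's took-time wins. The real EsqlSlowLog may differ
// in detail; this mirrors the established index slow-log behaviour.
static Level levelFor(long tookNanos, long warnNanos, long infoNanos, long debugNanos, long traceNanos) {
    if (warnNanos >= 0 && tookNanos >= warnNanos) {
        return Level.WARN;
    }
    if (infoNanos >= 0 && tookNanos >= infoNanos) {
        return Level.INFO;
    }
    if (debugNanos >= 0 && tookNanos >= debugNanos) {
        return Level.DEBUG;
    }
    if (traceNanos >= 0 && tookNanos >= traceNanos) {
        return Level.TRACE;
    }
    return null; // below every enabled threshold: nothing is logged
}
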
@@ -26,6 +26,7 @@
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager;
@@ -42,15 +43,17 @@ public class PlanExecutor {
private final Metrics metrics;
private final Verifier verifier;
private final PlanTelemetryManager planTelemetryManager;
private final EsqlSlowLog slowLog;

public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState) {
public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState, EsqlSlowLog slowLog) {
this.indexResolver = indexResolver;
this.preAnalyzer = new PreAnalyzer();
this.functionRegistry = new EsqlFunctionRegistry();
this.mapper = new Mapper();
this.metrics = new Metrics(functionRegistry);
this.verifier = new Verifier(metrics, licenseState);
this.planTelemetryManager = new PlanTelemetryManager(meterRegistry);
this.slowLog = slowLog;
}

public void esql(
@@ -83,13 +86,16 @@ public void esql(
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);

var begin = System.nanoTime();
ActionListener<Result> executeListener = wrap(x -> {
planTelemetryManager.publish(planTelemetry, true);
slowLog.onQueryPhase(x, request.query());
Review comment (Member): To help with encapsulation, please create two internal methods called when the query is successfully executed or fails, and move the metrics, telemetry, and now the slow log there (e.g. onQuerySuccessful(), onQueryFailure()).
listener.onResponse(x);
}, ex -> {
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
metrics.failed(clientId);
planTelemetryManager.publish(planTelemetry, false);
slowLog.onQueryFailure(request.query(), ex, System.nanoTime() - begin);
listener.onFailure(ex);
});
// Wrap it in a listener so that if we have any exceptions during execution, the listener picks it up
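
Following the review comment above, a hedged sketch of the suggested encapsulation; the method names follow the reviewer's wording, while the signatures and exact parameters are assumptions, not the merged implementation:

// Sketch only: pull the success/failure bookkeeping out of the lambdas in esql(...).
private void onQuerySuccessful(Result result, PlanTelemetry planTelemetry, String query) {
    planTelemetryManager.publish(planTelemetry, true);
    slowLog.onQueryPhase(result, query);
}

private void onQueryFailure(QueryMetric clientId, PlanTelemetry planTelemetry, String query, Exception ex, long beginNanos) {
    metrics.failed(clientId);
    planTelemetryManager.publish(planTelemetry, false);
    slowLog.onQueryFailure(query, ex, System.nanoTime() - beginNanos);
}

// The listener would then reduce to:
// ActionListener<Result> executeListener = wrap(
//     x -> { onQuerySuccessful(x, planTelemetry, request.query()); listener.onResponse(x); },
//     ex -> { onQueryFailure(clientId, planTelemetry, request.query(), ex, begin); listener.onFailure(ex); }
// );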