Skip to content

Commit b74ea91

Browse files
Implement review suggestions
1 parent d2bd672 commit b74ea91

File tree

5 files changed

+117
-83
lines changed

5 files changed

+117
-83
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlSlowLogIT.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@
3030
import java.util.concurrent.ExecutionException;
3131

3232
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
33+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_ERROR_MESSAGE;
34+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_ERROR_TYPE;
35+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_PLANNING_TOOK;
36+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_PLANNING_TOOK_MILLIS;
37+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_QUERY;
38+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_SUCCESS;
39+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_TOOK;
40+
import static org.elasticsearch.xpack.esql.slowlog.EsqlSlowLog.ELASTICSEARCH_SLOWLOG_TOOK_MILLIS;
3341
import static org.hamcrest.Matchers.containsString;
3442
import static org.hamcrest.Matchers.equalTo;
3543
import static org.hamcrest.Matchers.greaterThan;
@@ -39,7 +47,7 @@
3947

4048
public class EsqlSlowLogIT extends AbstractEsqlIntegTestCase {
4149
static MockAppender appender;
42-
static Logger queryLog = LogManager.getLogger(EsqlSlowLog.SLOWLOG_PREFIX + ".query");
50+
static Logger queryLog = LogManager.getLogger(EsqlSlowLog.LOGGER_NAME);
4351
static Level origQueryLogLevel = queryLog.getLevel();
4452

4553
@BeforeClass
@@ -168,36 +176,36 @@ private static void testAllLevels(
168176
return;
169177
}
170178
var msg = (ESLogMessage) appender.lastMessage();
171-
long took = Long.valueOf(msg.get("elasticsearch.slowlog.took"));
179+
long took = Long.valueOf(msg.get(ELASTICSEARCH_SLOWLOG_TOOK));
172180
long tookMillisExpected = took / 1_000_000;
173-
long tookMillis = Long.valueOf(msg.get("elasticsearch.slowlog.took_millis"));
181+
long tookMillis = Long.valueOf(msg.get(ELASTICSEARCH_SLOWLOG_TOOK_MILLIS));
174182
assertThat(took, greaterThan(0L));
175183
assertThat(tookMillis, greaterThanOrEqualTo(timeoutMillis));
176184
assertThat(tookMillis, is(tookMillisExpected));
177185

178186
if (expectedException == null) {
179-
long planningTook = Long.valueOf(msg.get("elasticsearch.slowlog.planning.took"));
187+
long planningTook = Long.valueOf(msg.get(ELASTICSEARCH_SLOWLOG_PLANNING_TOOK));
180188
long planningTookMillisExpected = planningTook / 1_000_000;
181-
long planningTookMillis = Long.valueOf(msg.get("elasticsearch.slowlog.planning.took_millis"));
189+
long planningTookMillis = Long.valueOf(msg.get(ELASTICSEARCH_SLOWLOG_PLANNING_TOOK_MILLIS));
182190
assertThat(planningTook, greaterThanOrEqualTo(0L));
183191
assertThat(planningTookMillis, is(planningTookMillisExpected));
184192
assertThat(took, greaterThan(planningTook));
185193
}
186194

187-
assertThat(msg.get("elasticsearch.slowlog.query"), is(query));
195+
assertThat(msg.get(ELASTICSEARCH_SLOWLOG_QUERY), is(query));
188196
assertThat(appender.getLastEventAndReset().getLevel(), equalTo(logLevel.getKey()));
189197

190-
boolean success = Boolean.valueOf(msg.get("elasticsearch.slowlog.success"));
198+
boolean success = Boolean.valueOf(msg.get(ELASTICSEARCH_SLOWLOG_SUCCESS));
191199
assertThat(success, is(expectedException == null));
192200
if (expectedErrorMsg == null) {
193-
assertThat(msg.get("elasticsearch.slowlog.error.message"), is(nullValue()));
201+
assertThat(msg.get(ELASTICSEARCH_SLOWLOG_ERROR_MESSAGE), is(nullValue()));
194202
} else {
195-
assertThat(msg.get("elasticsearch.slowlog.error.message"), containsString(expectedErrorMsg));
203+
assertThat(msg.get(ELASTICSEARCH_SLOWLOG_ERROR_MESSAGE), containsString(expectedErrorMsg));
196204
}
197205
if (expectedException == null) {
198-
assertThat(msg.get("elasticsearch.slowlog.error.type"), is(nullValue()));
206+
assertThat(msg.get(ELASTICSEARCH_SLOWLOG_ERROR_TYPE), is(nullValue()));
199207
} else {
200-
assertThat(msg.get("elasticsearch.slowlog.error.type"), is(expectedException));
208+
assertThat(msg.get(ELASTICSEARCH_SLOWLOG_ERROR_TYPE), is(expectedException));
201209
}
202210
} finally {
203211
latch.countDown();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,22 +87,36 @@ public void esql(
8787
metrics.total(clientId);
8888

8989
var begin = System.nanoTime();
90-
ActionListener<Result> executeListener = wrap(x -> {
91-
planTelemetryManager.publish(planTelemetry, true);
92-
slowLog.onQueryPhase(x, request.query());
93-
listener.onResponse(x);
94-
}, ex -> {
95-
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
96-
metrics.failed(clientId);
97-
planTelemetryManager.publish(planTelemetry, false);
98-
slowLog.onQueryFailure(request.query(), ex, System.nanoTime() - begin);
99-
listener.onFailure(ex);
100-
});
90+
ActionListener<Result> executeListener = wrap(
91+
x -> onQuerySuccess(request, listener, x, planTelemetry),
92+
ex -> onQueryFailure(request, listener, ex, clientId, planTelemetry, begin)
93+
);
10194
// Wrap it in a listener so that if we have any exceptions during execution, the listener picks it up
10295
// and all the metrics are properly updated
10396
ActionListener.run(executeListener, l -> session.execute(request, executionInfo, planRunner, l));
10497
}
10598

99+
private void onQuerySuccess(EsqlQueryRequest request, ActionListener<Result> listener, Result x, PlanTelemetry planTelemetry) {
100+
planTelemetryManager.publish(planTelemetry, true);
101+
slowLog.onQueryPhase(x, request.query());
102+
listener.onResponse(x);
103+
}
104+
105+
private void onQueryFailure(
106+
EsqlQueryRequest request,
107+
ActionListener<Result> listener,
108+
Exception ex,
109+
QueryMetric clientId,
110+
PlanTelemetry planTelemetry,
111+
long begin
112+
) {
113+
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
114+
metrics.failed(clientId);
115+
planTelemetryManager.publish(planTelemetry, false);
116+
slowLog.onQueryFailure(request.query(), ex, System.nanoTime() - begin);
117+
listener.onFailure(ex);
118+
}
119+
106120
public IndexResolver indexResolver() {
107121
return indexResolver;
108122
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
156156
);
157157

158158
@Override
159-
public Collection<?> createComponents(Plugin.PluginServices services) {
159+
public Collection<?> createComponents(PluginServices services) {
160160
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
161161
Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set");
162162
Settings settings = services.clusterService().getSettings();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/slowlog/EsqlSlowLog.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.function.Supplier;
2324

2425
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_INCLUDE_USER_SETTING;
2526
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING;
@@ -29,12 +30,19 @@
2930

3031
public final class EsqlSlowLog {
3132

32-
private final ClusterSettings clusterSettings;
33+
public static final String ELASTICSEARCH_SLOWLOG_PREFIX = "elasticsearch.slowlog";
34+
public static final String ELASTICSEARCH_SLOWLOG_ERROR_MESSAGE = ELASTICSEARCH_SLOWLOG_PREFIX + ".error.message";
35+
public static final String ELASTICSEARCH_SLOWLOG_ERROR_TYPE = ELASTICSEARCH_SLOWLOG_PREFIX + ".error.type";
36+
public static final String ELASTICSEARCH_SLOWLOG_TOOK = ELASTICSEARCH_SLOWLOG_PREFIX + ".took";
37+
public static final String ELASTICSEARCH_SLOWLOG_TOOK_MILLIS = ELASTICSEARCH_SLOWLOG_PREFIX + ".took_millis";
38+
public static final String ELASTICSEARCH_SLOWLOG_PLANNING_TOOK = ELASTICSEARCH_SLOWLOG_PREFIX + ".planning.took";
39+
public static final String ELASTICSEARCH_SLOWLOG_PLANNING_TOOK_MILLIS = ELASTICSEARCH_SLOWLOG_PREFIX + ".planning.took_millis";
40+
public static final String ELASTICSEARCH_SLOWLOG_SUCCESS = ELASTICSEARCH_SLOWLOG_PREFIX + ".success";
41+
public static final String ELASTICSEARCH_SLOWLOG_SEARCH_TYPE = ELASTICSEARCH_SLOWLOG_PREFIX + ".search_type";
42+
public static final String ELASTICSEARCH_SLOWLOG_QUERY = ELASTICSEARCH_SLOWLOG_PREFIX + ".query";
3343

34-
public static final String SLOWLOG_PREFIX = "esql.slowlog";
35-
36-
private static final Logger queryLogger = LogManager.getLogger(SLOWLOG_PREFIX + ".query");
37-
private final SecurityContext security;
44+
public static final String LOGGER_NAME = "esql.slowlog.query";
45+
private static final Logger queryLogger = LogManager.getLogger(LOGGER_NAME);
3846

3947
private volatile long queryWarnThreshold;
4048
private volatile long queryInfoThreshold;
@@ -43,13 +51,14 @@ public final class EsqlSlowLog {
4351

4452
private volatile boolean includeUser;
4553

54+
private final SecurityContext security;
55+
4656
public EsqlSlowLog(ClusterSettings settings, SecurityContext security) {
4757
settings.initializeAndWatch(ESQL_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING, this::setQueryWarnThreshold);
4858
settings.initializeAndWatch(ESQL_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING, this::setQueryInfoThreshold);
4959
settings.initializeAndWatch(ESQL_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING, this::setQueryDebugThreshold);
5060
settings.initializeAndWatch(ESQL_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING, this::setQueryTraceThreshold);
5161
settings.initializeAndWatch(ESQL_SLOWLOG_THRESHOLD_INCLUDE_USER_SETTING, this::setIncludeUser);
52-
this.clusterSettings = settings;
5362

5463
this.security = security;
5564
}
@@ -63,37 +72,22 @@ public void onQueryPhase(Result esqlResult, String query) {
6372
return; // TODO review, it happens in some tests, not sure if it's a thing also in prod
6473
}
6574
long tookInNanos = esqlResult.executionInfo().overallTook().nanos();
66-
if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) {
67-
queryLogger.warn(Message.of(esqlResult, query, user()));
68-
} else if (queryInfoThreshold >= 0 && tookInNanos > queryInfoThreshold) {
69-
queryLogger.info(Message.of(esqlResult, query, user()));
70-
71-
} else if (queryDebugThreshold >= 0 && tookInNanos > queryDebugThreshold) {
72-
queryLogger.debug(Message.of(esqlResult, query, user()));
73-
74-
} else if (queryTraceThreshold >= 0 && tookInNanos > queryTraceThreshold) {
75-
queryLogger.trace(Message.of(esqlResult, query, user()));
76-
77-
}
75+
log(() -> Message.of(esqlResult, query, user()), tookInNanos);
7876
}
7977

80-
private User user() {
81-
User user = null;
82-
if (includeUser && security != null) {
83-
user = security.getUser();
84-
}
85-
return user;
78+
public void onQueryFailure(String query, Exception ex, long tookInNanos) {
79+
log(() -> Message.of(query, tookInNanos, ex, user()), tookInNanos);
8680
}
8781

88-
public void onQueryFailure(String query, Exception ex, long tookInNanos) {
82+
private void log(Supplier<ESLogMessage> logProducer, long tookInNanos) {
8983
if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) {
90-
queryLogger.warn(Message.of(query, tookInNanos, ex, user()));
84+
queryLogger.warn(logProducer.get());
9185
} else if (queryInfoThreshold >= 0 && tookInNanos > queryInfoThreshold) {
92-
queryLogger.info(Message.of(query, tookInNanos, ex, user()));
86+
queryLogger.info(logProducer.get());
9387
} else if (queryDebugThreshold >= 0 && tookInNanos > queryDebugThreshold) {
94-
queryLogger.debug(Message.of(query, tookInNanos, ex, user()));
88+
queryLogger.debug(logProducer.get());
9589
} else if (queryTraceThreshold >= 0 && tookInNanos > queryTraceThreshold) {
96-
queryLogger.trace(Message.of(query, tookInNanos, ex, user()));
90+
queryLogger.trace(logProducer.get());
9791
}
9892
}
9993

@@ -117,6 +111,14 @@ public void setIncludeUser(boolean includeUser) {
117111
this.includeUser = includeUser;
118112
}
119113

114+
private User user() {
115+
User user = null;
116+
if (includeUser && security != null) {
117+
user = security.getUser();
118+
}
119+
return user;
120+
}
121+
120122
static final class Message {
121123

122124
private static String escapeJson(String text) {
@@ -125,36 +127,41 @@ private static String escapeJson(String text) {
125127
}
126128

127129
public static ESLogMessage of(Result esqlResult, String query, User user) {
128-
Map<String, Object> jsonFields = prepareMap(esqlResult, query, true, user);
129-
130+
Map<String, Object> jsonFields = new HashMap<>();
131+
addGenericFields(jsonFields, query, true, user);
132+
addResultFields(jsonFields, esqlResult);
130133
return new ESLogMessage().withFields(jsonFields);
131134
}
132135

133136
public static ESLogMessage of(String query, long took, Exception exception, User user) {
134-
Map<String, Object> jsonFields = prepareMap(null, query, false, user);
135-
jsonFields.put("elasticsearch.slowlog.error.message", exception.getMessage() == null ? "" : exception.getMessage());
136-
jsonFields.put("elasticsearch.slowlog.error.type", exception.getClass().getName());
137-
jsonFields.put("elasticsearch.slowlog.took", took);
138-
jsonFields.put("elasticsearch.slowlog.took_millis", took / 1_000_000);
137+
Map<String, Object> jsonFields = new HashMap<>();
138+
addGenericFields(jsonFields, query, false, user);
139+
addErrorFields(jsonFields, took, exception);
139140
return new ESLogMessage().withFields(jsonFields);
140141
}
141142

142-
private static Map<String, Object> prepareMap(Result esqlResult, String query, boolean success, User user) {
143-
Map<String, Object> messageFields = new HashMap<>();
143+
private static void addGenericFields(Map<String, Object> fieldMap, String query, boolean success, User user) {
144144
if (user != null) {
145-
messageFields.put("user.name", user.principal());
146-
}
147-
if (esqlResult != null) {
148-
messageFields.put("elasticsearch.slowlog.took", esqlResult.executionInfo().overallTook().nanos());
149-
messageFields.put("elasticsearch.slowlog.took_millis", esqlResult.executionInfo().overallTook().millis());
150-
messageFields.put("elasticsearch.slowlog.planning.took", esqlResult.executionInfo().planningTookTime().nanos());
151-
messageFields.put("elasticsearch.slowlog.planning.took_millis", esqlResult.executionInfo().planningTookTime().millis());
145+
fieldMap.put("user.name", user.principal());
152146
}
153147
String source = escapeJson(query);
154-
messageFields.put("elasticsearch.slowlog.success", success);
155-
messageFields.put("elasticsearch.slowlog.search_type", "ESQL");
156-
messageFields.put("elasticsearch.slowlog.query", source);
157-
return messageFields;
148+
fieldMap.put(ELASTICSEARCH_SLOWLOG_SUCCESS, success);
149+
fieldMap.put(ELASTICSEARCH_SLOWLOG_SEARCH_TYPE, "ESQL");
150+
fieldMap.put(ELASTICSEARCH_SLOWLOG_QUERY, source);
151+
}
152+
153+
private static void addResultFields(Map<String, Object> fieldMap, Result esqlResult) {
154+
fieldMap.put(ELASTICSEARCH_SLOWLOG_TOOK, esqlResult.executionInfo().overallTook().nanos());
155+
fieldMap.put(ELASTICSEARCH_SLOWLOG_TOOK_MILLIS, esqlResult.executionInfo().overallTook().millis());
156+
fieldMap.put(ELASTICSEARCH_SLOWLOG_PLANNING_TOOK, esqlResult.executionInfo().planningTookTime().nanos());
157+
fieldMap.put(ELASTICSEARCH_SLOWLOG_PLANNING_TOOK_MILLIS, esqlResult.executionInfo().planningTookTime().millis());
158+
}
159+
160+
private static void addErrorFields(Map<String, Object> jsonFields, long took, Exception exception) {
161+
jsonFields.put(ELASTICSEARCH_SLOWLOG_TOOK, took);
162+
jsonFields.put(ELASTICSEARCH_SLOWLOG_TOOK_MILLIS, took / 1_000_000);
163+
jsonFields.put(ELASTICSEARCH_SLOWLOG_ERROR_MESSAGE, exception.getMessage() == null ? "" : exception.getMessage());
164+
jsonFields.put(ELASTICSEARCH_SLOWLOG_ERROR_TYPE, exception.getClass().getName());
158165
}
159166
}
160167
}

0 commit comments

Comments
 (0)