Skip to content

Commit 378595d

Browse files
committed
ESQL: Log partial failures
Now that ESQL has `allow_partial_results`, we can reply with a `200` even though some nodes failed to run ESQL. This could happen because a node is restarting, because of a bug, because of a disconnect, or for all kinds of other reasons. This change logs those partial failures so an operator can look at them and get a sense of why they are happening.
1 parent 6e67fac commit 378595d

File tree

3 files changed

+125
-13
lines changed

3 files changed

+125
-13
lines changed

server/src/main/java/org/elasticsearch/rest/RestResponse.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99

1010
package org.elasticsearch.rest;
1111

12+
import org.apache.logging.log4j.Level;
1213
import org.apache.logging.log4j.LogManager;
1314
import org.apache.logging.log4j.Logger;
14-
import org.apache.logging.log4j.util.Supplier;
1515
import org.elasticsearch.ElasticsearchException;
1616
import org.elasticsearch.ExceptionsHelper;
1717
import org.elasticsearch.common.bytes.BytesArray;
@@ -126,18 +126,8 @@ public RestResponse(RestChannel channel, RestStatus status, Exception e) throws
126126
this.status = status;
127127
ToXContent.Params params = channel.request();
128128
if (e != null) {
129-
Supplier<?> messageSupplier = () -> String.format(
130-
Locale.ROOT,
131-
"path: %s, params: %s, status: %d",
132-
channel.request().rawPath(),
133-
channel.request().params(),
134-
status.getStatus()
135-
);
136-
if (status.getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
137-
SUPPRESSED_ERROR_LOGGER.debug(messageSupplier, e);
138-
} else {
139-
SUPPRESSED_ERROR_LOGGER.warn(messageSupplier, e);
140-
}
129+
Level level = status.getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e) ? Level.DEBUG : Level.WARN;
130+
suppressedError(level, channel.request().rawPath(), channel.request().params(), status, e);
141131
}
142132
// if "error_trace" is turned on in the request, we want to render it in the rest response
143133
// for that the REST_EXCEPTION_SKIP_STACK_TRACE flag that if "true" omits the stack traces is
@@ -254,4 +244,14 @@ public Map<String, List<String>> filterHeaders(Map<String, List<String>> headers
254244
public void close() {
255245
Releasables.closeExpectNoException(releasable);
256246
}
247+
248+
public static void suppressedError(Level level, String rawPath, Map<String, String> params, RestStatus status, Exception e) {
249+
if (SUPPRESSED_ERROR_LOGGER.isEnabled(level)) {
250+
SUPPRESSED_ERROR_LOGGER.log(
251+
level,
252+
String.format(Locale.ROOT, "path: %s, params: %s, status: %d", rawPath, params, status.getStatus()),
253+
e
254+
);
255+
}
256+
}
257257
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.search.ShardSearchFailure;
1213
import org.elasticsearch.core.Releasable;
1314
import org.elasticsearch.core.Releasables;
1415
import org.elasticsearch.core.TimeValue;
@@ -30,6 +31,7 @@
3031

3132
import java.io.IOException;
3233
import java.util.Locale;
34+
import java.util.Map;
3335
import java.util.concurrent.TimeUnit;
3436
import java.util.function.Consumer;
3537

@@ -121,6 +123,7 @@ private EsqlResponseListener(RestChannel channel, RestRequest restRequest, Strin
121123

122124
@Override
123125
protected void processResponse(EsqlQueryResponse esqlQueryResponse) throws IOException {
126+
logPartialResponseErrors(channel.request().rawPath(), channel.request().params(), esqlQueryResponse.getExecutionInfo());
124127
channel.sendResponse(buildResponse(esqlQueryResponse));
125128
}
126129

@@ -229,4 +232,12 @@ private void checkDelimiter() {
229232
throw new IllegalArgumentException(message);
230233
}
231234
}
235+
236+
static void logPartialResponseErrors(String rawPath, Map<String, String> params, EsqlExecutionInfo exeuctionInfo) {
237+
for (EsqlExecutionInfo.Cluster cluster : exeuctionInfo.getClusters().values()) {
238+
for (ShardSearchFailure failure : cluster.getFailures()) {
239+
RestResponse.suppressedError(org.apache.logging.log4j.Level.WARN, rawPath, params, RestStatus.OK, failure);
240+
}
241+
}
242+
}
232243
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.core.LogEvent;
14+
import org.apache.logging.log4j.core.appender.AbstractAppender;
15+
import org.apache.logging.log4j.core.config.Configurator;
16+
import org.apache.logging.log4j.core.filter.RegexFilter;
17+
import org.elasticsearch.action.search.ShardSearchFailure;
18+
import org.elasticsearch.common.logging.Loggers;
19+
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.index.shard.ShardId;
21+
import org.elasticsearch.search.SearchShardTarget;
22+
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.transport.RemoteClusterAware;
24+
import org.junit.AfterClass;
25+
import org.junit.BeforeClass;
26+
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
import static org.hamcrest.Matchers.equalTo;
32+
33+
public class EsqlResponseListenerTests extends ESTestCase {
34+
private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
35+
36+
private static MockAppender appender;
37+
static Logger restSuppressedLogger = LogManager.getLogger("rest.suppressed");
38+
39+
@BeforeClass
40+
public static void init() throws IllegalAccessException {
41+
appender = new MockAppender("testAppender");
42+
appender.start();
43+
Configurator.setLevel(restSuppressedLogger, Level.DEBUG);
44+
Loggers.addAppender(restSuppressedLogger, appender);
45+
}
46+
47+
@AfterClass
48+
public static void cleanup() {
49+
appender.stop();
50+
Loggers.removeAppender(restSuppressedLogger, appender);
51+
}
52+
53+
public void testLogPartialResponseErrors() {
54+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false);
55+
executionInfo.swapCluster(
56+
LOCAL_CLUSTER_ALIAS,
57+
(k, v) -> new EsqlExecutionInfo.Cluster(
58+
LOCAL_CLUSTER_ALIAS,
59+
"idx",
60+
false,
61+
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
62+
10,
63+
10,
64+
3,
65+
0,
66+
List.of(
67+
new ShardSearchFailure(new Exception("dummy"), target(0)),
68+
new ShardSearchFailure(new Exception("error"), target(1))
69+
),
70+
new TimeValue(4444L)
71+
)
72+
);
73+
EsqlResponseListener.logPartialResponseErrors("/_query", Map.of(), executionInfo);
74+
75+
LogEvent logEvent = appender.events.get(0);
76+
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
77+
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("path: /_query, params: {}, status: 200"));
78+
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy"));
79+
logEvent = appender.events.get(1);
80+
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
81+
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("path: /_query, params: {}, status: 200"));
82+
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("error"));
83+
}
84+
85+
private SearchShardTarget target(int shardId) {
86+
return new SearchShardTarget("node", new ShardId("idx", "uuid", 0), LOCAL_CLUSTER_ALIAS);
87+
}
88+
89+
private static class MockAppender extends AbstractAppender {
90+
public final List<LogEvent> events = new ArrayList<>();
91+
92+
MockAppender(final String name) throws IllegalAccessException {
93+
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
94+
}
95+
96+
@Override
97+
public void append(LogEvent event) {
98+
events.add(event.toImmutable());
99+
}
100+
}
101+
}

0 commit comments

Comments
 (0)