Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129164.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129164
summary: Log partial failures
area: ES|QL
type: feature
issues: []
36 changes: 23 additions & 13 deletions server/src/main/java/org/elasticsearch/rest/RestResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

package org.elasticsearch.rest;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -126,18 +126,8 @@ public RestResponse(RestChannel channel, RestStatus status, Exception e) throws
this.status = status;
ToXContent.Params params = channel.request();
if (e != null) {
Supplier<?> messageSupplier = () -> String.format(
Locale.ROOT,
"path: %s, params: %s, status: %d",
channel.request().rawPath(),
channel.request().params(),
status.getStatus()
);
if (status.getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e)) {
SUPPRESSED_ERROR_LOGGER.debug(messageSupplier, e);
} else {
SUPPRESSED_ERROR_LOGGER.warn(messageSupplier, e);
}
Level level = status.getStatus() < 500 || ExceptionsHelper.isNodeOrShardUnavailableTypeException(e) ? Level.DEBUG : Level.WARN;
logSuppressedError(level, channel.request().rawPath(), channel.request().params(), status, null, e);
}
// if "error_trace" is turned on in the request, we want to render it in the rest response
// for that the REST_EXCEPTION_SKIP_STACK_TRACE flag that if "true" omits the stack traces is
Expand Down Expand Up @@ -254,4 +244,24 @@ public Map<String, List<String>> filterHeaders(Map<String, List<String>> headers
public void close() {
Releasables.closeExpectNoException(releasable);
}

/**
* Log failures to the {@code rest.suppressed} logger.
*/
public static void logSuppressedError(
Level level,
String rawPath,
Map<String, String> params,
RestStatus status,
@Nullable String extra,
Exception e
) {
if (SUPPRESSED_ERROR_LOGGER.isEnabled(level)) {
String message = String.format(Locale.ROOT, "path: %s, params: %s, status: %d", rawPath, params, status.getStatus());
if (extra != null) {
message += ", " + extra;
}
SUPPRESSED_ERROR_LOGGER.log(level, message, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.MediaType;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.esql.arrow.ArrowFormat;
Expand All @@ -30,6 +32,7 @@

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand Down Expand Up @@ -121,6 +124,7 @@ private EsqlResponseListener(RestChannel channel, RestRequest restRequest, Strin

@Override
protected void processResponse(EsqlQueryResponse esqlQueryResponse) throws IOException {
logPartialFailures(channel.request().rawPath(), channel.request().params(), esqlQueryResponse.getExecutionInfo());
channel.sendResponse(buildResponse(esqlQueryResponse));
}

Expand Down Expand Up @@ -229,4 +233,28 @@ private void checkDelimiter() {
throw new IllegalArgumentException(message);
}
}

/**
* Log all partial request failures to the {@code rest.suppressed} logger
* so an operator can categorize them after the fact.
*/
static void logPartialFailures(String rawPath, Map<String, String> params, EsqlExecutionInfo executionInfo) {
if (executionInfo == null) {
return;
}
for (EsqlExecutionInfo.Cluster cluster : executionInfo.getClusters().values()) {
for (ShardSearchFailure failure : cluster.getFailures()) {
RestResponse.logSuppressedError(
org.apache.logging.log4j.Level.WARN,
rawPath,
params,
RestStatus.OK,
cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
? null
: "cluster: " + cluster.getClusterAlias(),
failure
);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteClusterAware;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class EsqlResponseListenerTests extends ESTestCase {
private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;

private static MockAppender appender;
static Logger restSuppressedLogger = LogManager.getLogger("rest.suppressed");

@BeforeClass
public static void init() throws IllegalAccessException {
appender = new MockAppender("testAppender");
appender.start();
Configurator.setLevel(restSuppressedLogger, Level.DEBUG);
Loggers.addAppender(restSuppressedLogger, appender);
}

@After
public void clear() {
appender.events.clear();
}

@AfterClass
public static void cleanup() {
appender.stop();
Loggers.removeAppender(restSuppressedLogger, appender);
}

public void testLogPartialFailures() {
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false);
executionInfo.swapCluster(
LOCAL_CLUSTER_ALIAS,
(k, v) -> new EsqlExecutionInfo.Cluster(
LOCAL_CLUSTER_ALIAS,
"idx",
false,
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
10,
10,
3,
0,
List.of(
new ShardSearchFailure(new Exception("dummy"), target(0)),
new ShardSearchFailure(new Exception("error"), target(1))
),
new TimeValue(4444L)
)
);
EsqlResponseListener.logPartialFailures("/_query", Map.of(), executionInfo);

assertThat(appender.events, hasSize(2));
LogEvent logEvent = appender.events.get(0);
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("path: /_query, params: {}, status: 200"));
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy"));
logEvent = appender.events.get(1);
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("path: /_query, params: {}, status: 200"));
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("error"));
}

public void testLogPartialFailuresRemote() {
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false);
executionInfo.swapCluster(
"remote_cluster",
(k, v) -> new EsqlExecutionInfo.Cluster(
"remote_cluster",
"idx",
false,
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
10,
10,
3,
0,
List.of(new ShardSearchFailure(new Exception("dummy"), target(0))),
new TimeValue(4444L)
)
);
EsqlResponseListener.logPartialFailures("/_query", Map.of(), executionInfo);

assertThat(appender.events, hasSize(1));
LogEvent logEvent = appender.events.get(0);
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("path: /_query, params: {}, status: 200, cluster: remote_cluster"));
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy"));
}

private SearchShardTarget target(int shardId) {
return new SearchShardTarget("node", new ShardId("idx", "uuid", 0), LOCAL_CLUSTER_ALIAS);
}

private static class MockAppender extends AbstractAppender {
public final List<LogEvent> events = new ArrayList<>();

MockAppender(final String name) throws IllegalAccessException {
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
}

@Override
public void append(LogEvent event) {
events.add(event.toImmutable());
}
}
}
Loading