Skip to content

Commit 194cacc

Browse files
ES|QL: fix NPE on query log when CCS remotes are unavailable
1 parent 1c17a0f commit 194cacc

File tree

6 files changed

+309
-0
lines changed

6 files changed

+309
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
14+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
15+
import org.elasticsearch.common.logging.ESLogMessage;
16+
import org.elasticsearch.common.logging.Loggers;
17+
import org.elasticsearch.common.logging.MockAppender;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.core.Tuple;
21+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
22+
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
23+
import org.junit.AfterClass;
24+
import org.junit.BeforeClass;
25+
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
31+
import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomIncludeCCSMetadata;
32+
import static org.elasticsearch.xpack.esql.querylog.EsqlQueryLog.ELASTICSEARCH_QUERYLOG_QUERY;
33+
import static org.elasticsearch.xpack.esql.querylog.EsqlQueryLog.ELASTICSEARCH_QUERYLOG_SUCCESS;
34+
import static org.elasticsearch.xpack.esql.querylog.EsqlQueryLog.ELASTICSEARCH_QUERYLOG_TOOK;
35+
import static org.hamcrest.Matchers.equalTo;
36+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
37+
import static org.hamcrest.Matchers.hasSize;
38+
import static org.hamcrest.Matchers.is;
39+
import static org.hamcrest.Matchers.notNullValue;
40+
41+
/**
42+
* Tests ESQL query logging with cross-cluster search when remote clusters are unavailable.
43+
* This tests the fix for <a href="https://github.com/elastic/elasticsearch/issues/142915">issue #142915</a>.
44+
*/
45+
public class CrossClusterQueryLogUnavailableRemotesIT extends AbstractCrossClusterTestCase {
46+
47+
static MockAppender appender;
48+
static Logger queryLog = LogManager.getLogger(EsqlQueryLog.LOGGER_NAME);
49+
static Level origQueryLogLevel = queryLog.getLevel();
50+
51+
@BeforeClass
52+
public static void initQueryLogging() throws IllegalAccessException {
53+
appender = new MockAppender("ccs_querylog_appender");
54+
appender.start();
55+
Loggers.addAppender(queryLog, appender);
56+
Loggers.setLevel(queryLog, Level.TRACE);
57+
}
58+
59+
@AfterClass
60+
public static void cleanupQueryLogging() {
61+
Loggers.removeAppender(queryLog, appender);
62+
appender.stop();
63+
Loggers.setLevel(queryLog, origQueryLogLevel);
64+
}
65+
66+
@Override
67+
protected boolean reuseClusters() {
68+
return false;
69+
}
70+
71+
/**
72+
* Tests that query logging works correctly when all remote clusters are unavailable
73+
* with skip_unavailable=true. This reproduces issue #142915 where NPE occurred due to
74+
* TimeSpanMarkers not being properly stopped before logging.
75+
*/
76+
public void testQueryLoggingWithAllRemotesUnavailable() throws Exception {
77+
int numClusters = 2;
78+
setupClusters(numClusters);
79+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
80+
81+
enableQueryLogging();
82+
83+
try {
84+
cluster(REMOTE_CLUSTER_1).close();
85+
86+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
87+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
88+
boolean responseExpectMeta = includeCCSMetadata.v2();
89+
90+
String query = "FROM " + REMOTE_CLUSTER_1 + ":logs-* | STATS sum(v)";
91+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
92+
List<List<Object>> values = getValuesList(resp);
93+
assertThat(values, hasSize(0));
94+
95+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
96+
assertNotNull(executionInfo);
97+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
98+
assertThat(executionInfo.isPartial(), equalTo(true));
99+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));
100+
101+
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
102+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
103+
104+
assertClusterMetadataInResponse(resp, responseExpectMeta, 1);
105+
106+
assertQueryLogged(query);
107+
}
108+
} finally {
109+
disableQueryLogging();
110+
clearSkipUnavailable(numClusters);
111+
}
112+
}
113+
114+
/**
115+
* Tests query logging when one remote is unavailable but local cluster and another remote succeed.
116+
*/
117+
public void testQueryLoggingWithPartialRemoteFailure() throws Exception {
118+
int numClusters = 3;
119+
Map<String, Object> testClusterInfo = setupClusters(numClusters);
120+
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
121+
int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards");
122+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
123+
setSkipUnavailable(REMOTE_CLUSTER_2, true);
124+
125+
enableQueryLogging();
126+
127+
try {
128+
cluster(REMOTE_CLUSTER_1).close();
129+
130+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
131+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
132+
boolean responseExpectMeta = includeCCSMetadata.v2();
133+
134+
String query = "FROM logs-*,*:logs-* | STATS sum(v)";
135+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
136+
List<List<Object>> values = getValuesList(resp);
137+
assertThat(values, hasSize(1));
138+
assertThat(values.get(0), equalTo(List.of(330L)));
139+
140+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
141+
assertNotNull(executionInfo);
142+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
143+
long overallTookMillis = executionInfo.overallTook().millis();
144+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
145+
assertThat(executionInfo.isPartial(), equalTo(true));
146+
147+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
148+
149+
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
150+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
151+
152+
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
153+
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);
154+
155+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
156+
assertClusterInfoSuccess(localCluster, localNumShards);
157+
158+
assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
159+
160+
assertQueryLogged(query);
161+
}
162+
} finally {
163+
disableQueryLogging();
164+
clearSkipUnavailable(numClusters);
165+
}
166+
}
167+
168+
/**
169+
* Tests query logging when both remotes are unavailable but local cluster succeeds.
170+
*/
171+
public void testQueryLoggingWithAllRemotesUnavailableLocalSucceeds() throws Exception {
172+
int numClusters = 3;
173+
Map<String, Object> testClusterInfo = setupClusters(numClusters);
174+
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
175+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
176+
setSkipUnavailable(REMOTE_CLUSTER_2, true);
177+
178+
enableQueryLogging();
179+
180+
try {
181+
cluster(REMOTE_CLUSTER_1).close();
182+
cluster(REMOTE_CLUSTER_2).close();
183+
184+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
185+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
186+
boolean responseExpectMeta = includeCCSMetadata.v2();
187+
188+
String query = "FROM logs-*,*:logs-* | STATS sum(v)";
189+
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
190+
List<List<Object>> values = getValuesList(resp);
191+
assertThat(values, hasSize(1));
192+
assertThat(values.get(0), equalTo(List.of(45L)));
193+
194+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
195+
assertNotNull(executionInfo);
196+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
197+
assertThat(executionInfo.isPartial(), equalTo(true));
198+
199+
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
200+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
201+
202+
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
203+
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
204+
205+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
206+
assertClusterInfoSuccess(localCluster, localNumShards);
207+
208+
assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
209+
210+
assertQueryLogged(query);
211+
}
212+
} finally {
213+
disableQueryLogging();
214+
clearSkipUnavailable(numClusters);
215+
}
216+
}
217+
218+
private void enableQueryLogging() throws Exception {
219+
client(LOCAL_CLUSTER).execute(
220+
ClusterUpdateSettingsAction.INSTANCE,
221+
new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).persistentSettings(
222+
Settings.builder().put(EsqlPlugin.ESQL_QUERYLOG_THRESHOLD_TRACE_SETTING.getKey(), "0ms")
223+
)
224+
).get();
225+
}
226+
227+
private void disableQueryLogging() throws Exception {
228+
client(LOCAL_CLUSTER).execute(
229+
ClusterUpdateSettingsAction.INSTANCE,
230+
new ClusterUpdateSettingsRequest(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).persistentSettings(
231+
Settings.builder().putNull(EsqlPlugin.ESQL_QUERYLOG_THRESHOLD_TRACE_SETTING.getKey())
232+
)
233+
).get();
234+
}
235+
236+
private void assertQueryLogged(String expectedQuery) {
237+
assertThat("Query should have been logged", appender.lastEvent(), is(notNullValue()));
238+
var msg = (ESLogMessage) appender.lastMessage();
239+
assertThat(msg.get(ELASTICSEARCH_QUERYLOG_QUERY), is(expectedQuery));
240+
assertThat(msg.get(ELASTICSEARCH_QUERYLOG_SUCCESS), is("true"));
241+
assertThat(Long.valueOf(msg.get(ELASTICSEARCH_QUERYLOG_TOOK)), greaterThanOrEqualTo(0L));
242+
appender.getLastEventAndReset();
243+
}
244+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryProfile.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,16 @@ public Collection<TimeSpanMarker> timeSpanMarkers() {
213213
return List.of(totalMarker, planningMarker, parsingMarker, preAnalysisMarker, dependencyResolutionMarker, analysisMarker);
214214
}
215215

216+
/**
217+
* Safely stops all markers that were started but not yet stopped.
218+
* This is useful in error paths where we need to ensure all timing data is captured.
219+
*/
220+
public void stopAllStartedMarkers() {
221+
for (TimeSpanMarker marker : timeSpanMarkers()) {
222+
marker.stopIfStarted();
223+
}
224+
}
225+
216226
@Override
217227
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
218228
for (TimeSpanMarker timeSpanMarker : timeSpanMarkers()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/TimeSpanMarker.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ public void stop() {
5151
timeSpan = timeSpanBuilder.stop();
5252
}
5353

54+
/**
55+
* Safely stops the marker only if it was started but not yet stopped.
56+
* This is useful in error paths where we don't know which markers were started.
57+
*/
58+
public void stopIfStarted() {
59+
if (timeSpanBuilder != null && timeSpan == null) {
60+
timeSpan = timeSpanBuilder.stop();
61+
}
62+
}
63+
5464
public TimeValue timeTook() {
5565
return timeSpan == null ? null : timeSpan.toTimeValue();
5666
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLog.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ private static void addResultFields(Map<String, Object> fieldMap, Result esqlRes
142142
EsqlQueryProfile esqlQueryProfile = esqlResult.executionInfo().queryProfile();
143143
for (TimeSpanMarker timeSpanMarker : esqlQueryProfile.timeSpanMarkers()) {
144144
TimeValue timeTook = timeSpanMarker.timeTook();
145+
if (timeTook == null) {
146+
continue;
147+
}
145148
String namePrefix = ELASTICSEARCH_QUERYLOG_PREFIX + timeSpanMarker.name();
146149
fieldMap.put(namePrefix + ELASTICSEARCH_QUERYLOG_TOOK_SUFFIX, timeTook.nanos());
147150
fieldMap.put(namePrefix + ELASTICSEARCH_QUERYLOG_TOOK_MILLIS_SUFFIX, timeTook.millis());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exc
137137
static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
138138
// This applies even for subplans - if we had an error and have to skip a cluster, then it will remain skipped.
139139
executionInfo.markEndQuery();
140+
executionInfo.queryProfile().stopAllStartedMarkers();
140141
Exception exceptionForResponse;
141142
if (e instanceof ConnectTransportException) {
142143
// when field-caps has no field info (since no clusters could be connected to or had matching indices)

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,45 @@ private static TimeSpan randomTimeSpan() {
211211
long stopNanos = startNanos + randomLongBetween(1, 100_000);
212212
return new TimeSpan(startNanos / 1_000_000, startNanos, stopNanos / 1_000_000, stopNanos);
213213
}
214+
215+
/**
216+
* Tests that query logging does not throw NPE when some TimeSpanMarkers have null timeTook values.
217+
* This can happen in scenarios like CCS queries where a remote cluster is unavailable.
218+
* See <a href="https://github.com/elastic/elasticsearch/issues/142915">issue #142915</a>.
219+
*/
220+
public void testQueryLoggingWithNullTimeSpanMarkers() {
221+
EsqlQueryLog queryLog = new EsqlQueryLog(settings, mockLogFieldProvider());
222+
String query = "from " + randomAlphaOfLength(10);
223+
224+
long tookNanos = randomLongBetween(40_000_000, 50_000_000);
225+
EsqlExecutionInfo executionInfo = getEsqlExecutionInfoWithNullTimeSpans(tookNanos);
226+
queryLog.onQueryPhase(
227+
new Versioned<>(
228+
new Result(List.of(), List.of(), EsqlTestUtils.TEST_CFG, DriverCompletionInfo.EMPTY, executionInfo),
229+
TransportVersion.current()
230+
),
231+
query
232+
);
233+
234+
assertThat(appender.lastEvent(), is(not(nullValue())));
235+
var msg = (ESLogMessage) appender.lastMessage();
236+
long took = Long.valueOf(msg.get(ELASTICSEARCH_QUERYLOG_TOOK));
237+
assertThat(took, is(tookNanos));
238+
assertThat(msg.get(ELASTICSEARCH_QUERYLOG_QUERY), is(query));
239+
assertThat(appender.getLastEventAndReset().getLevel(), equalTo(Level.WARN));
240+
}
241+
242+
private static EsqlExecutionInfo getEsqlExecutionInfoWithNullTimeSpans(long tookNanos) {
243+
return new EsqlExecutionInfo(Predicates.always(), EsqlExecutionInfo.IncludeExecutionMetadata.CCS_ONLY) {
244+
@Override
245+
public TimeValue overallTook() {
246+
return new TimeValue(tookNanos, TimeUnit.NANOSECONDS);
247+
}
248+
249+
@Override
250+
public EsqlQueryProfile queryProfile() {
251+
return new EsqlQueryProfile(null, null, null, null, null, null, 0);
252+
}
253+
};
254+
}
214255
}

0 commit comments

Comments
 (0)