Skip to content

Commit 7785cde

Browse files
authored
Clear field caps index responses on cancellation (#93716)
Closes #93029
1 parent fb033b9 commit 7785cde

File tree

4 files changed

+177
-119
lines changed

4 files changed

+177
-119
lines changed

docs/changelog/93716.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 93716
2+
summary: Clear field caps index responses on cancelled
3+
area: Search
4+
type: bug
5+
issues:
6+
- 93029

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.http.entity.ContentType;
1212
import org.apache.http.entity.StringEntity;
13+
import org.apache.logging.log4j.Level;
1314
import org.apache.lucene.util.BytesRef;
1415
import org.elasticsearch.ElasticsearchException;
1516
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
@@ -31,6 +32,7 @@
3132
import org.elasticsearch.common.breaker.CircuitBreakingException;
3233
import org.elasticsearch.common.io.stream.StreamInput;
3334
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.core.Releasable;
3436
import org.elasticsearch.index.IndexMode;
3537
import org.elasticsearch.index.IndexService;
3638
import org.elasticsearch.index.IndexSettings;
@@ -53,6 +55,8 @@
5355
import org.elasticsearch.search.DummyQueryBuilder;
5456
import org.elasticsearch.tasks.TaskInfo;
5557
import org.elasticsearch.test.ESIntegTestCase;
58+
import org.elasticsearch.test.MockLogAppender;
59+
import org.elasticsearch.test.junit.annotations.TestLogging;
5660
import org.elasticsearch.test.transport.MockTransportService;
5761
import org.elasticsearch.transport.TransportService;
5862
import org.elasticsearch.xcontent.ObjectParser;
@@ -669,50 +673,64 @@ public void testManyIndicesWithSameMapping() {
669673
assertTrue(resp.getField("extra_field").get("integer").isAggregatable());
670674
}
671675

676+
@TestLogging(
677+
value = "org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction:TRACE",
678+
reason = "verify the log output on cancelled"
679+
)
672680
public void testCancel() throws Exception {
673-
BlockingOnRewriteQueryBuilder.blockOnRewrite();
674-
PlainActionFuture<Response> future = PlainActionFuture.newFuture();
675-
Request restRequest = new Request("POST", "/_field_caps?fields=*");
676-
restRequest.setEntity(new StringEntity("""
677-
{
678-
"index_filter": {
679-
"blocking_query": {}
680-
}
681-
}
682-
""", ContentType.APPLICATION_JSON.withCharset(StandardCharsets.UTF_8)));
683-
Cancellable cancellable = getRestClient().performRequestAsync(restRequest, wrapAsRestResponseListener(future));
684-
logger.info("--> waiting for field-caps tasks to be started");
685-
assertBusy(() -> {
686-
List<TaskInfo> tasks = client().admin()
687-
.cluster()
688-
.prepareListTasks()
689-
.setActions("indices:data/read/field_caps", "indices:data/read/field_caps[n]")
690-
.get()
691-
.getTasks();
692-
assertThat(tasks.size(), greaterThanOrEqualTo(2));
693-
for (TaskInfo task : tasks) {
694-
assertTrue(task.cancellable());
695-
assertFalse(task.cancelled());
696-
}
697-
}, 30, TimeUnit.SECONDS);
698-
699-
cancellable.cancel();
700-
logger.info("--> waiting for field-caps tasks to be cancelled");
701-
assertBusy(() -> {
702-
List<TaskInfo> tasks = client().admin()
703-
.cluster()
704-
.prepareListTasks()
705-
.setActions("indices:data/read/field_caps", "indices:data/read/field_caps[n]")
706-
.get()
707-
.getTasks();
708-
for (TaskInfo task : tasks) {
709-
assertTrue(task.cancellable());
710-
assertTrue(task.cancelled());
711-
}
712-
}, 30, TimeUnit.SECONDS);
713-
714-
BlockingOnRewriteQueryBuilder.unblockOnRewrite();
715-
expectThrows(CancellationException.class, future::actionGet);
681+
MockLogAppender logAppender = new MockLogAppender();
682+
try (Releasable ignored = logAppender.capturing(TransportFieldCapabilitiesAction.class)) {
683+
logAppender.addExpectation(
684+
new MockLogAppender.SeenEventExpectation(
685+
"clear resources",
686+
TransportFieldCapabilitiesAction.class.getCanonicalName(),
687+
Level.TRACE,
688+
"clear index responses on cancellation"
689+
)
690+
);
691+
BlockingOnRewriteQueryBuilder.blockOnRewrite();
692+
PlainActionFuture<Response> future = PlainActionFuture.newFuture();
693+
Request restRequest = new Request("POST", "/_field_caps?fields=*");
694+
restRequest.setEntity(new StringEntity("""
695+
{
696+
"index_filter": {
697+
"blocking_query": {}
698+
}
699+
}
700+
""", ContentType.APPLICATION_JSON.withCharset(StandardCharsets.UTF_8)));
701+
Cancellable cancellable = getRestClient().performRequestAsync(restRequest, wrapAsRestResponseListener(future));
702+
logger.info("--> waiting for field-caps tasks to be started");
703+
assertBusy(() -> {
704+
List<TaskInfo> tasks = client().admin()
705+
.cluster()
706+
.prepareListTasks()
707+
.setActions("indices:data/read/field_caps", "indices:data/read/field_caps[n]")
708+
.get()
709+
.getTasks();
710+
assertThat(tasks.size(), greaterThanOrEqualTo(2));
711+
for (TaskInfo task : tasks) {
712+
assertTrue(task.cancellable());
713+
assertFalse(task.cancelled());
714+
}
715+
}, 30, TimeUnit.SECONDS);
716+
cancellable.cancel();
717+
assertBusy(logAppender::assertAllExpectationsMatched);
718+
logger.info("--> waiting for field-caps tasks to be cancelled");
719+
assertBusy(() -> {
720+
List<TaskInfo> tasks = client().admin()
721+
.cluster()
722+
.prepareListTasks()
723+
.setActions("indices:data/read/field_caps", "indices:data/read/field_caps[n]")
724+
.get()
725+
.getTasks();
726+
for (TaskInfo task : tasks) {
727+
assertTrue(task.cancellable());
728+
assertTrue(task.cancelled());
729+
}
730+
}, 30, TimeUnit.SECONDS);
731+
BlockingOnRewriteQueryBuilder.unblockOnRewrite();
732+
expectThrows(CancellationException.class, future::actionGet);
733+
}
716734
}
717735

718736
private void assertIndices(FieldCapabilitiesResponse response, String... indices) {

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 107 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616
import org.elasticsearch.action.support.ActionFilters;
1717
import org.elasticsearch.action.support.ChannelActionListener;
1818
import org.elasticsearch.action.support.HandledTransportAction;
19+
import org.elasticsearch.action.support.RefCountingRunnable;
1920
import org.elasticsearch.client.internal.Client;
2021
import org.elasticsearch.cluster.ClusterState;
2122
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2223
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2324
import org.elasticsearch.cluster.service.ClusterService;
2425
import org.elasticsearch.common.Strings;
2526
import org.elasticsearch.common.inject.Inject;
26-
import org.elasticsearch.common.util.concurrent.CountDown;
2727
import org.elasticsearch.common.util.set.Sets;
2828
import org.elasticsearch.core.Tuple;
2929
import org.elasticsearch.index.shard.ShardId;
3030
import org.elasticsearch.indices.IndicesService;
31+
import org.elasticsearch.logging.LogManager;
32+
import org.elasticsearch.logging.Logger;
3133
import org.elasticsearch.search.SearchService;
3234
import org.elasticsearch.tasks.CancellableTask;
3335
import org.elasticsearch.tasks.Task;
@@ -47,6 +49,8 @@
4749
import java.util.Map;
4850
import java.util.Optional;
4951
import java.util.Set;
52+
import java.util.concurrent.atomic.AtomicBoolean;
53+
import java.util.function.BiConsumer;
5054
import java.util.function.Consumer;
5155
import java.util.function.Function;
5256
import java.util.stream.Collectors;
@@ -56,6 +60,7 @@
5660

5761
public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
5862
public static final String ACTION_NODE_NAME = FieldCapabilitiesAction.NAME + "[n]";
63+
public static final Logger LOGGER = LogManager.getLogger(TransportFieldCapabilitiesAction.class);
5964

6065
private final ThreadPool threadPool;
6166
private final TransportService transportService;
@@ -116,61 +121,99 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
116121
}
117122

118123
checkIndexBlocks(clusterState, concreteIndices);
119-
124+
final FailureCollector indexFailures = new FailureCollector();
120125
final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
121126
// This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage.
122127
final Map<String, FieldCapabilitiesIndexResponse> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
128+
final Runnable releaseResourcesOnCancel = () -> {
129+
LOGGER.trace("clear index responses on cancellation");
130+
indexFailures.clear();
131+
indexResponses.clear();
132+
indexMappingHashToResponses.clear();
133+
};
123134
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
135+
if (fieldCapTask.isCancelled()) {
136+
releaseResourcesOnCancel.run();
137+
return;
138+
}
124139
if (resp.canMatch() && resp.getIndexMappingHash() != null) {
125140
FieldCapabilitiesIndexResponse curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp);
126141
if (curr != null) {
127142
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), curr.getIndexMappingHash(), curr.get(), true);
128143
}
129144
}
130145
indexResponses.putIfAbsent(resp.getIndexName(), resp);
146+
if (fieldCapTask.isCancelled()) {
147+
releaseResourcesOnCancel.run();
148+
}
131149
};
132-
final FailureCollector indexFailures = new FailureCollector();
133-
// One for each cluster including the local cluster
134-
final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size());
135-
final Runnable countDown = createResponseMerger(request, fieldCapTask, completionCounter, indexResponses, indexFailures, listener);
136-
final RequestDispatcher requestDispatcher = new RequestDispatcher(
137-
clusterService,
138-
transportService,
139-
task,
140-
request,
141-
localIndices,
142-
nowInMillis,
143-
concreteIndices,
144-
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
145-
handleIndexResponse,
146-
indexFailures::collect,
147-
countDown
148-
);
149-
requestDispatcher.execute();
150+
final BiConsumer<String, Exception> handleIndexFailure = (index, error) -> {
151+
if (fieldCapTask.isCancelled()) {
152+
releaseResourcesOnCancel.run();
153+
return;
154+
}
155+
indexFailures.collect(index, error);
156+
if (fieldCapTask.isCancelled()) {
157+
releaseResourcesOnCancel.run();
158+
}
159+
};
160+
final var finishedOrCancelled = new AtomicBoolean();
161+
fieldCapTask.addListener(() -> {
162+
if (finishedOrCancelled.compareAndSet(false, true)) {
163+
releaseResourcesOnCancel.run();
164+
}
165+
});
166+
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
167+
finishedOrCancelled.set(true);
168+
if (fieldCapTask.notifyIfCancelled(listener)) {
169+
releaseResourcesOnCancel.run();
170+
} else {
171+
mergeIndexResponses(request, fieldCapTask, indexResponses, indexFailures, listener);
172+
}
173+
})) {
174+
// local cluster
175+
final RequestDispatcher requestDispatcher = new RequestDispatcher(
176+
clusterService,
177+
transportService,
178+
task,
179+
request,
180+
localIndices,
181+
nowInMillis,
182+
concreteIndices,
183+
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
184+
handleIndexResponse,
185+
handleIndexFailure,
186+
refs.acquire()::close
187+
);
188+
requestDispatcher.execute();
150189

151-
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
152-
// send us back all individual index results.
153-
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
154-
String clusterAlias = remoteIndices.getKey();
155-
OriginalIndices originalIndices = remoteIndices.getValue();
156-
Client remoteClusterClient = transportService.getRemoteClusterService().getRemoteClusterClient(threadPool, clusterAlias);
157-
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
158-
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
159-
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
160-
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
161-
handleIndexResponse.accept(
162-
new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch())
163-
);
164-
}
165-
for (FieldCapabilitiesFailure failure : response.getFailures()) {
166-
Exception ex = failure.getException();
167-
indexFailures.collectRemoteException(ex, clusterAlias, failure.getIndices());
168-
}
169-
countDown.run();
170-
}, ex -> {
171-
indexFailures.collectRemoteException(ex, clusterAlias, originalIndices.indices());
172-
countDown.run();
173-
}));
190+
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
191+
// send us back all individual index results.
192+
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
193+
String clusterAlias = remoteIndices.getKey();
194+
OriginalIndices originalIndices = remoteIndices.getValue();
195+
Client remoteClusterClient = transportService.getRemoteClusterService().getRemoteClusterClient(threadPool, clusterAlias);
196+
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
197+
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
198+
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
199+
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
200+
handleIndexResponse.accept(
201+
new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch())
202+
);
203+
}
204+
for (FieldCapabilitiesFailure failure : response.getFailures()) {
205+
Exception ex = failure.getException();
206+
for (String index : failure.getIndices()) {
207+
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
208+
}
209+
}
210+
}, ex -> {
211+
for (String index : originalIndices.indices()) {
212+
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
213+
}
214+
});
215+
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.releaseAfter(remoteListener, refs.acquire()));
216+
}
174217
}
175218
}
176219

@@ -181,39 +224,32 @@ private static void checkIndexBlocks(ClusterState clusterState, String[] concret
181224
}
182225
}
183226

184-
private Runnable createResponseMerger(
227+
private void mergeIndexResponses(
185228
FieldCapabilitiesRequest request,
186229
CancellableTask task,
187-
CountDown completionCounter,
188230
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
189231
FailureCollector indexFailures,
190232
ActionListener<FieldCapabilitiesResponse> listener
191233
) {
192-
return () -> {
193-
if (completionCounter.countDown()) {
194-
List<FieldCapabilitiesFailure> failures = indexFailures.build(indexResponses.keySet());
195-
if (indexResponses.size() > 0) {
196-
if (request.isMergeResults()) {
197-
// fork off to the management pool for merging the responses as the operation can run for longer than is acceptable
198-
// on a transport thread in case of large numbers of indices and/or fields
199-
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION)
200-
.submit(ActionRunnable.supply(listener, () -> merge(indexResponses, task, request, new ArrayList<>(failures))));
201-
} else {
202-
listener.onResponse(
203-
new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), new ArrayList<>(failures))
204-
);
205-
}
206-
} else {
207-
// we have no responses at all, maybe because of errors
208-
if (indexFailures.isEmpty() == false) {
209-
// throw back the first exception
210-
listener.onFailure(failures.iterator().next().getException());
211-
} else {
212-
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
213-
}
214-
}
234+
List<FieldCapabilitiesFailure> failures = indexFailures.build(indexResponses.keySet());
235+
if (indexResponses.size() > 0) {
236+
if (request.isMergeResults()) {
237+
// fork off to the management pool for merging the responses as the operation can run for longer than is acceptable
238+
// on a transport thread in case of large numbers of indices and/or fields
239+
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION)
240+
.submit(ActionRunnable.supply(listener, () -> merge(indexResponses, task, request, new ArrayList<>(failures))));
241+
} else {
242+
listener.onResponse(new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), new ArrayList<>(failures)));
215243
}
216-
};
244+
} else {
245+
// we have no responses at all, maybe because of errors
246+
if (indexFailures.isEmpty() == false) {
247+
// throw back the first exception
248+
listener.onFailure(failures.iterator().next().getException());
249+
} else {
250+
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
251+
}
252+
}
217253
}
218254

219255
private static FieldCapabilitiesRequest prepareRemoteRequest(
@@ -380,10 +416,8 @@ void collect(String index, Exception e) {
380416
failuresByIndex.putIfAbsent(index, e);
381417
}
382418

383-
void collectRemoteException(Exception ex, String clusterAlias, String[] remoteIndices) {
384-
for (String failedIndex : remoteIndices) {
385-
collect(RemoteClusterAware.buildRemoteIndexName(clusterAlias, failedIndex), ex);
386-
}
419+
void clear() {
420+
failuresByIndex.clear();
387421
}
388422

389423
boolean isEmpty() {

0 commit comments

Comments
 (0)