Skip to content

Commit 7538860

Browse files
Test for cancelled task in TransportSnapshotsStatusAction.buildResponse()
Testing for cancellation in buildResponse() avoids a lot of unnecessary processing in scenarios with many shards. Closes ES-10981.
1 parent c1ecafa commit 7538860

File tree

2 files changed

+192
-1
lines changed

2 files changed

+192
-1
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ protected void masterOperation(
165165

166166
}
167167

168-
private void buildResponse(
168+
// Package access for testing.
169+
void buildResponse(
169170
SnapshotsInProgress snapshotsInProgress,
170171
SnapshotsStatusRequest request,
171172
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
@@ -190,6 +191,9 @@ private void buildResponse(
190191
for (Map.Entry<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry
191192
.shardSnapshotStatusByRepoShardId()
192193
.entrySet()) {
194+
if (task.notifyIfCancelled(listener)) {
195+
return;
196+
}
193197
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
194198
if (status.nodeId() != null) {
195199
// We should have information about this shard from the shard:
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.snapshots.status;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.client.internal.node.NodeClient;
15+
import org.elasticsearch.cluster.SnapshotsInProgress;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.index.IndexVersion;
19+
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.repositories.IndexId;
21+
import org.elasticsearch.repositories.RepositoriesService;
22+
import org.elasticsearch.repositories.ShardGeneration;
23+
import org.elasticsearch.snapshots.Snapshot;
24+
import org.elasticsearch.snapshots.SnapshotId;
25+
import org.elasticsearch.tasks.CancellableTask;
26+
import org.elasticsearch.tasks.TaskCancelHelper;
27+
import org.elasticsearch.tasks.TaskCancelledException;
28+
import org.elasticsearch.test.ClusterServiceUtils;
29+
import org.elasticsearch.test.ESTestCase;
30+
import org.elasticsearch.test.transport.CapturingTransport;
31+
import org.elasticsearch.threadpool.TestThreadPool;
32+
import org.elasticsearch.threadpool.ThreadPool;
33+
import org.elasticsearch.transport.TransportService;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.Set;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.function.Consumer;
42+
43+
public class TransportSnapshotsStatusActionTests extends ESTestCase {
44+
45+
private ThreadPool threadPool;
46+
private ClusterService clusterService;
47+
private TransportService transportService;
48+
private RepositoriesService repositoriesService;
49+
private TransportSnapshotsStatusAction action;
50+
51+
@Override
52+
@Before
53+
public void setUp() throws Exception {
54+
super.setUp();
55+
threadPool = new TestThreadPool(TransportSnapshotsStatusActionTests.class.getName());
56+
clusterService = ClusterServiceUtils.createClusterService(threadPool);
57+
transportService = new CapturingTransport().createTransportService(
58+
clusterService.getSettings(),
59+
threadPool,
60+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
61+
address -> clusterService.localNode(),
62+
clusterService.getClusterSettings(),
63+
Set.of()
64+
);
65+
final var nodeClient = new NodeClient(clusterService.getSettings(), threadPool);
66+
repositoriesService = new RepositoriesService(
67+
clusterService.getSettings(),
68+
clusterService,
69+
Map.of(),
70+
Map.of(),
71+
threadPool,
72+
nodeClient,
73+
List.of()
74+
);
75+
action = new TransportSnapshotsStatusAction(
76+
transportService,
77+
clusterService,
78+
threadPool,
79+
repositoriesService,
80+
nodeClient,
81+
new ActionFilters(Set.of())
82+
);
83+
}
84+
85+
@Override
86+
@After
87+
public void tearDown() throws Exception {
88+
super.tearDown();
89+
threadPool.shutdown();
90+
repositoriesService.close();
91+
transportService.close();
92+
clusterService.close();
93+
}
94+
95+
public void testBuildResponseDetectsTaskIsCancelledWhileProcessingCurrentSnapshotEntries() throws Exception {
96+
runBasicBuildResponseTest(true);
97+
}
98+
99+
public void testBuildResponseInvokesListenerWithResponseWhenTaskIsNotCancelled() throws Exception {
100+
runBasicBuildResponseTest(false);
101+
}
102+
103+
private void runBasicBuildResponseTest(boolean shouldCancelTask) {
104+
final var expectedSnapshot = new Snapshot(ProjectId.DEFAULT, "test-repo", new SnapshotId("snapshot", "uuid"));
105+
final var expectedState = SnapshotsInProgress.State.STARTED;
106+
final var currentSnapshotEntries = List.of(
107+
SnapshotsInProgress.Entry.snapshot(
108+
expectedSnapshot,
109+
randomBoolean(),
110+
randomBoolean(),
111+
SnapshotsInProgress.State.STARTED,
112+
Map.of("index", new IndexId("index", "uuid")),
113+
List.of(),
114+
List.of(),
115+
randomNonNegativeLong(),
116+
randomNonNegativeLong(),
117+
Map.of(new ShardId("index", "uuid", 0), new SnapshotsInProgress.ShardSnapshotStatus("node", new ShardGeneration("gen"))),
118+
null,
119+
Map.of(),
120+
IndexVersion.current()
121+
)
122+
);
123+
final var nodeSnapshotStatuses = new TransportNodesSnapshotsStatus.NodesSnapshotStatus(
124+
clusterService.getClusterName(),
125+
List.of(),
126+
List.of()
127+
);
128+
129+
// Run some sanity checks for when the task is not cancelled and we get back a response object.
130+
// Note that thorough verification of the SnapshotsStatusResponse is done in the higher level SnapshotStatus API integration tests.
131+
final Consumer<SnapshotsStatusResponse> verifyResponse = rsp -> {
132+
assertNotNull(rsp);
133+
final var snapshotStatuses = rsp.getSnapshots();
134+
assertNotNull(snapshotStatuses);
135+
assertEquals(1, snapshotStatuses.size());
136+
final var snapshotStatus = snapshotStatuses.getFirst();
137+
assertNotNull(snapshotStatus.getSnapshot());
138+
assertEquals(expectedSnapshot, snapshotStatus.getSnapshot());
139+
assertEquals(expectedState, snapshotStatus.getState());
140+
final var snapshotStatusShards = snapshotStatus.getShards();
141+
assertNotNull(snapshotStatusShards);
142+
assertEquals(1, snapshotStatusShards.size());
143+
final var snapshotStatusIndices = snapshotStatus.getIndices();
144+
assertNotNull(snapshotStatusIndices);
145+
assertEquals(1, snapshotStatusIndices.size());
146+
assertTrue(snapshotStatusIndices.containsKey("index"));
147+
assertNotNull(snapshotStatus.getShardsStats());
148+
};
149+
150+
final var listener = new ActionListener<SnapshotsStatusResponse>() {
151+
@Override
152+
public void onResponse(SnapshotsStatusResponse rsp) {
153+
if (shouldCancelTask) {
154+
fail("expected detection of task cancellation");
155+
} else {
156+
verifyResponse.accept(rsp);
157+
}
158+
}
159+
160+
@Override
161+
public void onFailure(Exception e) {
162+
if (shouldCancelTask) {
163+
assertTrue(e instanceof TaskCancelledException);
164+
} else {
165+
fail("expected normal response when task is not cancelled");
166+
}
167+
}
168+
};
169+
170+
final var listenerInvoked = new AtomicBoolean(false);
171+
final var cancellableTask = new CancellableTask(randomLong(), "type", "action", "desc", null, Map.of());
172+
173+
if (shouldCancelTask) {
174+
TaskCancelHelper.cancel(cancellableTask, "simulated cancellation");
175+
}
176+
177+
action.buildResponse(
178+
SnapshotsInProgress.EMPTY,
179+
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
180+
currentSnapshotEntries,
181+
nodeSnapshotStatuses,
182+
cancellableTask,
183+
ActionListener.runAfter(listener, () -> listenerInvoked.set(true))
184+
);
185+
assertTrue("Expected listener to be invoked", listenerInvoked.get());
186+
}
187+
}

0 commit comments

Comments
 (0)