Skip to content

Commit 5a2cef5

Browse files
Fix task cancellation on remote cluster when original request fails (#109440) (#109484)
Fixes a bug where the task on the remote cluster node is not cancelled when the original request (that started the task) fails (returns an exception).
1 parent fcb713e commit 5a2cef5

File tree

6 files changed

+119
-9
lines changed

6 files changed

+119
-9
lines changed

docs/changelog/109440.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 109440
2+
summary: Fix task cancellation on remote cluster when original request fails
3+
area: Network
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public void cancelChildRemote(TaskId parentTask, long childRequestId, Transport.
439439
reason
440440
);
441441
final CancelChildRequest request = CancelChildRequest.createCancelChildRequest(parentTask, childRequestId, reason);
442-
transportService.sendRequest(childNode, CANCEL_CHILD_ACTION_NAME, request, TransportRequestOptions.EMPTY, NOOP_HANDLER);
442+
transportService.sendRequest(childConnection, CANCEL_CHILD_ACTION_NAME, request, TransportRequestOptions.EMPTY, NOOP_HANDLER);
443443
}
444444
}
445445

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static java.util.Collections.emptySet;
6262
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
6363
import static org.elasticsearch.test.ClusterServiceUtils.setState;
64+
import static org.elasticsearch.test.transport.MockTransportService.createTaskManager;
6465

6566
/**
6667
* The test case for unit testing task manager and related transport actions
@@ -176,12 +177,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) {
176177
discoveryNode.set(DiscoveryNodeUtils.create(name, address.publishAddress(), emptyMap(), emptySet()));
177178
return discoveryNode.get();
178179
};
179-
TaskManager taskManager;
180-
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
181-
taskManager = new MockTaskManager(settings, threadPool, emptySet());
182-
} else {
183-
taskManager = new TaskManager(settings, threadPool, emptySet());
184-
}
180+
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP);
185181
transportService = new TransportService(
186182
settings,
187183
new Netty4Transport(

server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.elasticsearch.common.util.concurrent.ThreadContext;
2323
import org.elasticsearch.core.TimeValue;
2424
import org.elasticsearch.index.query.MatchAllQueryBuilder;
25+
import org.elasticsearch.tasks.Task;
26+
import org.elasticsearch.tasks.TaskCancellationService;
27+
import org.elasticsearch.tasks.TaskId;
2528
import org.elasticsearch.test.ESTestCase;
2629
import org.elasticsearch.test.transport.MockTransportService;
2730
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
@@ -31,11 +34,19 @@
3134
import java.util.Collections;
3235
import java.util.List;
3336
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.ExecutionException;
3439
import java.util.concurrent.ExecutorService;
3540
import java.util.concurrent.Executors;
3641
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicLong;
3743

44+
import static org.elasticsearch.test.tasks.MockTaskManager.SPY_TASK_MANAGER_SETTING;
3845
import static org.hamcrest.Matchers.equalTo;
46+
import static org.hamcrest.Matchers.instanceOf;
47+
import static org.mockito.ArgumentMatchers.anyString;
48+
import static org.mockito.ArgumentMatchers.eq;
49+
import static org.mockito.Mockito.verify;
3950

4051
public class RemoteClusterAwareClientTests extends ESTestCase {
4152

@@ -62,6 +73,89 @@ private MockTransportService startTransport(String id, List<DiscoveryNode> known
6273
);
6374
}
6475

76+
public void testRemoteTaskCancellationOnFailedResponse() throws Exception {
77+
Settings.Builder remoteTransportSettingsBuilder = Settings.builder();
78+
remoteTransportSettingsBuilder.put(SPY_TASK_MANAGER_SETTING.getKey(), true);
79+
try (
80+
MockTransportService remoteTransport = RemoteClusterConnectionTests.startTransport(
81+
"seed_node",
82+
new CopyOnWriteArrayList<>(),
83+
VersionInformation.CURRENT,
84+
TransportVersion.current(),
85+
threadPool,
86+
remoteTransportSettingsBuilder.build()
87+
)
88+
) {
89+
remoteTransport.getTaskManager().setTaskCancellationService(new TaskCancellationService(remoteTransport));
90+
Settings.Builder builder = Settings.builder();
91+
builder.putList("cluster.remote.cluster1.seeds", remoteTransport.getLocalDiscoNode().getAddress().toString());
92+
try (
93+
MockTransportService localService = MockTransportService.createNewService(
94+
builder.build(),
95+
VersionInformation.CURRENT,
96+
TransportVersion.current(),
97+
threadPool,
98+
null
99+
)
100+
) {
101+
// the TaskCancellationService references the same TransportService instance
102+
// this is identically to how it works in the Node constructor
103+
localService.getTaskManager().setTaskCancellationService(new TaskCancellationService(localService));
104+
localService.start();
105+
localService.acceptIncomingRequests();
106+
107+
SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
108+
new String[] { "test-index" },
109+
IndicesOptions.strictExpandOpen(),
110+
new MatchAllQueryBuilder(),
111+
null,
112+
"index_not_found", // this request must fail
113+
randomBoolean(),
114+
null
115+
);
116+
Task parentTask = localService.getTaskManager().register("test_type", "test_action", searchShardsRequest);
117+
TaskId parentTaskId = new TaskId("test-mock-node-id", parentTask.getId());
118+
searchShardsRequest.setParentTask(parentTaskId);
119+
var client = new RemoteClusterAwareClient(
120+
localService,
121+
"cluster1",
122+
threadPool.executor(TEST_THREAD_POOL_NAME),
123+
randomBoolean()
124+
);
125+
126+
CountDownLatch cancelChildReceived = new CountDownLatch(1);
127+
remoteTransport.addRequestHandlingBehavior(
128+
TaskCancellationService.CANCEL_CHILD_ACTION_NAME,
129+
(handler, request, channel, task) -> {
130+
handler.messageReceived(request, channel, task);
131+
cancelChildReceived.countDown();
132+
}
133+
);
134+
AtomicLong searchShardsRequestId = new AtomicLong(-1);
135+
CountDownLatch cancelChildSent = new CountDownLatch(1);
136+
localService.addSendBehavior(remoteTransport, (connection, requestId, action, request, options) -> {
137+
connection.sendRequest(requestId, action, request, options);
138+
if (action.equals("indices:admin/search/search_shards")) {
139+
searchShardsRequestId.set(requestId);
140+
} else if (action.equals(TaskCancellationService.CANCEL_CHILD_ACTION_NAME)) {
141+
cancelChildSent.countDown();
142+
}
143+
});
144+
145+
// assert original request failed
146+
var future = new PlainActionFuture<SearchShardsResponse>();
147+
client.execute(TransportSearchShardsAction.REMOTE_TYPE, searchShardsRequest, future);
148+
ExecutionException e = expectThrows(ExecutionException.class, future::get);
149+
assertThat(e.getCause(), instanceOf(RemoteTransportException.class));
150+
151+
// assert remote task is cancelled
152+
safeAwait(cancelChildSent);
153+
safeAwait(cancelChildReceived);
154+
verify(remoteTransport.getTaskManager()).cancelChildLocal(eq(parentTaskId), eq(searchShardsRequestId.get()), anyString());
155+
}
156+
}
157+
}
158+
65159
public void testSearchShards() throws Exception {
66160
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
67161
try (

test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ public class MockTaskManager extends TaskManager {
3838
Property.NodeScope
3939
);
4040

41+
public static final Setting<Boolean> SPY_TASK_MANAGER_SETTING = Setting.boolSetting(
42+
"tests.spy.taskmanager.enabled",
43+
false,
44+
Property.NodeScope
45+
);
46+
4147
private final Collection<MockTaskManagerListener> listeners = new CopyOnWriteArrayList<>();
4248

4349
public MockTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.util.function.Supplier;
7878

7979
import static org.junit.Assert.assertNotNull;
80+
import static org.mockito.Mockito.spy;
8081

8182
/**
8283
* A mock delegate service that allows to simulate different network topology failures.
@@ -99,7 +100,7 @@ public class MockTransportService extends TransportService {
99100
public static class TestPlugin extends Plugin {
100101
@Override
101102
public List<Setting<?>> getSettings() {
102-
return List.of(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING);
103+
return List.of(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING, MockTaskManager.SPY_TASK_MANAGER_SETTING);
103104
}
104105
}
105106

@@ -307,7 +308,15 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra
307308
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
308309
}
309310

310-
private static TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
311+
public static TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
312+
if (MockTaskManager.SPY_TASK_MANAGER_SETTING.get(settings)) {
313+
return spy(createMockTaskManager(settings, threadPool, taskHeaders, tracer));
314+
} else {
315+
return createMockTaskManager(settings, threadPool, taskHeaders, tracer);
316+
}
317+
}
318+
319+
private static TaskManager createMockTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
311320
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
312321
return new MockTaskManager(settings, threadPool, taskHeaders);
313322
} else {

0 commit comments

Comments
 (0)