|
17 | 17 | import org.elasticsearch.action.ActionRequestValidationException; |
18 | 18 | import org.elasticsearch.action.ActionResponse; |
19 | 19 | import org.elasticsearch.action.ActionType; |
| 20 | +import org.elasticsearch.action.DelegatingActionListener; |
20 | 21 | import org.elasticsearch.action.LatchedActionListener; |
21 | 22 | import org.elasticsearch.action.LegacyActionRequest; |
22 | 23 | import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; |
|
26 | 27 | import org.elasticsearch.action.support.PlainActionFuture; |
27 | 28 | import org.elasticsearch.client.internal.node.NodeClient; |
28 | 29 | import org.elasticsearch.cluster.node.DiscoveryNode; |
| 30 | +import org.elasticsearch.common.Strings; |
29 | 31 | import org.elasticsearch.common.io.stream.StreamInput; |
30 | 32 | import org.elasticsearch.common.io.stream.StreamOutput; |
31 | 33 | import org.elasticsearch.common.util.CollectionUtils; |
|
34 | 36 | import org.elasticsearch.common.util.set.Sets; |
35 | 37 | import org.elasticsearch.core.TimeValue; |
36 | 38 | import org.elasticsearch.injection.guice.Inject; |
| 39 | +import org.elasticsearch.logging.LogManager; |
| 40 | +import org.elasticsearch.logging.Logger; |
37 | 41 | import org.elasticsearch.plugins.ActionPlugin; |
38 | 42 | import org.elasticsearch.plugins.Plugin; |
39 | 43 | import org.elasticsearch.tasks.CancellableTask; |
|
43 | 47 | import org.elasticsearch.tasks.TaskInfo; |
44 | 48 | import org.elasticsearch.tasks.TaskManager; |
45 | 49 | import org.elasticsearch.test.ESIntegTestCase; |
| 50 | +import org.elasticsearch.test.junit.annotations.TestIssueLogging; |
46 | 51 | import org.elasticsearch.threadpool.ThreadPool; |
47 | 52 | import org.elasticsearch.transport.ReceiveTimeoutTransportException; |
48 | 53 | import org.elasticsearch.transport.SendRequestTransportException; |
|
74 | 79 | @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) |
75 | 80 | public class CancellableTasksIT extends ESIntegTestCase { |
76 | 81 |
|
| 82 | + // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
| 83 | + private static final Logger logger = LogManager.getLogger(CancellableTasksIT.class); |
| 84 | + |
77 | 85 | static int idGenerator = 0; |
78 | 86 | static final Map<TestRequest, CountDownLatch> beforeSendLatches = ConcurrentCollections.newConcurrentMap(); |
79 | 87 | static final Map<TestRequest, CountDownLatch> arrivedLatches = ConcurrentCollections.newConcurrentMap(); |
@@ -366,18 +374,42 @@ public void testRemoveBanParentsOnDisconnect() throws Exception { |
366 | 374 | } |
367 | 375 | } |
368 | 376 |
|
| 377 | + @TestIssueLogging( |
| 378 | + issueUrl = "https://github.com/elastic/elasticsearch/issues/123568", |
| 379 | + value = "org.elasticsearch.transport.TransportService.tracer:TRACE" |
| 380 | + + ",org.elasticsearch.tasks.TaskManager:TRACE" |
| 381 | + + ",org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT:DEBUG" |
| 382 | + ) |
369 | 383 | public void testChildrenTasksCancelledOnTimeout() throws Exception { |
370 | 384 | Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); |
371 | 385 | final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4), true); |
| 386 | + logger.info("generated request\n{}", buildTestRequestDescription(rootRequest, "", new StringBuilder()).toString()); |
372 | 387 | ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); |
| 388 | + logger.info("action executed"); |
373 | 389 | allowEntireRequest(rootRequest); |
| 390 | + logger.info("execution released"); |
374 | 391 | waitForRootTask(rootTaskFuture, true); |
| 392 | + logger.info("root task completed"); |
375 | 393 | ensureBansAndCancellationsConsistency(); |
| 394 | + logger.info("ensureBansAndCancellationsConsistency completed"); |
376 | 395 |
|
377 | 396 | // Make sure all descendent requests have completed |
378 | 397 | for (TestRequest subRequest : rootRequest.descendants()) { |
| 398 | + logger.info("awaiting completion of request {}", subRequest.getDescription()); |
379 | 399 | safeAwait(completedLatches.get(subRequest)); |
380 | 400 | } |
| 401 | + logger.info("all requests completed"); |
| 402 | + } |
| 403 | + |
| 404 | + // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
| 405 | + static StringBuilder buildTestRequestDescription(TestRequest request, String prefix, StringBuilder stringBuilder) { |
| 406 | + stringBuilder.append(prefix) |
| 407 | + .append(Strings.format("id=%d [timeout=%s] %s", request.id, request.timeout, request.node.descriptionWithoutAttributes())) |
| 408 | + .append('\n'); |
| 409 | + for (TestRequest subRequest : request.subRequests) { |
| 410 | + buildTestRequestDescription(subRequest, prefix + " ", stringBuilder); |
| 411 | + } |
| 412 | + return stringBuilder; |
381 | 413 | } |
382 | 414 |
|
383 | 415 | static TaskId getRootTaskId(TestRequest request) throws Exception { |
@@ -506,6 +538,8 @@ public void writeTo(StreamOutput out) throws IOException { |
506 | 538 |
|
507 | 539 | public static class TransportTestAction extends HandledTransportAction<TestRequest, TestResponse> { |
508 | 540 |
|
| 541 | + private static final Logger logger = CancellableTasksIT.logger; |
| 542 | + |
509 | 543 | public static ActionType<TestResponse> ACTION = new ActionType<>("internal::test_action"); |
510 | 544 | private final TransportService transportService; |
511 | 545 | private final NodeClient client; |
@@ -565,7 +599,22 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp |
565 | 599 | protected void startSubTask(TaskId parentTaskId, TestRequest subRequest, ActionListener<TestResponse> listener) { |
566 | 600 | subRequest.setParentTask(parentTaskId); |
567 | 601 | CountDownLatch completeLatch = completedLatches.get(subRequest); |
568 | | - LatchedActionListener<TestResponse> latchedListener = new LatchedActionListener<>(listener, completeLatch); |
| 602 | + ActionListener<TestResponse> latchedListener = new DelegatingActionListener<>( |
| 603 | + new LatchedActionListener<>(listener, completeLatch) |
| 604 | + ) { |
| 605 | + // Temporary logging addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
| 606 | + @Override |
| 607 | + public void onResponse(TestResponse testResponse) { |
| 608 | + logger.debug("processing successful response to request [{}]", subRequest.getDescription()); |
| 609 | + delegate.onResponse(testResponse); |
| 610 | + } |
| 611 | + |
| 612 | + @Override |
| 613 | + public void onFailure(Exception e) { |
| 614 | + logger.debug("processing exceptional response to request [{}]: {}", subRequest.getDescription(), e.getMessage()); |
| 615 | + super.onFailure(e); |
| 616 | + } |
| 617 | + }; |
569 | 618 | transportService.getThreadPool().generic().execute(new AbstractRunnable() { |
570 | 619 | @Override |
571 | 620 | public void onFailure(Exception e) { |
@@ -596,6 +645,13 @@ protected void doRun() throws Exception { |
596 | 645 | TransportResponseHandler.TRANSPORT_WORKER |
597 | 646 | ) |
598 | 647 | ); |
| 648 | + // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
| 649 | + logger.debug( |
| 650 | + "sent test request [{}] from [{}] to [{}]", |
| 651 | + subRequest.getDescription(), |
| 652 | + client.getLocalNodeId(), |
| 653 | + subRequest.node.descriptionWithoutAttributes() |
| 654 | + ); |
599 | 655 | } |
600 | 656 | } |
601 | 657 | }); |
|
0 commit comments