|
17 | 17 | import org.elasticsearch.action.ActionRequestValidationException; |
18 | 18 | import org.elasticsearch.action.ActionResponse; |
19 | 19 | import org.elasticsearch.action.ActionType; |
20 | | -import org.elasticsearch.action.DelegatingActionListener; |
21 | 20 | import org.elasticsearch.action.LatchedActionListener; |
22 | 21 | import org.elasticsearch.action.LegacyActionRequest; |
23 | 22 | import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; |
|
27 | 26 | import org.elasticsearch.action.support.PlainActionFuture; |
28 | 27 | import org.elasticsearch.client.internal.node.NodeClient; |
29 | 28 | import org.elasticsearch.cluster.node.DiscoveryNode; |
30 | | -import org.elasticsearch.common.Strings; |
31 | 29 | import org.elasticsearch.common.io.stream.StreamInput; |
32 | 30 | import org.elasticsearch.common.io.stream.StreamOutput; |
33 | 31 | import org.elasticsearch.common.util.CollectionUtils; |
|
36 | 34 | import org.elasticsearch.common.util.set.Sets; |
37 | 35 | import org.elasticsearch.core.TimeValue; |
38 | 36 | import org.elasticsearch.injection.guice.Inject; |
39 | | -import org.elasticsearch.logging.LogManager; |
40 | | -import org.elasticsearch.logging.Logger; |
41 | 37 | import org.elasticsearch.plugins.ActionPlugin; |
42 | 38 | import org.elasticsearch.plugins.Plugin; |
43 | 39 | import org.elasticsearch.tasks.CancellableTask; |
|
47 | 43 | import org.elasticsearch.tasks.TaskInfo; |
48 | 44 | import org.elasticsearch.tasks.TaskManager; |
49 | 45 | import org.elasticsearch.test.ESIntegTestCase; |
50 | | -import org.elasticsearch.test.junit.annotations.TestIssueLogging; |
51 | 46 | import org.elasticsearch.threadpool.ThreadPool; |
52 | 47 | import org.elasticsearch.transport.ReceiveTimeoutTransportException; |
53 | 48 | import org.elasticsearch.transport.SendRequestTransportException; |
|
79 | 74 | @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) |
80 | 75 | public class CancellableTasksIT extends ESIntegTestCase { |
81 | 76 |
|
82 | | - // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
83 | | - private static final Logger logger = LogManager.getLogger(CancellableTasksIT.class); |
84 | | - |
85 | 77 | static int idGenerator = 0; |
86 | 78 | static final Map<TestRequest, CountDownLatch> beforeSendLatches = ConcurrentCollections.newConcurrentMap(); |
87 | 79 | static final Map<TestRequest, CountDownLatch> arrivedLatches = ConcurrentCollections.newConcurrentMap(); |
@@ -374,42 +366,18 @@ public void testRemoveBanParentsOnDisconnect() throws Exception { |
374 | 366 | } |
375 | 367 | } |
376 | 368 |
|
377 | | - @TestIssueLogging( |
378 | | - issueUrl = "https://github.com/elastic/elasticsearch/issues/123568", |
379 | | - value = "org.elasticsearch.transport.TransportService.tracer:TRACE" |
380 | | - + ",org.elasticsearch.tasks.TaskManager:TRACE" |
381 | | - + ",org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT:DEBUG" |
382 | | - ) |
383 | 369 | public void testChildrenTasksCancelledOnTimeout() throws Exception { |
384 | 370 | Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet()); |
385 | 371 | final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4), true); |
386 | | - logger.info("generated request\n{}", buildTestRequestDescription(rootRequest, "", new StringBuilder()).toString()); |
387 | 372 | ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest); |
388 | | - logger.info("action executed"); |
389 | 373 | allowEntireRequest(rootRequest); |
390 | | - logger.info("execution released"); |
391 | 374 | waitForRootTask(rootTaskFuture, true); |
392 | | - logger.info("root task completed"); |
393 | 375 | ensureBansAndCancellationsConsistency(); |
394 | | - logger.info("ensureBansAndCancellationsConsistency completed"); |
395 | 376 |
|
396 | 377 | // Make sure all descendent requests have completed |
397 | 378 | for (TestRequest subRequest : rootRequest.descendants()) { |
398 | | - logger.info("awaiting completion of request {}", subRequest.getDescription()); |
399 | 379 | safeAwait(completedLatches.get(subRequest)); |
400 | 380 | } |
401 | | - logger.info("all requests completed"); |
402 | | - } |
403 | | - |
404 | | - // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
405 | | - static StringBuilder buildTestRequestDescription(TestRequest request, String prefix, StringBuilder stringBuilder) { |
406 | | - stringBuilder.append(prefix) |
407 | | - .append(Strings.format("id=%d [timeout=%s] %s", request.id, request.timeout, request.node.descriptionWithoutAttributes())) |
408 | | - .append('\n'); |
409 | | - for (TestRequest subRequest : request.subRequests) { |
410 | | - buildTestRequestDescription(subRequest, prefix + " ", stringBuilder); |
411 | | - } |
412 | | - return stringBuilder; |
413 | 381 | } |
414 | 382 |
|
415 | 383 | static TaskId getRootTaskId(TestRequest request) throws Exception { |
@@ -538,8 +506,6 @@ public void writeTo(StreamOutput out) throws IOException { |
538 | 506 |
|
539 | 507 | public static class TransportTestAction extends HandledTransportAction<TestRequest, TestResponse> { |
540 | 508 |
|
541 | | - private static final Logger logger = CancellableTasksIT.logger; |
542 | | - |
543 | 509 | public static ActionType<TestResponse> ACTION = new ActionType<>("internal::test_action"); |
544 | 510 | private final TransportService transportService; |
545 | 511 | private final NodeClient client; |
@@ -599,22 +565,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp |
599 | 565 | protected void startSubTask(TaskId parentTaskId, TestRequest subRequest, ActionListener<TestResponse> listener) { |
600 | 566 | subRequest.setParentTask(parentTaskId); |
601 | 567 | CountDownLatch completeLatch = completedLatches.get(subRequest); |
602 | | - ActionListener<TestResponse> latchedListener = new DelegatingActionListener<>( |
603 | | - new LatchedActionListener<>(listener, completeLatch) |
604 | | - ) { |
605 | | - // Temporary logging addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
606 | | - @Override |
607 | | - public void onResponse(TestResponse testResponse) { |
608 | | - logger.debug("processing successful response to request [{}]", subRequest.getDescription()); |
609 | | - delegate.onResponse(testResponse); |
610 | | - } |
611 | | - |
612 | | - @Override |
613 | | - public void onFailure(Exception e) { |
614 | | - logger.debug("processing exceptional response to request [{}]: {}", subRequest.getDescription(), e.getMessage()); |
615 | | - super.onFailure(e); |
616 | | - } |
617 | | - }; |
| 568 | + LatchedActionListener<TestResponse> latchedListener = new LatchedActionListener<>(listener, completeLatch); |
618 | 569 | transportService.getThreadPool().generic().execute(new AbstractRunnable() { |
619 | 570 | @Override |
620 | 571 | public void onFailure(Exception e) { |
@@ -645,13 +596,6 @@ protected void doRun() throws Exception { |
645 | 596 | TransportResponseHandler.TRANSPORT_WORKER |
646 | 597 | ) |
647 | 598 | ); |
648 | | - // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568 |
649 | | - logger.debug( |
650 | | - "sent test request [{}] from [{}] to [{}]", |
651 | | - subRequest.getDescription(), |
652 | | - client.getLocalNodeId(), |
653 | | - subRequest.node.descriptionWithoutAttributes() |
654 | | - ); |
655 | 599 | } |
656 | 600 | } |
657 | 601 | }); |
|
0 commit comments