diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java index 07eda61784c8f..7608b2b7c3cd8 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java @@ -56,8 +56,8 @@ public boolean shouldEnable() { } @Override - public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map headers) { - return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers); + public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(taskId.getId(), type, action, "Logs streams activation toggle request", parentTaskId, headers); } } } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java index aa19c794dfb6a..cb3af9e3a4169 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.discovery.SeedHostsResolver; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty4.Netty4Transport; import org.elasticsearch.transport.netty4.SharedGroupFactory; import org.hamcrest.Matchers; @@ -47,8 +46,7 @@ public class EC2RetriesTests extends AbstractEC2MockAPITestCase { @Override protected MockTransportService createTransportService() { - return new MockTransportService( - Settings.EMPTY, + return MockTransportService.createMockTransportService( new Netty4Transport( Settings.EMPTY, TransportVersion.current(), @@ -59,9 +57,7 @@ protected MockTransportService createTransportService() { new NoneCircuitBreakerService(), new SharedGroupFactory(Settings.EMPTY) ), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - null + threadPool ); } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index cbdbfab3ff41e..38f2ea6f0d768 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -78,7 +78,7 @@ public TransportAddress[] addressesFromString(String address) { return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) }; } }; - return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + return MockTransportService.createMockTransportService(transport, threadPool); } protected List buildDynamicHosts(Settings nodeSettings, int nodes) { diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 48ad9efa4510b..aaaf5383ba80f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -685,6 +685,7 @@ private void construct( modules.bindToInstance(Tracer.class, telemetryProvider.getTracer()); + assert nodeEnvironment.nodeId() != null : "node ID must be set before constructing the Node"; TaskManager taskManager = new TaskManager( settings, threadPool, @@ -692,7 +693,8 @@ private void construct( pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()), Task.HEADERS_TO_COPY.stream() ).collect(Collectors.toSet()), - telemetryProvider.getTracer() + telemetryProvider.getTracer(), + nodeEnvironment.nodeId() ); ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager); @@ -1097,7 +1099,8 @@ public Map queryFields() { localNodeFactory, settingsModule.getClusterSettings(), taskManager, - telemetryProvider.getTracer() + telemetryProvider.getTracer(), + nodeEnvironment.nodeId() ); final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry()); final SearchTransportService searchTransportService = new SearchTransportService( diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 53c26dc679677..8f013a8ee7dde 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -114,7 +114,8 @@ TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, TaskManager taskManager, - Tracer tracer + Tracer tracer, + String nodeId ) { return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer); } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java index de81e9a1a96ef..3775282c8a3dd 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java @@ -9,8 +9,6 @@ package org.elasticsearch.tasks; -import org.elasticsearch.core.Nullable; - import java.util.Map; /** @@ -57,18 +55,8 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId /** * Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID. */ - // TODO remove the above overload, use only this one. - default Task createTask( - // TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We - // We should fix the tests, and replace this and id with TaskId instead. - @Nullable String localNodeId, - long id, - String type, - String action, - TaskId parentTaskId, - Map headers - ) { - return createTask(id, type, action, parentTaskId, headers); + default Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map headers) { + return createTask(taskId.getId(), type, action, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 249bb1d43119c..1b9b682666158 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -80,6 +81,7 @@ public class TaskManager implements ClusterStateApplier { private TaskResultsService taskResultsService; + private final String nodeId; private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES; private final Tracer tracer; @@ -95,11 +97,19 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set taskHea this(settings, threadPool, taskHeaders, Tracer.NOOP); } + // For testing (especially the creating a random node ID, which some tests rely on) public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders, Tracer tracer) { + this(settings, threadPool, taskHeaders, tracer, UUIDs.randomBase64UUID()); + } + + // TODO Both of the above overloads should be moved to the test package. + + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders, Tracer tracer, String nodeId) { this.threadPool = threadPool; this.taskHeaders = Set.copyOf(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); this.tracer = tracer; + this.nodeId = nodeId; } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -142,8 +152,7 @@ public Task register(String type, String action, TaskAwareRequest request, boole } } Task task = request.createTask( - lastDiscoveryNodes.getLocalNodeId(), - taskIdGenerator.incrementAndGet(), + new TaskId(nodeId, taskIdGenerator.incrementAndGet()), type, action, request.getParentTask(), diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 2ddcaa5d97033..a5b85a87d5573 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -177,7 +177,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { discoveryNode.set(DiscoveryNodeUtils.create(name, address.publishAddress(), emptyMap(), emptySet())); return discoveryNode.get(); }; - TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP); + TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP, name); transportService = new TransportService( settings, new Netty4Transport( diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 3491fb0a706b3..a50d598743497 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -123,7 +123,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, - Collections.emptySet() + Collections.emptySet(), + clusterService.localNode().getId() ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 233138c8eedbb..abacd386ea905 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1316,7 +1316,9 @@ public void testRetryOnReplicaWithRealTransport() throws Exception { TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, - Collections.emptySet() + Collections.emptySet(), + clusterService.localNode().getId() + ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java b/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java index eb8623595613b..c387a0105180b 100644 --- a/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty4.Netty4Transport; import org.elasticsearch.transport.netty4.SharedGroupFactory; import org.junit.After; @@ -87,13 +86,7 @@ public BoundTransportAddress boundAddress() { ); } }; - transportService = new MockTransportService( - Settings.EMPTY, - transport, - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - null - ); + transportService = MockTransportService.createMockTransportService(transport, threadPool); } public void testBuildDynamicNodes() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index d92e291712388..0ae2f78db3006 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -92,7 +92,8 @@ private TransportService startServices( .version(nodeVersion) .build(), null, - Collections.emptySet() + Collections.emptySet(), + nodeNameAndId ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 9a99b5c881941..4c2e0a3c6c047 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -167,8 +167,10 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, TaskManager taskManager, - Tracer tracer + Tracer tracer, + String nodeId ) { + // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme // we don't allow to plug it in from plugins or anything. this is a test-only override and @@ -183,7 +185,8 @@ protected TransportService newTransportService( localNodeFactory, clusterSettings, taskManager, - tracer + tracer, + nodeId ); } else { return new MockTransportService( @@ -193,7 +196,8 @@ protected TransportService newTransportService( interceptor, localNodeFactory, clusterSettings, - taskManager.getTaskHeaders() + taskManager.getTaskHeaders(), + nodeId ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index e0936702c0487..75b94e46ae522 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -160,7 +160,7 @@ public static ClusterService createClusterService( settings, clusterSettings, threadPool, - new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP) + new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP, localNode.getId()) ); clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService()); ClusterState.Builder builder = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 3f6cf453fd0d1..bf96ac0eb2711 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -188,12 +188,13 @@ public static MockTransportService createNewService( Set taskHeaders, TransportInterceptor interceptor ) { + String nodeId = UUIDs.randomBase64UUID(); return new MockTransportService( settings, new StubbableTransport(transport), threadPool, interceptor, - boundAddress -> DiscoveryNodeUtils.builder(UUIDs.randomBase64UUID()) + boundAddress -> DiscoveryNodeUtils.builder(nodeId) .name(Node.NODE_NAME_SETTING.get(settings)) .address(boundAddress.publishAddress()) .attributes(Node.NODE_ATTRIBUTES.getAsMap(settings)) @@ -201,7 +202,7 @@ public static MockTransportService createNewService( .version(version) .build(), clusterSettings, - createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP) + createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId) ); } @@ -216,31 +217,17 @@ public static MockTransportService getInstance(String nodeName) { private final Transport original; private final EsThreadPoolExecutor testExecutor; - /** - * Build the service. - * - * @param clusterSettings if non null the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings - * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and - * {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. - */ - public MockTransportService( - Settings settings, - Transport transport, - ThreadPool threadPool, - TransportInterceptor interceptor, - @Nullable ClusterSettings clusterSettings - ) { - this( - settings, + /** Build the service. */ + public static MockTransportService createMockTransportService(Transport transport, ThreadPool threadPool) { + String nodeId = UUIDs.randomBase64UUID(); + return new MockTransportService( + Settings.EMPTY, new StubbableTransport(transport), threadPool, - interceptor, - (boundAddress) -> DiscoveryNodeUtils.builder(settings.get(Node.NODE_NAME_SETTING.getKey(), UUIDs.randomBase64UUID())) - .applySettings(settings) - .address(boundAddress.publishAddress()) - .build(), - clusterSettings, - createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP) + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + (boundAddress) -> DiscoveryNodeUtils.builder(nodeId).address(boundAddress.publishAddress()).build(), + null, // clusterSettings + createTaskManager(Settings.EMPTY, threadPool, Set.of(), Tracer.NOOP, nodeId) ); } @@ -258,7 +245,8 @@ public MockTransportService( TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, - Set taskHeaders + Set taskHeaders, + String nodeId ) { this( settings, @@ -267,7 +255,7 @@ public MockTransportService( interceptor, localNodeFactory, clusterSettings, - createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP) + createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId) ); } @@ -277,7 +265,8 @@ public MockTransportService( ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - @Nullable ClusterSettings clusterSettings + @Nullable ClusterSettings clusterSettings, + String nodeId ) { this( settings, @@ -286,7 +275,7 @@ public MockTransportService( interceptor, localNodeFactory, clusterSettings, - createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP) + createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP, nodeId) ); } @@ -331,19 +320,28 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]); } - public static TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders, Tracer tracer) { - if (MockTaskManager.SPY_TASK_MANAGER_SETTING.get(settings)) { - return spy(createMockTaskManager(settings, threadPool, taskHeaders, tracer)); - } else { - return createMockTaskManager(settings, threadPool, taskHeaders, tracer); - } + public static TaskManager createTaskManager( + Settings settings, + ThreadPool threadPool, + Set taskHeaders, + Tracer tracer, + String nodeId + ) { + TaskManager mockTaskManager = createMockTaskManager(settings, threadPool, taskHeaders, tracer, nodeId); + return MockTaskManager.SPY_TASK_MANAGER_SETTING.get(settings) ? spy(mockTaskManager) : mockTaskManager; } - private static TaskManager createMockTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders, Tracer tracer) { + private static TaskManager createMockTaskManager( + Settings settings, + ThreadPool threadPool, + Set taskHeaders, + Tracer tracer, + String nodeId + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return new TaskManager(settings, threadPool, taskHeaders, tracer); + return new TaskManager(settings, threadPool, taskHeaders, tracer, nodeId); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index cb1b9c4cc655d..ffc0c1e670ed5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -248,9 +248,9 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) { } @Override - public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map headers) { - var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(localNodeId, id))); - return new EsqlQueryRequestTask(query, id, type, action, parentTaskId, headers, status); + public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map headers) { + var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId)); + return new EsqlQueryRequestTask(query, taskId.getId(), type, action, parentTaskId, headers, status); } private static class EsqlQueryRequestTask extends CancellableTask { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java index 16547ed0bfb3a..00ac4837a1fe7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java @@ -611,7 +611,7 @@ public void testTask() throws IOException { EsqlQueryRequest request = parseEsqlQueryRequestSync(requestJson); String localNode = randomAlphaOfLength(2); - Task task = request.createTask(localNode, id, "transport", EsqlQueryAction.NAME, TaskId.EMPTY_TASK_ID, Map.of()); + Task task = request.createTask(new TaskId(localNode, id), "transport", EsqlQueryAction.NAME, TaskId.EMPTY_TASK_ID, Map.of()); assertThat(task.getDescription(), equalTo(query)); TaskInfo taskInfo = task.taskInfo(localNode, true);