Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public boolean shouldEnable() {
}

@Override
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(taskId.getId(), type, action, "Logs streams activation toggle request", parentTaskId, headers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ protected MockTransportService createTransportService() {
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Copy link
Contributor Author

@GalLalouche GalLalouche Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed all clients passed the same Settings.EMPTY, interceptor, and null in the clusterSettings to this constructor (refactored to a factory method), so I've simplified the method by removing those parameters.

null,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) };
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
return new MockTransportService(
Settings.EMPTY,
transport,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null,
null,
null
);
}

protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,14 +685,16 @@ private void construct(

modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());

assert nodeEnvironment.nodeId() != null : "node ID must be set before constructing the Node";
TaskManager taskManager = new TaskManager(
settings,
threadPool,
Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
Task.HEADERS_TO_COPY.stream()
).collect(Collectors.toSet()),
telemetryProvider.getTracer()
telemetryProvider.getTracer(),
nodeEnvironment.nodeId()
);

ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
Expand Down Expand Up @@ -1097,7 +1099,8 @@ public Map<String, String> queryFields() {
localNodeFactory,
settingsModule.getClusterSettings(),
taskManager,
telemetryProvider.getTracer()
telemetryProvider.getTracer(),
nodeEnvironment.nodeId()
);
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
final SearchTransportService searchTransportService = new SearchTransportService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer
Tracer tracer,
String initialNodeId
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer);
}
Expand Down
16 changes: 2 additions & 14 deletions server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

package org.elasticsearch.tasks;

import org.elasticsearch.core.Nullable;

import java.util.Map;

/**
Expand Down Expand Up @@ -57,18 +55,8 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
/**
* Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID.
*/
// TODO remove the above overload, use only this one.
default Task createTask(
// TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We
// We should fix the tests, and replace this and id with TaskId instead.
@Nullable String localNodeId,
long id,
String type,
String action,
TaskId parentTaskId,
Map<String, String> headers
) {
return createTask(id, type, action, parentTaskId, headers);
default Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return createTask(taskId.getId(), type, action, parentTaskId, headers);
}

/**
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class TaskManager implements ClusterStateApplier {

private TaskResultsService taskResultsService;

private final String nodeId;
private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;

private final Tracer tracer;
Expand All @@ -96,10 +98,15 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
}

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please mention with a comment this is just for tests (creating the uuid). we should move all these uses to explicitly pass a test nodeid, but that can be a followup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this(settings, threadPool, taskHeaders, tracer, UUIDs.randomBase64UUID());
}

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer, @Nullable String nodeId) {
this.threadPool = threadPool;
this.taskHeaders = Set.copyOf(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.tracer = tracer;
this.nodeId = nodeId;
}

public void setTaskResultsService(TaskResultsService taskResultsService) {
Expand Down Expand Up @@ -142,8 +149,7 @@ public Task register(String type, String action, TaskAwareRequest request, boole
}
}
Task task = request.createTask(
lastDiscoveryNodes.getLocalNodeId(),
taskIdGenerator.incrementAndGet(),
new TaskId(nodeId, taskIdGenerator.incrementAndGet()),
type,
action,
request.getParentTask(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) {
discoveryNode.set(DiscoveryNodeUtils.create(name, address.publishAddress(), emptyMap(), emptySet()));
return discoveryNode.get();
};
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP);
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP, name);
transportService = new TransportService(
settings,
new Netty4Transport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
NOOP_TRANSPORT_INTERCEPTOR,
x -> clusterService.localNode(),
null,
Collections.emptySet()
Collections.emptySet(),
clusterService.localNode().getId()
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,9 @@ public void testRetryOnReplicaWithRealTransport() throws Exception {
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> clusterService.localNode(),
null,
Collections.emptySet()
Collections.emptySet(),
clusterService.localNode().getId()

);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public BoundTransportAddress boundAddress() {
transport,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private TransportService startServices(
.version(nodeVersion)
.build(),
null,
Collections.emptySet()
Collections.emptySet(),
nodeNameAndId
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer
Tracer tracer,
String initialNodeId
) {

// we use the MockTransportService.TestPlugin class as a marker to create a network
// module with this MockNetworkService. NetworkService is such an integral part of the systme
// we don't allow to plug it in from plugins or anything. this is a test-only override and
Expand All @@ -183,7 +185,8 @@ protected TransportService newTransportService(
localNodeFactory,
clusterSettings,
taskManager,
tracer
tracer,
initialNodeId
);
} else {
return new MockTransportService(
Expand All @@ -193,7 +196,8 @@ protected TransportService newTransportService(
interceptor,
localNodeFactory,
clusterSettings,
taskManager.getTaskHeaders()
taskManager.getTaskHeaders(),
initialNodeId
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static ClusterService createClusterService(
settings,
clusterSettings,
threadPool,
new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP)
new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP, localNode.getId())
);
clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
ClusterState.Builder builder = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,21 @@ public static MockTransportService createNewService(
Set<String> taskHeaders,
TransportInterceptor interceptor
) {
String initialNodeId = UUIDs.randomBase64UUID();
return new MockTransportService(
settings,
new StubbableTransport(transport),
threadPool,
interceptor,
boundAddress -> DiscoveryNodeUtils.builder(UUIDs.randomBase64UUID())
boundAddress -> DiscoveryNodeUtils.builder(initialNodeId)
.name(Node.NODE_NAME_SETTING.get(settings))
.address(boundAddress.publishAddress())
.attributes(Node.NODE_ATTRIBUTES.getAsMap(settings))
.roles(DiscoveryNode.getRolesFromSettings(settings))
.version(version)
.build(),
clusterSettings,
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP)
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, initialNodeId)
);
}

Expand All @@ -223,24 +224,25 @@ public static MockTransportService getInstance(String nodeName) {
* updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and
* {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
public MockTransportService(
public static MockTransportService createMockTransportService(
Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor interceptor,
@Nullable ClusterSettings clusterSettings
) {
this(
String initialNodeId = settings.get(Node.NODE_NAME_SETTING.getKey(), UUIDs.randomBase64UUID());
return new MockTransportService(
settings,
new StubbableTransport(transport),
threadPool,
interceptor,
(boundAddress) -> DiscoveryNodeUtils.builder(settings.get(Node.NODE_NAME_SETTING.getKey(), UUIDs.randomBase64UUID()))
(boundAddress) -> DiscoveryNodeUtils.builder(initialNodeId)
.applySettings(settings)
.address(boundAddress.publishAddress())
.build(),
clusterSettings,
createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP)
createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP, initialNodeId)
);
}

Expand All @@ -258,7 +260,8 @@ public MockTransportService(
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders
Set<String> taskHeaders,
@Nullable String initialNodeId
) {
this(
settings,
Expand All @@ -267,7 +270,7 @@ public MockTransportService(
interceptor,
localNodeFactory,
clusterSettings,
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP)
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, initialNodeId)
);
}

Expand All @@ -277,7 +280,8 @@ public MockTransportService(
ThreadPool threadPool,
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings
@Nullable ClusterSettings clusterSettings,
String initialNodeId
) {
this(
settings,
Expand All @@ -286,7 +290,7 @@ public MockTransportService(
interceptor,
localNodeFactory,
clusterSettings,
createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP)
createTaskManager(settings, threadPool, Set.of(), Tracer.NOOP, initialNodeId)
);
}

Expand Down Expand Up @@ -331,19 +335,28 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
}

public static TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
if (MockTaskManager.SPY_TASK_MANAGER_SETTING.get(settings)) {
return spy(createMockTaskManager(settings, threadPool, taskHeaders, tracer));
} else {
return createMockTaskManager(settings, threadPool, taskHeaders, tracer);
}
public static TaskManager createTaskManager(
Settings settings,
ThreadPool threadPool,
Set<String> taskHeaders,
Tracer tracer,
@Nullable String initialNodeId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be nullable, and also not "initial"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I've went over and removed all new @Nullable, and renamed initialNodeId to nodeId.

) {
TaskManager mockTaskManager = createMockTaskManager(settings, threadPool, taskHeaders, tracer, initialNodeId);
return MockTaskManager.SPY_TASK_MANAGER_SETTING.get(settings) ? spy(mockTaskManager) : mockTaskManager;
}

private static TaskManager createMockTaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
private static TaskManager createMockTaskManager(
Settings settings,
ThreadPool threadPool,
Set<String> taskHeaders,
Tracer tracer,
@Nullable String initialNodeId
) {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
return new MockTaskManager(settings, threadPool, taskHeaders);
} else {
return new TaskManager(settings, threadPool, taskHeaders, tracer);
return new TaskManager(settings, threadPool, taskHeaders, tracer, initialNodeId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
}

@Override
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(localNodeId, id)));
return new EsqlQueryRequestTask(query, id, type, action, parentTaskId, headers, status);
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId));
return new EsqlQueryRequestTask(query, taskId.getId(), type, action, parentTaskId, headers, status);
}

private static class EsqlQueryRequestTask extends CancellableTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ public void testTask() throws IOException {

EsqlQueryRequest request = parseEsqlQueryRequestSync(requestJson);
String localNode = randomAlphaOfLength(2);
Task task = request.createTask(localNode, id, "transport", EsqlQueryAction.NAME, TaskId.EMPTY_TASK_ID, Map.of());
Task task = request.createTask(new TaskId(localNode, id), "transport", EsqlQueryAction.NAME, TaskId.EMPTY_TASK_ID, Map.of());
assertThat(task.getDescription(), equalTo(query));

TaskInfo taskInfo = task.taskInfo(localNode, true);
Expand Down