Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public boolean shouldEnable() {
}

@Override
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(taskId.getId(), type, action, "Logs streams activation toggle request", parentTaskId, headers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
import org.hamcrest.Matchers;
Expand All @@ -47,8 +46,7 @@ public class EC2RetriesTests extends AbstractEC2MockAPITestCase {

@Override
protected MockTransportService createTransportService() {
return new MockTransportService(
Settings.EMPTY,
return MockTransportService.createMockTransportService(
new Netty4Transport(
Settings.EMPTY,
TransportVersion.current(),
Expand All @@ -59,9 +57,7 @@ protected MockTransportService createTransportService() {
new NoneCircuitBreakerService(),
new SharedGroupFactory(Settings.EMPTY)
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Copy link
Contributor Author

@GalLalouche GalLalouche Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed all clients passed the same Settings.EMPTY, interceptor, and null in the clusterSettings to this constructor (refactored to a factory method), so I've simplified the method by removing those parameters.

null
threadPool
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) };
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
return MockTransportService.createMockTransportService(transport, threadPool);
}

protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,14 +685,16 @@ private void construct(

modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());

assert nodeEnvironment.nodeId() != null : "node ID must be set before constructing the Node";
TaskManager taskManager = new TaskManager(
settings,
threadPool,
Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
Task.HEADERS_TO_COPY.stream()
).collect(Collectors.toSet()),
telemetryProvider.getTracer()
telemetryProvider.getTracer(),
nodeEnvironment.nodeId()
);

ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
Expand Down Expand Up @@ -1097,7 +1099,8 @@ public Map<String, String> queryFields() {
localNodeFactory,
settingsModule.getClusterSettings(),
taskManager,
telemetryProvider.getTracer()
telemetryProvider.getTracer(),
nodeEnvironment.nodeId()
);
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
final SearchTransportService searchTransportService = new SearchTransportService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer
Tracer tracer,
String nodeId
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer);
}
Expand Down
16 changes: 2 additions & 14 deletions server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

package org.elasticsearch.tasks;

import org.elasticsearch.core.Nullable;

import java.util.Map;

/**
Expand Down Expand Up @@ -57,18 +55,8 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
/**
* Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID.
*/
// TODO remove the above overload, use only this one.
default Task createTask(
// TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We
// We should fix the tests, and replace this and id with TaskId instead.
@Nullable String localNodeId,
long id,
String type,
String action,
TaskId parentTaskId,
Map<String, String> headers
) {
return createTask(id, type, action, parentTaskId, headers);
default Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return createTask(taskId.getId(), type, action, parentTaskId, headers);
}

/**
Expand Down
13 changes: 11 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class TaskManager implements ClusterStateApplier {

private TaskResultsService taskResultsService;

private final String nodeId;
private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;

private final Tracer tracer;
Expand All @@ -95,11 +97,19 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
this(settings, threadPool, taskHeaders, Tracer.NOOP);
}

// For testing (especially the creating a random node ID, which some tests rely on)
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please mention with a comment this is just for tests (creating the uuid). we should move all these uses to explicitly pass a test nodeid, but that can be a followup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this(settings, threadPool, taskHeaders, tracer, UUIDs.randomBase64UUID());
}

// TODO Both of the above overloads should be moved to the test package.

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer, String nodeId) {
this.threadPool = threadPool;
this.taskHeaders = Set.copyOf(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.tracer = tracer;
this.nodeId = nodeId;
}

public void setTaskResultsService(TaskResultsService taskResultsService) {
Expand Down Expand Up @@ -142,8 +152,7 @@ public Task register(String type, String action, TaskAwareRequest request, boole
}
}
Task task = request.createTask(
lastDiscoveryNodes.getLocalNodeId(),
taskIdGenerator.incrementAndGet(),
new TaskId(nodeId, taskIdGenerator.incrementAndGet()),
type,
action,
request.getParentTask(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) {
discoveryNode.set(DiscoveryNodeUtils.create(name, address.publishAddress(), emptyMap(), emptySet()));
return discoveryNode.get();
};
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP);
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP, name);
transportService = new TransportService(
settings,
new Netty4Transport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
NOOP_TRANSPORT_INTERCEPTOR,
x -> clusterService.localNode(),
null,
Collections.emptySet()
Collections.emptySet(),
clusterService.localNode().getId()
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,9 @@ public void testRetryOnReplicaWithRealTransport() throws Exception {
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> clusterService.localNode(),
null,
Collections.emptySet()
Collections.emptySet(),
clusterService.localNode().getId()

);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
import org.junit.After;
Expand Down Expand Up @@ -87,13 +86,7 @@ public BoundTransportAddress boundAddress() {
);
}
};
transportService = new MockTransportService(
Settings.EMPTY,
transport,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null
);
transportService = MockTransportService.createMockTransportService(transport, threadPool);
}

public void testBuildDynamicNodes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private TransportService startServices(
.version(nodeVersion)
.build(),
null,
Collections.emptySet()
Collections.emptySet(),
nodeNameAndId
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer
Tracer tracer,
String nodeId
) {

// we use the MockTransportService.TestPlugin class as a marker to create a network
// module with this MockNetworkService. NetworkService is such an integral part of the systme
// we don't allow to plug it in from plugins or anything. this is a test-only override and
Expand All @@ -183,7 +185,8 @@ protected TransportService newTransportService(
localNodeFactory,
clusterSettings,
taskManager,
tracer
tracer,
nodeId
);
} else {
return new MockTransportService(
Expand All @@ -193,7 +196,8 @@ protected TransportService newTransportService(
interceptor,
localNodeFactory,
clusterSettings,
taskManager.getTaskHeaders()
taskManager.getTaskHeaders(),
nodeId
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static ClusterService createClusterService(
settings,
clusterSettings,
threadPool,
new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP)
new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP, localNode.getId())
);
clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
ClusterState.Builder builder = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
Expand Down
Loading