Skip to content

Commit 0f9d2bd

Browse files
committed
TODO refactor: Use TaskId in TaskAwareRequest.createRequest
Use TaskId instead of passing a node name and ID individually.
1 parent f16e370 commit 0f9d2bd

File tree

17 files changed

+85
-52
lines changed

17 files changed

+85
-52
lines changed

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public boolean shouldEnable() {
5656
}
5757

5858
@Override
59-
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60-
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
59+
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new CancellableTask(taskId.getId(), type, action, "Logs streams activation toggle request", parentTaskId, headers);
6161
}
6262
}
6363
}

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ protected MockTransportService createTransportService() {
6161
),
6262
threadPool,
6363
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
64+
null,
65+
null,
6466
null
6567
);
6668
}

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,15 @@ public TransportAddress[] addressesFromString(String address) {
7878
return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) };
7979
}
8080
};
81-
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
81+
return new MockTransportService(
82+
Settings.EMPTY,
83+
transport,
84+
threadPool,
85+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
86+
null,
87+
null,
88+
null
89+
);
8290
}
8391

8492
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -685,14 +685,16 @@ private void construct(
685685

686686
modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());
687687

688+
assert nodeEnvironment.nodeId() != null : "node ID must be set before constructing the Node";
688689
TaskManager taskManager = new TaskManager(
689690
settings,
690691
threadPool,
691692
Stream.concat(
692693
pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
693694
Task.HEADERS_TO_COPY.stream()
694695
).collect(Collectors.toSet()),
695-
telemetryProvider.getTracer()
696+
telemetryProvider.getTracer(),
697+
nodeEnvironment.nodeId()
696698
);
697699

698700
ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
@@ -1097,7 +1099,8 @@ public Map<String, String> queryFields() {
10971099
localNodeFactory,
10981100
settingsModule.getClusterSettings(),
10991101
taskManager,
1100-
telemetryProvider.getTracer()
1102+
telemetryProvider.getTracer(),
1103+
nodeEnvironment.nodeId()
11011104
);
11021105
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
11031106
final SearchTransportService searchTransportService = new SearchTransportService(

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ TransportService newTransportService(
114114
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
115115
ClusterSettings clusterSettings,
116116
TaskManager taskManager,
117-
Tracer tracer
117+
Tracer tracer,
118+
String initialNodeId
118119
) {
119120
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer);
120121
}

server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
package org.elasticsearch.tasks;
1111

12-
import org.elasticsearch.core.Nullable;
13-
1412
import java.util.Map;
1513

1614
/**
@@ -57,18 +55,8 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
5755
/**
5856
* Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID.
5957
*/
60-
// TODO remove the above overload, use only this one.
61-
default Task createTask(
62-
// TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We
63-
// We should fix the tests, and replace this and id with TaskId instead.
64-
@Nullable String localNodeId,
65-
long id,
66-
String type,
67-
String action,
68-
TaskId parentTaskId,
69-
Map<String, String> headers
70-
) {
71-
return createTask(id, type, action, parentTaskId, headers);
58+
default Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
59+
return createTask(taskId.getId(), type, action, parentTaskId, headers);
7260
}
7361

7462
/**

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.ClusterStateApplier;
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
2323
import org.elasticsearch.cluster.node.DiscoveryNodes;
24+
import org.elasticsearch.common.UUIDs;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.common.unit.ByteSizeValue;
2627
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -48,6 +49,7 @@
4849
import java.util.List;
4950
import java.util.Map;
5051
import java.util.Objects;
52+
import java.util.Optional;
5153
import java.util.Set;
5254
import java.util.concurrent.ConcurrentHashMap;
5355
import java.util.concurrent.CopyOnWriteArrayList;
@@ -80,6 +82,7 @@ public class TaskManager implements ClusterStateApplier {
8082

8183
private TaskResultsService taskResultsService;
8284

85+
private final String initialNodeId;
8386
private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
8487

8588
private final Tracer tracer;
@@ -96,10 +99,15 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
9699
}
97100

98101
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
102+
this(settings, threadPool, taskHeaders, tracer, UUIDs.randomBase64UUID());
103+
}
104+
105+
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer, @Nullable String initialNodeId) {
99106
this.threadPool = threadPool;
100107
this.taskHeaders = Set.copyOf(taskHeaders);
101108
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
102109
this.tracer = tracer;
110+
this.initialNodeId = initialNodeId;
103111
}
104112

105113
public void setTaskResultsService(TaskResultsService taskResultsService) {
@@ -141,9 +149,9 @@ public Task register(String type, String action, TaskAwareRequest request, boole
141149
headers.put(key, httpHeader);
142150
}
143151
}
152+
String localNodeId = Optional.ofNullable(lastDiscoveryNodes.getLocalNodeId()).orElse(initialNodeId);
144153
Task task = request.createTask(
145-
lastDiscoveryNodes.getLocalNodeId(),
146-
taskIdGenerator.incrementAndGet(),
154+
new TaskId(localNodeId, taskIdGenerator.incrementAndGet()),
147155
type,
148156
action,
149157
request.getParentTask(),

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) {
177177
discoveryNode.set(DiscoveryNodeUtils.create(name, address.publishAddress(), emptyMap(), emptySet()));
178178
return discoveryNode.get();
179179
};
180-
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP);
180+
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP, name);
181181
transportService = new TransportService(
182182
settings,
183183
new Netty4Transport(

server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
123123
NOOP_TRANSPORT_INTERCEPTOR,
124124
x -> clusterService.localNode(),
125125
null,
126-
Collections.emptySet()
126+
Collections.emptySet(),
127+
clusterService.localNode().getId()
127128
);
128129
transportService.start();
129130
transportService.acceptIncomingRequests();

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1316,7 +1316,9 @@ public void testRetryOnReplicaWithRealTransport() throws Exception {
13161316
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
13171317
x -> clusterService.localNode(),
13181318
null,
1319-
Collections.emptySet()
1319+
Collections.emptySet(),
1320+
clusterService.localNode().getId()
1321+
13201322
);
13211323
transportService.start();
13221324
transportService.acceptIncomingRequests();

0 commit comments

Comments
 (0)