Skip to content

Commit 96315ec

Browse files
GalLaloucheelasticsearchmachine
andauthored
Use TaskId in TaskAwareRequest.createRequest (elastic#130131)
* TODO refactor: Use TaskId in TaskAwareRequest.createRequest Use TaskId instead of passing a node name and ID individually. * CR fixes * CR fixes * More CR fixes * Simplify new static method * [CI] Auto commit changes from spotless * Fix compilation error * CR Fixes --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent f509745 commit 96315ec

File tree

17 files changed

+81
-85
lines changed

17 files changed

+81
-85
lines changed

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public boolean shouldEnable() {
5656
}
5757

5858
@Override
59-
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60-
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
59+
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new CancellableTask(taskId.getId(), type, action, "Logs streams activation toggle request", parentTaskId, headers);
6161
}
6262
}
6363
}

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.discovery.SeedHostsResolver;
2626
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
2727
import org.elasticsearch.test.transport.MockTransportService;
28-
import org.elasticsearch.transport.TransportService;
2928
import org.elasticsearch.transport.netty4.Netty4Transport;
3029
import org.elasticsearch.transport.netty4.SharedGroupFactory;
3130
import org.hamcrest.Matchers;
@@ -47,8 +46,7 @@ public class EC2RetriesTests extends AbstractEC2MockAPITestCase {
4746

4847
@Override
4948
protected MockTransportService createTransportService() {
50-
return new MockTransportService(
51-
Settings.EMPTY,
49+
return MockTransportService.createMockTransportService(
5250
new Netty4Transport(
5351
Settings.EMPTY,
5452
TransportVersion.current(),
@@ -59,9 +57,7 @@ protected MockTransportService createTransportService() {
5957
new NoneCircuitBreakerService(),
6058
new SharedGroupFactory(Settings.EMPTY)
6159
),
62-
threadPool,
63-
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
64-
null
60+
threadPool
6561
);
6662
}
6763

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public TransportAddress[] addressesFromString(String address) {
7878
return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) };
7979
}
8080
};
81-
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
81+
return MockTransportService.createMockTransportService(transport, threadPool);
8282
}
8383

8484
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -685,14 +685,16 @@ private void construct(
685685

686686
modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());
687687

688+
assert nodeEnvironment.nodeId() != null : "node ID must be set before constructing the Node";
688689
TaskManager taskManager = new TaskManager(
689690
settings,
690691
threadPool,
691692
Stream.concat(
692693
pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
693694
Task.HEADERS_TO_COPY.stream()
694695
).collect(Collectors.toSet()),
695-
telemetryProvider.getTracer()
696+
telemetryProvider.getTracer(),
697+
nodeEnvironment.nodeId()
696698
);
697699

698700
ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
@@ -1097,7 +1099,8 @@ public Map<String, String> queryFields() {
10971099
localNodeFactory,
10981100
settingsModule.getClusterSettings(),
10991101
taskManager,
1100-
telemetryProvider.getTracer()
1102+
telemetryProvider.getTracer(),
1103+
nodeEnvironment.nodeId()
11011104
);
11021105
final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
11031106
final SearchTransportService searchTransportService = new SearchTransportService(

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ TransportService newTransportService(
114114
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
115115
ClusterSettings clusterSettings,
116116
TaskManager taskManager,
117-
Tracer tracer
117+
Tracer tracer,
118+
String nodeId
118119
) {
119120
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer);
120121
}

server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
package org.elasticsearch.tasks;
1111

12-
import org.elasticsearch.core.Nullable;
13-
1412
import java.util.Map;
1513

1614
/**
@@ -57,18 +55,8 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
5755
/**
5856
* Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID.
5957
*/
60-
// TODO remove the above overload, use only this one.
61-
default Task createTask(
62-
// TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We
63-
// We should fix the tests, and replace this and id with TaskId instead.
64-
@Nullable String localNodeId,
65-
long id,
66-
String type,
67-
String action,
68-
TaskId parentTaskId,
69-
Map<String, String> headers
70-
) {
71-
return createTask(id, type, action, parentTaskId, headers);
58+
default Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
59+
return createTask(taskId.getId(), type, action, parentTaskId, headers);
7260
}
7361

7462
/**

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.ClusterStateApplier;
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
2323
import org.elasticsearch.cluster.node.DiscoveryNodes;
24+
import org.elasticsearch.common.UUIDs;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.common.unit.ByteSizeValue;
2627
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -80,6 +81,7 @@ public class TaskManager implements ClusterStateApplier {
8081

8182
private TaskResultsService taskResultsService;
8283

84+
private final String nodeId;
8385
private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
8486

8587
private final Tracer tracer;
@@ -95,11 +97,19 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
9597
this(settings, threadPool, taskHeaders, Tracer.NOOP);
9698
}
9799

100+
// For testing (especially the creating a random node ID, which some tests rely on)
98101
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
102+
this(settings, threadPool, taskHeaders, tracer, UUIDs.randomBase64UUID());
103+
}
104+
105+
// TODO Both of the above overloads should be moved to the test package.
106+
107+
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer, String nodeId) {
99108
this.threadPool = threadPool;
100109
this.taskHeaders = Set.copyOf(taskHeaders);
101110
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
102111
this.tracer = tracer;
112+
this.nodeId = nodeId;
103113
}
104114

105115
public void setTaskResultsService(TaskResultsService taskResultsService) {
@@ -142,8 +152,7 @@ public Task register(String type, String action, TaskAwareRequest request, boole
142152
}
143153
}
144154
Task task = request.createTask(
145-
lastDiscoveryNodes.getLocalNodeId(),
146-
taskIdGenerator.incrementAndGet(),
155+
new TaskId(nodeId, taskIdGenerator.incrementAndGet()),
147156
type,
148157
action,
149158
request.getParentTask(),

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) {
177177
discoveryNode.set(DiscoveryNodeUtils.create(name, address.publishAddress(), emptyMap(), emptySet()));
178178
return discoveryNode.get();
179179
};
180-
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP);
180+
TaskManager taskManager = createTaskManager(settings, threadPool, emptySet(), Tracer.NOOP, name);
181181
transportService = new TransportService(
182182
settings,
183183
new Netty4Transport(

server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
123123
NOOP_TRANSPORT_INTERCEPTOR,
124124
x -> clusterService.localNode(),
125125
null,
126-
Collections.emptySet()
126+
Collections.emptySet(),
127+
clusterService.localNode().getId()
127128
);
128129
transportService.start();
129130
transportService.acceptIncomingRequests();

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1316,7 +1316,9 @@ public void testRetryOnReplicaWithRealTransport() throws Exception {
13161316
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
13171317
x -> clusterService.localNode(),
13181318
null,
1319-
Collections.emptySet()
1319+
Collections.emptySet(),
1320+
clusterService.localNode().getId()
1321+
13201322
);
13211323
transportService.start();
13221324
transportService.acceptIncomingRequests();

0 commit comments

Comments
 (0)