Skip to content

Commit 32e0172

Browse files
authored
Making TransportGetDatabaseConfigurationAction extend TransportNodesAction (#113141) (#113467)
1 parent cb41144 commit 32e0172

File tree

4 files changed

+179
-65
lines changed

4 files changed

+179
-65
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/direct/GetDatabaseConfigurationAction.java

Lines changed: 93 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99

1010
package org.elasticsearch.ingest.geoip.direct;
1111

12-
import org.elasticsearch.action.ActionResponse;
1312
import org.elasticsearch.action.ActionType;
14-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
15-
import org.elasticsearch.common.Strings;
13+
import org.elasticsearch.action.FailedNodeException;
14+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
15+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
16+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
17+
import org.elasticsearch.cluster.ClusterName;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
1619
import org.elasticsearch.common.io.stream.StreamInput;
1720
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.transport.TransportRequest;
1922
import org.elasticsearch.xcontent.ToXContentObject;
2023
import org.elasticsearch.xcontent.XContentBuilder;
2124

@@ -28,37 +31,26 @@
2831
import static org.elasticsearch.ingest.geoip.direct.DatabaseConfigurationMetadata.MODIFIED_DATE;
2932
import static org.elasticsearch.ingest.geoip.direct.DatabaseConfigurationMetadata.MODIFIED_DATE_MILLIS;
3033
import static org.elasticsearch.ingest.geoip.direct.DatabaseConfigurationMetadata.VERSION;
34+
import static org.elasticsearch.ingest.geoip.direct.GetDatabaseConfigurationAction.Response;
3135

32-
public class GetDatabaseConfigurationAction extends ActionType<GetDatabaseConfigurationAction.Response> {
36+
public class GetDatabaseConfigurationAction extends ActionType<Response> {
3337
public static final GetDatabaseConfigurationAction INSTANCE = new GetDatabaseConfigurationAction();
3438
public static final String NAME = "cluster:admin/ingest/geoip/database/get";
3539

3640
protected GetDatabaseConfigurationAction() {
3741
super(NAME);
3842
}
3943

40-
public static class Request extends AcknowledgedRequest<GetDatabaseConfigurationAction.Request> {
41-
44+
public static class Request extends BaseNodesRequest<Request> {
4245
private final String[] databaseIds;
4346

44-
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... databaseIds) {
45-
super(masterNodeTimeout, ackTimeout);
46-
this.databaseIds = Objects.requireNonNull(databaseIds, "ids may not be null");
47-
}
48-
49-
public Request(StreamInput in) throws IOException {
50-
super(in);
51-
databaseIds = in.readStringArray();
47+
public Request(String... databaseIds) {
48+
super((String[]) null);
49+
this.databaseIds = databaseIds;
5250
}
5351

5452
public String[] getDatabaseIds() {
55-
return this.databaseIds;
56-
}
57-
58-
@Override
59-
public void writeTo(StreamOutput out) throws IOException {
60-
super.writeTo(out);
61-
out.writeStringArray(databaseIds);
53+
return databaseIds;
6254
}
6355

6456
@Override
@@ -77,27 +69,36 @@ public boolean equals(Object obj) {
7769
Request other = (Request) obj;
7870
return Arrays.equals(databaseIds, other.databaseIds);
7971
}
72+
8073
}
8174

82-
public static class Response extends ActionResponse implements ToXContentObject {
75+
public static class Response extends BaseNodesResponse<NodeResponse> implements ToXContentObject {
8376

8477
private final List<DatabaseConfigurationMetadata> databases;
8578

86-
public Response(List<DatabaseConfigurationMetadata> databases) {
79+
public Response(
80+
List<DatabaseConfigurationMetadata> databases,
81+
ClusterName clusterName,
82+
List<NodeResponse> nodes,
83+
List<FailedNodeException> failures
84+
) {
85+
super(clusterName, nodes, failures);
8786
this.databases = List.copyOf(databases); // defensive copy
8887
}
8988

90-
public Response(StreamInput in) throws IOException {
91-
this(in.readCollectionAsList(DatabaseConfigurationMetadata::new));
89+
protected Response(StreamInput in) throws IOException {
90+
super(in);
91+
this.databases = in.readCollectionAsList(DatabaseConfigurationMetadata::new);
9292
}
9393

94-
public List<DatabaseConfigurationMetadata> getDatabases() {
95-
return this.databases;
94+
@Override
95+
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
96+
return in.readCollectionAsList(NodeResponse::new);
9697
}
9798

9899
@Override
99-
public String toString() {
100-
return Strings.toString(this);
100+
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
101+
out.writeCollection(nodes);
101102
}
102103

103104
@Override
@@ -117,6 +118,67 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
117118
builder.endObject();
118119
return builder;
119120
}
121+
}
122+
123+
public static class NodeRequest extends TransportRequest {
124+
125+
private final String[] databaseIds;
126+
127+
public NodeRequest(String... databaseIds) {
128+
super();
129+
this.databaseIds = Objects.requireNonNull(databaseIds, "ids may not be null");
130+
}
131+
132+
public NodeRequest(StreamInput in) throws IOException {
133+
super(in);
134+
databaseIds = in.readStringArray();
135+
}
136+
137+
public String[] getDatabaseIds() {
138+
return this.databaseIds;
139+
}
140+
141+
@Override
142+
public void writeTo(StreamOutput out) throws IOException {
143+
super.writeTo(out);
144+
out.writeStringArray(databaseIds);
145+
}
146+
147+
@Override
148+
public int hashCode() {
149+
return Arrays.hashCode(databaseIds);
150+
}
151+
152+
@Override
153+
public boolean equals(Object obj) {
154+
if (obj == null) {
155+
return false;
156+
}
157+
if (obj.getClass() != getClass()) {
158+
return false;
159+
}
160+
NodeRequest other = (NodeRequest) obj;
161+
return Arrays.equals(databaseIds, other.databaseIds);
162+
}
163+
}
164+
165+
public static class NodeResponse extends BaseNodeResponse {
166+
167+
private final List<DatabaseConfigurationMetadata> databases;
168+
169+
public NodeResponse(DiscoveryNode node, List<DatabaseConfigurationMetadata> databases) {
170+
super(node);
171+
this.databases = List.copyOf(databases); // defensive copy
172+
}
173+
174+
public NodeResponse(StreamInput in) throws IOException {
175+
super(in);
176+
this.databases = in.readCollectionAsList(DatabaseConfigurationMetadata::new);
177+
}
178+
179+
public List<DatabaseConfigurationMetadata> getDatabases() {
180+
return this.databases;
181+
}
120182

121183
@Override
122184
public void writeTo(StreamOutput out) throws IOException {
@@ -136,7 +198,7 @@ public boolean equals(Object obj) {
136198
if (obj.getClass() != getClass()) {
137199
return false;
138200
}
139-
Response other = (Response) obj;
201+
NodeResponse other = (NodeResponse) obj;
140202
return databases.equals(other.databases);
141203
}
142204
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/direct/RestGetDatabaseConfigurationAction.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.util.List;
2121

2222
import static org.elasticsearch.rest.RestRequest.Method.GET;
23-
import static org.elasticsearch.rest.RestUtils.getAckTimeout;
24-
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;
2523

2624
@ServerlessScope(Scope.INTERNAL)
2725
public class RestGetDatabaseConfigurationAction extends BaseRestHandler {
@@ -38,11 +36,7 @@ public String getName() {
3836

3937
@Override
4038
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
41-
final var req = new GetDatabaseConfigurationAction.Request(
42-
getMasterNodeTimeout(request),
43-
getAckTimeout(request),
44-
Strings.splitStringByCommaToArray(request.param("id"))
45-
);
39+
final var req = new GetDatabaseConfigurationAction.Request(Strings.splitStringByCommaToArray(request.param("id")));
4640
return channel -> client.execute(GetDatabaseConfigurationAction.INSTANCE, req, new RestToXContentListener<>(channel));
4741
}
4842
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/direct/TransportGetDatabaseConfigurationAction.java

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,60 +11,78 @@
1111

1212
import org.elasticsearch.ResourceNotFoundException;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.FailedNodeException;
1415
import org.elasticsearch.action.support.ActionFilters;
15-
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
16-
import org.elasticsearch.cluster.ClusterState;
17-
import org.elasticsearch.cluster.block.ClusterBlockException;
18-
import org.elasticsearch.cluster.block.ClusterBlockLevel;
19-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
16+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
2018
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.common.io.stream.StreamInput;
2120
import org.elasticsearch.common.regex.Regex;
22-
import org.elasticsearch.common.util.concurrent.EsExecutors;
21+
import org.elasticsearch.features.FeatureService;
2322
import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata;
2423
import org.elasticsearch.injection.guice.Inject;
2524
import org.elasticsearch.tasks.Task;
2625
import org.elasticsearch.threadpool.ThreadPool;
2726
import org.elasticsearch.transport.TransportService;
2827

28+
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.Arrays;
3131
import java.util.LinkedHashSet;
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Set;
3535

36-
public class TransportGetDatabaseConfigurationAction extends TransportMasterNodeAction<
36+
import static org.elasticsearch.ingest.IngestGeoIpFeatures.GET_DATABASE_CONFIGURATION_ACTION_MULTI_NODE;
37+
38+
public class TransportGetDatabaseConfigurationAction extends TransportNodesAction<
3739
GetDatabaseConfigurationAction.Request,
38-
GetDatabaseConfigurationAction.Response> {
40+
GetDatabaseConfigurationAction.Response,
41+
GetDatabaseConfigurationAction.NodeRequest,
42+
GetDatabaseConfigurationAction.NodeResponse,
43+
List<DatabaseConfigurationMetadata>> {
44+
45+
private final FeatureService featureService;
3946

4047
@Inject
4148
public TransportGetDatabaseConfigurationAction(
4249
TransportService transportService,
4350
ClusterService clusterService,
4451
ThreadPool threadPool,
4552
ActionFilters actionFilters,
46-
IndexNameExpressionResolver indexNameExpressionResolver
53+
FeatureService featureService
4754
) {
4855
super(
4956
GetDatabaseConfigurationAction.NAME,
50-
transportService,
5157
clusterService,
52-
threadPool,
58+
transportService,
5359
actionFilters,
54-
GetDatabaseConfigurationAction.Request::new,
55-
indexNameExpressionResolver,
56-
GetDatabaseConfigurationAction.Response::new,
57-
EsExecutors.DIRECT_EXECUTOR_SERVICE
60+
GetDatabaseConfigurationAction.NodeRequest::new,
61+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
5862
);
63+
this.featureService = featureService;
5964
}
6065

6166
@Override
62-
protected void masterOperation(
63-
final Task task,
64-
final GetDatabaseConfigurationAction.Request request,
65-
final ClusterState state,
66-
final ActionListener<GetDatabaseConfigurationAction.Response> listener
67+
protected void doExecute(
68+
Task task,
69+
GetDatabaseConfigurationAction.Request request,
70+
ActionListener<GetDatabaseConfigurationAction.Response> listener
6771
) {
72+
if (featureService.clusterHasFeature(clusterService.state(), GET_DATABASE_CONFIGURATION_ACTION_MULTI_NODE) == false) {
73+
/*
74+
* TransportGetDatabaseConfigurationAction used to be a TransportMasterNodeAction, and not all nodes in the cluster have been
75+
* updated. So we don't want to send node requests to the other nodes because they will blow up. Instead, we just return
76+
* the information that we used to return from the master node (it doesn't make any difference that this might not be the master
77+
* node, because we're only reading the clsuter state).
78+
*/
79+
newResponseAsync(task, request, createActionContext(task, request), List.of(), List.of(), listener);
80+
} else {
81+
super.doExecute(task, request, listener);
82+
}
83+
}
84+
85+
protected List<DatabaseConfigurationMetadata> createActionContext(Task task, GetDatabaseConfigurationAction.Request request) {
6886
final Set<String> ids;
6987
if (request.getDatabaseIds().length == 0) {
7088
// if we did not ask for a specific name, then return all databases
@@ -79,7 +97,7 @@ protected void masterOperation(
7997
);
8098
}
8199

82-
final IngestGeoIpMetadata geoIpMeta = state.metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
100+
final IngestGeoIpMetadata geoIpMeta = clusterService.state().metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
83101
List<DatabaseConfigurationMetadata> results = new ArrayList<>();
84102

85103
for (String id : ids) {
@@ -92,19 +110,54 @@ protected void masterOperation(
92110
} else {
93111
DatabaseConfigurationMetadata meta = geoIpMeta.getDatabases().get(id);
94112
if (meta == null) {
95-
listener.onFailure(new ResourceNotFoundException("database configuration not found: {}", id));
96-
return;
113+
throw new ResourceNotFoundException("database configuration not found: {}", id);
97114
} else {
98115
results.add(meta);
99116
}
100117
}
101118
}
119+
return results;
120+
}
121+
122+
protected void newResponseAsync(
123+
Task task,
124+
GetDatabaseConfigurationAction.Request request,
125+
List<DatabaseConfigurationMetadata> results,
126+
List<GetDatabaseConfigurationAction.NodeResponse> responses,
127+
List<FailedNodeException> failures,
128+
ActionListener<GetDatabaseConfigurationAction.Response> listener
129+
) {
130+
ActionListener.run(
131+
listener,
132+
l -> ActionListener.respondAndRelease(
133+
l,
134+
new GetDatabaseConfigurationAction.Response(results, clusterService.getClusterName(), responses, failures)
135+
)
136+
);
137+
}
102138

103-
listener.onResponse(new GetDatabaseConfigurationAction.Response(results));
139+
@Override
140+
protected GetDatabaseConfigurationAction.Response newResponse(
141+
GetDatabaseConfigurationAction.Request request,
142+
List<GetDatabaseConfigurationAction.NodeResponse> nodeResponses,
143+
List<FailedNodeException> failures
144+
) {
145+
throw new UnsupportedOperationException("Use newResponseAsync instead");
104146
}
105147

106148
@Override
107-
protected ClusterBlockException checkBlock(GetDatabaseConfigurationAction.Request request, ClusterState state) {
108-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
149+
protected GetDatabaseConfigurationAction.NodeRequest newNodeRequest(GetDatabaseConfigurationAction.Request request) {
150+
return new GetDatabaseConfigurationAction.NodeRequest(request.getDatabaseIds());
109151
}
152+
153+
@Override
154+
protected GetDatabaseConfigurationAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
155+
return new GetDatabaseConfigurationAction.NodeResponse(in);
156+
}
157+
158+
@Override
159+
protected GetDatabaseConfigurationAction.NodeResponse nodeOperation(GetDatabaseConfigurationAction.NodeRequest request, Task task) {
160+
return new GetDatabaseConfigurationAction.NodeResponse(transportService.getLocalNode(), List.of());
161+
}
162+
110163
}

server/src/main/java/org/elasticsearch/ingest/IngestGeoIpFeatures.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
import static org.elasticsearch.ingest.EnterpriseGeoIpTask.GEOIP_DOWNLOADER_DATABASE_CONFIGURATION;
1818

1919
public class IngestGeoIpFeatures implements FeatureSpecification {
20+
21+
public static final NodeFeature GET_DATABASE_CONFIGURATION_ACTION_MULTI_NODE = new NodeFeature(
22+
"get_database_configuration_action.multi_node"
23+
);
24+
2025
public Set<NodeFeature> getFeatures() {
21-
return Set.of(GEOIP_DOWNLOADER_DATABASE_CONFIGURATION);
26+
return Set.of(GEOIP_DOWNLOADER_DATABASE_CONFIGURATION, GET_DATABASE_CONFIGURATION_ACTION_MULTI_NODE);
2227
}
2328
}

0 commit comments

Comments
 (0)