Skip to content

Commit 7133846

Browse files
committed
making the transport action a TransportNodesAction
1 parent d6e40e3 commit 7133846

File tree

6 files changed

+151
-47
lines changed

6 files changed

+151
-47
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
9292
import org.elasticsearch.persistent.PersistentTasksNodeService;
9393
import org.elasticsearch.plugins.ClusterPlugin;
94+
import org.elasticsearch.sample.TransportPutSampleConfigAction;
9495
import org.elasticsearch.script.ScriptMetadata;
9596
import org.elasticsearch.snapshots.RegisteredPolicySnapshots;
9697
import org.elasticsearch.snapshots.SnapshotsInfoService;
@@ -278,6 +279,12 @@ public static List<Entry> getNamedWriteables() {
278279
PersistentTasksCustomMetadata::new,
279280
PersistentTasksCustomMetadata::readDiffFrom
280281
);
282+
registerProjectCustom(
283+
entries,
284+
TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME,
285+
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::new,
286+
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::readDiffFrom
287+
);
281288
// Cluster scoped persistent tasks
282289
registerMetadataCustom(
283290
entries,
@@ -358,6 +365,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
358365
ClusterPersistentTasksCustomMetadata::fromXContent
359366
)
360367
);
368+
entries.add(
369+
new NamedXContentRegistry.Entry(
370+
Metadata.ProjectCustom.class,
371+
new ParseField(TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME),
372+
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::fromXContent
373+
)
374+
);
361375
entries.add(
362376
new NamedXContentRegistry.Entry(
363377
Metadata.ProjectCustom.class,

server/src/main/java/org/elasticsearch/indices/IndicesModule.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.action.admin.indices.rollover.MinSizeCondition;
2323
import org.elasticsearch.action.admin.indices.rollover.OptimalShardCountCondition;
2424
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
25-
import org.elasticsearch.cluster.metadata.Metadata;
2625
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2726
import org.elasticsearch.index.mapper.BinaryFieldMapper;
2827
import org.elasticsearch.index.mapper.BooleanFieldMapper;
@@ -78,7 +77,6 @@
7877
import org.elasticsearch.injection.guice.AbstractModule;
7978
import org.elasticsearch.plugins.FieldPredicate;
8079
import org.elasticsearch.plugins.MapperPlugin;
81-
import org.elasticsearch.sample.TransportPutSampleConfigAction;
8280
import org.elasticsearch.xcontent.NamedXContentRegistry;
8381
import org.elasticsearch.xcontent.ParseField;
8482

@@ -116,13 +114,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
116114
new NamedWriteableRegistry.Entry(Condition.class, MaxDocsCondition.NAME, MaxDocsCondition::new),
117115
new NamedWriteableRegistry.Entry(Condition.class, MaxSizeCondition.NAME, MaxSizeCondition::new),
118116
new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardSizeCondition.NAME, MaxPrimaryShardSizeCondition::new),
119-
new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardDocsCondition.NAME, MaxPrimaryShardDocsCondition::new),
120-
new NamedWriteableRegistry.Entry(Condition.class, OptimalShardCountCondition.NAME, OptimalShardCountCondition::new),
121-
new NamedWriteableRegistry.Entry(
122-
Metadata.ProjectCustom.class,
123-
TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME,
124-
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::new
125-
)
117+
new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardDocsCondition.NAME, MaxPrimaryShardDocsCondition::new)
126118
);
127119
}
128120

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.index.IndexRequest;
1313
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1414
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1516
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
1617
import org.elasticsearch.common.xcontent.XContentHelper;
1718
import org.elasticsearch.plugins.internal.XContentParserDecorator;
@@ -68,6 +69,8 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque
6869
String condition = samplingConfig.condition;
6970
if (evaluateCondition(ingestDocument, condition)) {
7071
if (Math.random() < samplingConfig.rate) {
72+
indexRequest.incRef();
73+
((ReleasableBytesReference) indexRequest.source()).incRef();
7174
samplesForIndex.add(indexRequest);
7275
System.out.println("Sampling " + indexRequest);
7376
}

server/src/main/java/org/elasticsearch/sample/GetSampleAction.java

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,30 @@
99

1010
package org.elasticsearch.sample;
1111

12-
import org.elasticsearch.action.ActionRequest;
1312
import org.elasticsearch.action.ActionRequestValidationException;
14-
import org.elasticsearch.action.ActionResponse;
1513
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.FailedNodeException;
1615
import org.elasticsearch.action.IndicesRequest;
1716
import org.elasticsearch.action.index.IndexRequest;
1817
import org.elasticsearch.action.support.IndicesOptions;
18+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
19+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
20+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
21+
import org.elasticsearch.cluster.ClusterName;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
1923
import org.elasticsearch.common.collect.Iterators;
2024
import org.elasticsearch.common.io.stream.StreamInput;
2125
import org.elasticsearch.common.io.stream.StreamOutput;
26+
import org.elasticsearch.common.io.stream.Writeable;
2227
import org.elasticsearch.common.xcontent.ChunkedToXContent;
2328
import org.elasticsearch.tasks.CancellableTask;
2429
import org.elasticsearch.tasks.Task;
2530
import org.elasticsearch.tasks.TaskId;
31+
import org.elasticsearch.transport.AbstractTransportRequest;
2632
import org.elasticsearch.xcontent.ToXContent;
2733

2834
import java.io.IOException;
35+
import java.util.Collection;
2936
import java.util.Iterator;
3037
import java.util.List;
3138
import java.util.Map;
@@ -43,28 +50,35 @@ private GetSampleAction() {
4350
super(NAME);
4451
}
4552

46-
public static class Response extends ActionResponse implements ChunkedToXContent {
53+
public static class Response extends BaseNodesResponse<NodeResponse> implements Writeable, ChunkedToXContent {
4754

48-
private final List<IndexRequest> samples;
55+
public Response(StreamInput in) throws IOException {
56+
super(in);
57+
}
4958

50-
public Response(final List<IndexRequest> samples) {
51-
this.samples = samples;
59+
public Response(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
60+
super(clusterName, nodes, failures);
5261
}
5362

5463
public List<IndexRequest> getSamples() {
55-
return samples;
64+
return getNodes().stream().map(n -> n.samples).filter(Objects::nonNull).flatMap(Collection::stream).toList();
5665
}
5766

5867
@Override
59-
public void writeTo(StreamOutput out) throws IOException {
60-
out.writeCollection(samples);
68+
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
69+
return in.readCollectionAsList(NodeResponse::new);
70+
}
71+
72+
@Override
73+
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
74+
out.writeCollection(nodes);
6175
}
6276

6377
@Override
6478
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
6579
return Iterators.concat(
6680
chunk((builder, p) -> builder.startObject().startArray("samples")),
67-
Iterators.flatMap(samples.iterator(), sample -> single((builder, params1) -> {
81+
Iterators.flatMap(getSamples().iterator(), sample -> single((builder, params1) -> {
6882
Map<String, Object> source = sample.sourceAsMap();
6983
builder.value(source);
7084
return builder;
@@ -75,40 +89,64 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
7589

7690
@Override
7791
public boolean equals(Object o) {
78-
if (this == o) {
79-
return true;
80-
}
81-
if (o == null || getClass() != o.getClass()) {
82-
return false;
83-
}
84-
GetSampleAction.Response response = (GetSampleAction.Response) o;
85-
return samples.equals(response.samples);
92+
if (this == o) return true;
93+
if (o == null || getClass() != o.getClass()) return false;
94+
Response that = (Response) o;
95+
return Objects.equals(getNodes(), that.getNodes()) && Objects.equals(failures(), that.failures());
8696
}
8797

8898
@Override
8999
public int hashCode() {
90-
return Objects.hash(samples);
100+
return Objects.hash(getNodes(), failures());
101+
}
102+
103+
}
104+
105+
public static class NodeResponse extends BaseNodeResponse {
106+
private final List<IndexRequest> samples;
107+
108+
protected NodeResponse(StreamInput in) throws IOException {
109+
super(in);
110+
samples = in.readCollectionAsList(IndexRequest::new);
111+
}
112+
113+
protected NodeResponse(DiscoveryNode node, List<IndexRequest> samples) {
114+
super(node);
115+
this.samples = samples;
116+
}
117+
118+
public List<IndexRequest> getSamples() {
119+
return samples;
91120
}
92121

93122
@Override
94-
public String toString() {
95-
return "Response{samples=" + samples + '}';
123+
public void writeTo(StreamOutput out) throws IOException {
124+
super.writeTo(out);
125+
out.writeCollection(samples);
126+
}
127+
128+
@Override
129+
public boolean equals(Object o) {
130+
if (this == o) return true;
131+
if (o == null || getClass() != o.getClass()) return false;
132+
NodeResponse that = (NodeResponse) o;
133+
return samples.equals(that.samples);
134+
}
135+
136+
@Override
137+
public int hashCode() {
138+
return Objects.hash(samples);
96139
}
97140
}
98141

99-
public static class Request extends ActionRequest implements IndicesRequest.Replaceable {
142+
public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable {
100143
private String[] names;
101144

102145
public Request(String[] names) {
103-
super();
146+
super((String[]) null);
104147
this.names = names;
105148
}
106149

107-
public Request(StreamInput in) throws IOException {
108-
super(in);
109-
this.names = in.readStringArray();
110-
}
111-
112150
@Override
113151
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
114152
return new CancellableTask(id, type, action, "", parentTaskId, headers);
@@ -138,4 +176,27 @@ public IndicesOptions indicesOptions() {
138176
return IndicesOptions.DEFAULT;
139177
}
140178
}
179+
180+
public static class NodeRequest extends AbstractTransportRequest {
181+
private final String index;
182+
183+
public NodeRequest(String index) {
184+
this.index = index;
185+
}
186+
187+
public NodeRequest(StreamInput in) throws IOException {
188+
super(in);
189+
this.index = in.readString();
190+
}
191+
192+
@Override
193+
public void writeTo(StreamOutput out) throws IOException {
194+
super.writeTo(out);
195+
out.writeString(index);
196+
}
197+
198+
public String getIndex() {
199+
return index;
200+
}
201+
}
141202
}

server/src/main/java/org/elasticsearch/sample/TransportGetSampleAction.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,28 @@
99

1010
package org.elasticsearch.sample;
1111

12-
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.FailedNodeException;
1313
import org.elasticsearch.action.index.IndexRequest;
1414
import org.elasticsearch.action.support.ActionFilters;
15-
import org.elasticsearch.action.support.HandledTransportAction;
15+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.cluster.service.ClusterService;
17-
import org.elasticsearch.common.util.concurrent.EsExecutors;
18+
import org.elasticsearch.common.io.stream.StreamInput;
1819
import org.elasticsearch.ingest.SamplingService;
1920
import org.elasticsearch.injection.guice.Inject;
2021
import org.elasticsearch.tasks.Task;
2122
import org.elasticsearch.threadpool.ThreadPool;
2223
import org.elasticsearch.transport.TransportService;
2324

25+
import java.io.IOException;
2426
import java.util.List;
2527

26-
public class TransportGetSampleAction extends HandledTransportAction<GetSampleAction.Request, GetSampleAction.Response> {
28+
import static org.elasticsearch.sample.GetSampleAction.NodeRequest;
29+
import static org.elasticsearch.sample.GetSampleAction.NodeResponse;
30+
import static org.elasticsearch.sample.GetSampleAction.Request;
31+
import static org.elasticsearch.sample.GetSampleAction.Response;
32+
33+
public class TransportGetSampleAction extends TransportNodesAction<Request, Response, NodeRequest, NodeResponse, Void> {
2734
private final SamplingService samplingService;
2835

2936
@Inject
@@ -34,15 +41,37 @@ public TransportGetSampleAction(
3441
ActionFilters actionFilters,
3542
SamplingService samplingService
3643
) {
37-
super(GetSampleAction.NAME, transportService, actionFilters, GetSampleAction.Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
44+
super(
45+
GetSampleAction.NAME,
46+
clusterService,
47+
transportService,
48+
actionFilters,
49+
NodeRequest::new,
50+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
51+
);
3852
this.samplingService = samplingService;
3953
}
4054

55+
@SuppressWarnings("checkstyle:LineLength")
56+
@Override
57+
protected Response newResponse(Request request, List<NodeResponse> nodeResponses, List<FailedNodeException> failures) {
58+
return new Response(clusterService.getClusterName(), nodeResponses, failures);
59+
}
60+
61+
@Override
62+
protected NodeRequest newNodeRequest(Request request) {
63+
return new NodeRequest(request.indices()[0]);
64+
}
65+
66+
@Override
67+
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
68+
return new NodeResponse(in);
69+
}
70+
4171
@Override
42-
protected void doExecute(Task task, GetSampleAction.Request request, ActionListener<GetSampleAction.Response> listener) {
43-
String index = request.indices()[0];
72+
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
73+
String index = request.getIndex();
4474
List<IndexRequest> samples = samplingService.getSamples(index);
45-
GetSampleAction.Response response = new GetSampleAction.Response(samples);
46-
listener.onResponse(response);
75+
return new NodeResponse(transportService.getLocalNode(), samples == null ? List.of() : samples);
4776
}
4877
}

server/src/main/java/org/elasticsearch/sample/TransportPutSampleConfigAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.ClusterState;
2323
import org.elasticsearch.cluster.ClusterStateAckListener;
2424
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
25+
import org.elasticsearch.cluster.NamedDiff;
2526
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
2627
import org.elasticsearch.cluster.block.ClusterBlockException;
2728
import org.elasticsearch.cluster.metadata.Metadata;
@@ -258,6 +259,10 @@ public SamplingConfigCustomMetadata(StreamInput in) throws IOException {
258259
);
259260
}
260261

262+
public static NamedDiff<Metadata.ProjectCustom> readDiffFrom(StreamInput in) throws IOException {
263+
return readDiffFrom(Metadata.ProjectCustom.class, NAME, in);
264+
}
265+
261266
@Override
262267
public EnumSet<Metadata.XContentContext> context() {
263268
return ALL_CONTEXTS;
@@ -270,7 +275,7 @@ public TransportVersion getMinimalSupportedVersion() {
270275

271276
@Override
272277
public String getWriteableName() {
273-
return "sample_config";
278+
return NAME;
274279
}
275280

276281
@Override

0 commit comments

Comments
 (0)