Skip to content

Commit 038b59f

Browse files
committed
Use correct threads
1 parent c127cda commit 038b59f

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

x-pack/plugin/metrics/src/main/java/org/elasticsearch/xpack/metrics/MetricsRestAction.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import com.google.protobuf.CodedOutputStream;
1313

14+
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.client.internal.node.NodeClient;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.rest.BaseRestHandler;
@@ -45,13 +46,17 @@ public boolean mediaTypesValid(RestRequest request) {
4546
@Override
4647
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
4748
if (request.hasContent()) {
48-
var transportRequest = new MetricsTransportAction.MetricsRequest(request.param("index"), request.content());
49-
return channel -> client.execute(MetricsTransportAction.TYPE, transportRequest, new RestResponseListener<>(channel) {
50-
@Override
51-
public RestResponse buildResponse(MetricsTransportAction.MetricsResponse r) throws Exception {
52-
return successResponse();
53-
}
54-
});
49+
var transportRequest = new MetricsTransportAction.MetricsRequest(request.param("index"), request.content().retain());
50+
return channel -> client.execute(
51+
MetricsTransportAction.TYPE,
52+
transportRequest,
53+
ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) {
54+
@Override
55+
public RestResponse buildResponse(MetricsTransportAction.MetricsResponse r) throws Exception {
56+
return successResponse();
57+
}
58+
})
59+
);
5560
}
5661

5762
// according to spec empty requests are successful

x-pack/plugin/metrics/src/main/java/org/elasticsearch/xpack/metrics/MetricsTransportAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.common.io.stream.StreamInput;
2929
import org.elasticsearch.common.io.stream.StreamOutput;
3030
import org.elasticsearch.common.lucene.uid.Versions;
31-
import org.elasticsearch.common.util.concurrent.EsExecutors;
3231
import org.elasticsearch.index.VersionType;
3332
import org.elasticsearch.index.engine.Engine;
3433
import org.elasticsearch.index.mapper.LuceneDocument;
@@ -39,6 +38,7 @@
3938
import org.elasticsearch.indices.IndicesService;
4039
import org.elasticsearch.injection.guice.Inject;
4140
import org.elasticsearch.tasks.Task;
41+
import org.elasticsearch.threadpool.ThreadPool;
4242
import org.elasticsearch.transport.TransportService;
4343
import org.elasticsearch.xcontent.XContentType;
4444

@@ -62,11 +62,12 @@ public class MetricsTransportAction extends HandledTransportAction<
6262
public MetricsTransportAction(
6363
TransportService transportService,
6464
ActionFilters actionFilters,
65+
ThreadPool threadPool,
6566
ClusterService clusterService,
6667
ProjectResolver projectResolver,
6768
IndicesService indicesService
6869
) {
69-
super(NAME, transportService, actionFilters, MetricsRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
70+
super(NAME, transportService, actionFilters, MetricsRequest::new, threadPool.executor(ThreadPool.Names.WRITE));
7071
this.clusterService = clusterService;
7172
this.projectResolver = projectResolver;
7273
this.indicesService = indicesService;
@@ -95,7 +96,9 @@ protected void doExecute(Task task, MetricsRequest request, ActionListener<Metri
9596
// We receive a batch so there will be multiple documents as a result of processing it.
9697
var documents = createLuceneDocuments(metricsServiceRequest);
9798

98-
// TODO thread pool for writing
99+
// This loop is similar to TransportShardBulkAction, we fork the processing of the entire request
100+
// to ThreadPool.Names.WRITE but we perform all writes on the same thread.
101+
// We expect concurrency to come from a client submitting concurrent requests.
99102
for (var luceneDocument : documents) {
100103
var id = UUIDs.randomBase64UUID();
101104

0 commit comments

Comments
 (0)