Skip to content

Commit 6130fbb

Browse files
authored
Implement lifecycle on SimulatePipelineRequest (#117585)
Rather than releasing the REST request body after computing the response, we can link the lifecycles of the REST and transport requests and release the REST request body sooner. Not that we expect these bodies to be particularly large in this case, but still it's a better pattern to follow.
1 parent ef8ffc5 commit 6130fbb

File tree

10 files changed

+68
-41
lines changed

10 files changed

+68
-41
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.search.sort.SortOrder;
4242
import org.elasticsearch.test.junit.annotations.TestLogging;
4343
import org.elasticsearch.xcontent.XContentBuilder;
44-
import org.elasticsearch.xcontent.XContentType;
4544
import org.elasticsearch.xcontent.json.JsonXContent;
4645
import org.junit.After;
4746

@@ -67,6 +66,7 @@
6766
import java.util.zip.GZIPInputStream;
6867

6968
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
69+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
7070
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDefaultDatabases;
7171
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
7272
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
@@ -494,7 +494,7 @@ private SimulateDocumentBaseResult simulatePipeline() throws IOException {
494494
builder.endObject();
495495
bytes = BytesReference.bytes(builder);
496496
}
497-
SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON);
497+
SimulatePipelineRequest simulateRequest = jsonSimulatePipelineRequest(bytes);
498498
simulateRequest.setId("_id");
499499
// Avoid executing on a coordinating only node, because databases are not available there and geoip processor won't do any lookups.
500500
// (some test seeds repeatedly hit such nodes causing failures)

server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.List;
3838
import java.util.Map;
3939

40+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
4041
import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipelineRequest;
4142
import static org.elasticsearch.test.NodeRoles.nonIngestNode;
4243
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
@@ -97,7 +98,7 @@ public void testSimulate() throws Exception {
9798
if (randomBoolean()) {
9899
response = clusterAdmin().prepareSimulatePipeline(bytes, XContentType.JSON).setId("_id").get();
99100
} else {
100-
SimulatePipelineRequest request = new SimulatePipelineRequest(bytes, XContentType.JSON);
101+
SimulatePipelineRequest request = jsonSimulatePipelineRequest(bytes);
101102
request.setId("_id");
102103
response = clusterAdmin().simulatePipeline(request).get();
103104
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionRequest;
1313
import org.elasticsearch.action.ActionRequestValidationException;
1414
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
1718
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -41,19 +42,20 @@ public class SimulatePipelineRequest extends ActionRequest implements ToXContent
4142
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
4243
private String id;
4344
private boolean verbose;
44-
private final BytesReference source;
45+
private final ReleasableBytesReference source;
4546
private final XContentType xContentType;
4647
private RestApiVersion restApiVersion;
4748

4849
/**
4950
* Creates a new request with the given source and its content type
5051
*/
51-
public SimulatePipelineRequest(BytesReference source, XContentType xContentType) {
52+
public SimulatePipelineRequest(ReleasableBytesReference source, XContentType xContentType) {
5253
this(source, xContentType, RestApiVersion.current());
5354
}
5455

55-
public SimulatePipelineRequest(BytesReference source, XContentType xContentType, RestApiVersion restApiVersion) {
56+
public SimulatePipelineRequest(ReleasableBytesReference source, XContentType xContentType, RestApiVersion restApiVersion) {
5657
this.source = Objects.requireNonNull(source);
58+
assert source.hasReferences();
5759
this.xContentType = Objects.requireNonNull(xContentType);
5860
this.restApiVersion = restApiVersion;
5961
}
@@ -62,7 +64,7 @@ public SimulatePipelineRequest(BytesReference source, XContentType xContentType,
6264
super(in);
6365
id = in.readOptionalString();
6466
verbose = in.readBoolean();
65-
source = in.readBytesReference();
67+
source = in.readReleasableBytesReference();
6668
xContentType = in.readEnum(XContentType.class);
6769
}
6870

@@ -88,6 +90,7 @@ public void setVerbose(boolean verbose) {
8890
}
8991

9092
public BytesReference getSource() {
93+
assert source.hasReferences();
9194
return source;
9295
}
9396

@@ -250,4 +253,24 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config, RestAp
250253
public RestApiVersion getRestApiVersion() {
251254
return restApiVersion;
252255
}
256+
257+
@Override
258+
public final void incRef() {
259+
source.incRef();
260+
}
261+
262+
@Override
263+
public final boolean tryIncRef() {
264+
return source.tryIncRef();
265+
}
266+
267+
@Override
268+
public final boolean decRef() {
269+
return source.decRef();
270+
}
271+
272+
@Override
273+
public final boolean hasReferences() {
274+
return source.hasReferences();
275+
}
253276
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequestBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionRequestBuilder;
1313
import org.elasticsearch.client.internal.ElasticsearchClient;
1414
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1516
import org.elasticsearch.xcontent.XContentType;
1617

1718
public class SimulatePipelineRequestBuilder extends ActionRequestBuilder<SimulatePipelineRequest, SimulatePipelineResponse> {
@@ -20,7 +21,7 @@ public class SimulatePipelineRequestBuilder extends ActionRequestBuilder<Simulat
2021
* Create a new builder for {@link SimulatePipelineRequest}s
2122
*/
2223
public SimulatePipelineRequestBuilder(ElasticsearchClient client, BytesReference source, XContentType xContentType) {
23-
super(client, SimulatePipelineAction.INSTANCE, new SimulatePipelineRequest(source, xContentType));
24+
super(client, SimulatePipelineAction.INSTANCE, new SimulatePipelineRequest(ReleasableBytesReference.wrap(source), xContentType));
2425
}
2526

2627
/**

server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.rest.action.ingest;
1111

12-
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
1413
import org.elasticsearch.client.internal.node.NodeClient;
1514
import org.elasticsearch.common.bytes.ReleasableBytesReference;
@@ -48,12 +47,9 @@ public String getName() {
4847
@Override
4948
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
5049
Tuple<XContentType, ReleasableBytesReference> sourceTuple = restRequest.contentOrSourceParam();
51-
var content = sourceTuple.v2();
52-
SimulatePipelineRequest request = new SimulatePipelineRequest(sourceTuple.v2(), sourceTuple.v1(), restRequest.getRestApiVersion());
50+
final var request = new SimulatePipelineRequest(sourceTuple.v2(), sourceTuple.v1(), restRequest.getRestApiVersion());
5351
request.setId(restRequest.param("id"));
5452
request.setVerbose(restRequest.paramAsBoolean("verbose", false));
55-
return channel -> client.admin()
56-
.cluster()
57-
.simulatePipeline(request, ActionListener.withRef(new RestToXContentListener<>(channel), content));
53+
return channel -> client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel));
5854
}
5955
}

server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
import org.elasticsearch.xcontent.XContentType;
1717

1818
import java.io.IOException;
19-
import java.nio.charset.StandardCharsets;
2019

20+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
2121
import static org.hamcrest.CoreMatchers.equalTo;
2222

2323
public class SimulatePipelineRequestTests extends ESTestCase {
2424

2525
public void testSerialization() throws IOException {
26-
SimulatePipelineRequest request = new SimulatePipelineRequest(new BytesArray(""), XContentType.JSON);
26+
SimulatePipelineRequest request = jsonSimulatePipelineRequest(new BytesArray(""));
2727
// Sometimes we set an id
2828
if (randomBoolean()) {
2929
request.setId(randomAlphaOfLengthBetween(1, 10));
@@ -44,10 +44,7 @@ public void testSerialization() throws IOException {
4444
}
4545

4646
public void testSerializationWithXContent() throws IOException {
47-
SimulatePipelineRequest request = new SimulatePipelineRequest(
48-
new BytesArray("{}".getBytes(StandardCharsets.UTF_8)),
49-
XContentType.JSON
50-
);
47+
SimulatePipelineRequest request = jsonSimulatePipelineRequest("{}");
5148
assertEquals(XContentType.JSON, request.getXContentType());
5249

5350
BytesStreamOutput output = new BytesStreamOutput();

test/framework/src/main/java/org/elasticsearch/ingest/IngestPipelineTestUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
1515
import org.elasticsearch.action.ingest.PutPipelineRequest;
1616
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
17+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
1718
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1819
import org.elasticsearch.client.internal.ElasticsearchClient;
1920
import org.elasticsearch.common.Strings;
2021
import org.elasticsearch.common.bytes.BytesArray;
2122
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2224
import org.elasticsearch.logging.LogManager;
2325
import org.elasticsearch.logging.Logger;
2426
import org.elasticsearch.test.ESTestCase;
@@ -124,4 +126,18 @@ public void onFailure(Exception e) {
124126
);
125127
}
126128
}
129+
130+
/**
131+
* Construct a new {@link SimulatePipelineRequest} whose content is the given JSON document, represented as a {@link String}.
132+
*/
133+
public static SimulatePipelineRequest jsonSimulatePipelineRequest(String jsonString) {
134+
return jsonSimulatePipelineRequest(new BytesArray(jsonString));
135+
}
136+
137+
/**
138+
* Construct a new {@link SimulatePipelineRequest} whose content is the given JSON document, represented as a {@link BytesReference}.
139+
*/
140+
public static SimulatePipelineRequest jsonSimulatePipelineRequest(BytesReference jsonBytes) {
141+
return new SimulatePipelineRequest(ReleasableBytesReference.wrap(jsonBytes), XContentType.JSON);
142+
}
127143
}

x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99

1010
import org.elasticsearch.action.index.IndexRequest;
1111
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
12-
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
1312
import org.elasticsearch.action.support.WriteRequest;
14-
import org.elasticsearch.common.bytes.BytesArray;
1513
import org.elasticsearch.common.settings.Settings;
1614
import org.elasticsearch.ingest.common.IngestCommonPlugin;
1715
import org.elasticsearch.plugins.Plugin;
@@ -27,6 +25,7 @@
2725
import java.util.Collection;
2826
import java.util.List;
2927

28+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
3029
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
3130
import static org.hamcrest.Matchers.equalTo;
3231
import static org.hamcrest.Matchers.nullValue;
@@ -90,7 +89,7 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
9089
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
9190
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
9291

93-
var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""
92+
var simulatePipelineRequest = jsonSimulatePipelineRequest("""
9493
{
9594
"pipeline": {
9695
"processors": [
@@ -119,7 +118,7 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
119118
}
120119
]
121120
}
122-
"""), XContentType.JSON);
121+
""");
123122
var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
124123
var result = (SimulateDocumentBaseResult) response.getResults().get(0);
125124
assertThat(result.getFailure(), nullValue());
@@ -132,7 +131,7 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
132131
assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(1L));
133132
assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(0L));
134133

135-
simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""
134+
simulatePipelineRequest = jsonSimulatePipelineRequest("""
136135
{
137136
"pipeline": {
138137
"processors": [
@@ -155,7 +154,7 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
155154
}
156155
]
157156
}
158-
"""), XContentType.JSON);
157+
""");
159158
response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
160159
result = (SimulateDocumentBaseResult) response.getResults().get(0);
161160
assertThat(result.getFailure(), nullValue());

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/license/MachineLearningLicensingIT.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@
1111
import org.elasticsearch.action.index.IndexRequest;
1212
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
1313
import org.elasticsearch.action.ingest.SimulatePipelineAction;
14-
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
1514
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
1615
import org.elasticsearch.action.search.SearchRequest;
1716
import org.elasticsearch.action.support.PlainActionFuture;
1817
import org.elasticsearch.action.support.WriteRequest;
1918
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2019
import org.elasticsearch.cluster.ClusterState;
21-
import org.elasticsearch.common.bytes.BytesArray;
2220
import org.elasticsearch.common.settings.Settings;
2321
import org.elasticsearch.core.Strings;
2422
import org.elasticsearch.core.TimeValue;
@@ -61,13 +59,13 @@
6159
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
6260
import org.junit.Before;
6361

64-
import java.nio.charset.StandardCharsets;
6562
import java.util.Collections;
6663
import java.util.HashMap;
6764
import java.util.List;
6865
import java.util.Map;
6966
import java.util.Set;
7067

68+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
7169
import static org.hamcrest.Matchers.containsString;
7270
import static org.hamcrest.Matchers.empty;
7371
import static org.hamcrest.Matchers.hasItem;
@@ -541,11 +539,7 @@ public void testMachineLearningCreateInferenceProcessorRestricted() {
541539
}}]
542540
}""", pipeline);
543541
PlainActionFuture<SimulatePipelineResponse> simulatePipelineListener = new PlainActionFuture<>();
544-
client().execute(
545-
SimulatePipelineAction.INSTANCE,
546-
new SimulatePipelineRequest(new BytesArray(simulateSource.getBytes(StandardCharsets.UTF_8)), XContentType.JSON),
547-
simulatePipelineListener
548-
);
542+
client().execute(SimulatePipelineAction.INSTANCE, jsonSimulatePipelineRequest(simulateSource), simulatePipelineListener);
549543

550544
assertThat(simulatePipelineListener.actionGet().getResults(), is(not(empty())));
551545

@@ -575,7 +569,7 @@ public void testMachineLearningCreateInferenceProcessorRestricted() {
575569
// Simulating the pipeline should fail
576570
SimulateDocumentBaseResult simulateResponse = (SimulateDocumentBaseResult) client().execute(
577571
SimulatePipelineAction.INSTANCE,
578-
new SimulatePipelineRequest(new BytesArray(simulateSource.getBytes(StandardCharsets.UTF_8)), XContentType.JSON)
572+
jsonSimulatePipelineRequest(simulateSource)
579573
).actionGet().getResults().get(0);
580574
assertThat(simulateResponse.getFailure(), is(not(nullValue())));
581575
assertThat((simulateResponse.getFailure()).getCause(), is(instanceOf(ElasticsearchSecurityException.class)));
@@ -588,11 +582,7 @@ public void testMachineLearningCreateInferenceProcessorRestricted() {
588582
putJsonPipeline("test_infer_license_pipeline", pipeline);
589583

590584
PlainActionFuture<SimulatePipelineResponse> simulatePipelineListenerNewLicense = new PlainActionFuture<>();
591-
client().execute(
592-
SimulatePipelineAction.INSTANCE,
593-
new SimulatePipelineRequest(new BytesArray(simulateSource.getBytes(StandardCharsets.UTF_8)), XContentType.JSON),
594-
simulatePipelineListenerNewLicense
595-
);
585+
client().execute(SimulatePipelineAction.INSTANCE, jsonSimulatePipelineRequest(simulateSource), simulatePipelineListenerNewLicense);
596586

597587
assertThat(simulatePipelineListenerNewLicense.actionGet().getResults(), is(not(empty())));
598588

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.bytes.BytesReference;
24+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2425
import org.elasticsearch.common.logging.HeaderWarning;
2526
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -282,7 +283,10 @@ private void getPreview(
282283
builder.startObject();
283284
builder.field("docs", results);
284285
builder.endObject();
285-
var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
286+
var pipelineRequest = new SimulatePipelineRequest(
287+
ReleasableBytesReference.wrap(BytesReference.bytes(builder)),
288+
XContentType.JSON
289+
);
286290
pipelineRequest.setId(pipeline);
287291
parentTaskClient.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener);
288292
}

0 commit comments

Comments
 (0)