Skip to content

Commit 8607d40

Browse files
authored
Introduce test utils for ingest pipelines (elastic#112733)
Replaces the somewhat-awkward API on `ClusterAdminClient` for manipulating ingest pipelines with some test-specific utilities that are easier to use. Relates elastic#107984 in that this change massively reduces the noise that would otherwise result from removing the trappy timeouts in these APIs.
1 parent e1f7814 commit 8607d40

File tree

39 files changed

+606
-886
lines changed

39 files changed

+606
-886
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
import org.elasticsearch.action.bulk.FailureStoreMetrics;
2020
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
2121
import org.elasticsearch.action.index.IndexRequest;
22-
import org.elasticsearch.action.ingest.PutPipelineRequest;
2322
import org.elasticsearch.action.support.IndicesOptions;
2423
import org.elasticsearch.action.support.WriteRequest;
2524
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2625
import org.elasticsearch.cluster.metadata.IndexMetadata;
2726
import org.elasticsearch.cluster.metadata.Template;
28-
import org.elasticsearch.common.bytes.BytesArray;
29-
import org.elasticsearch.common.bytes.BytesReference;
3027
import org.elasticsearch.common.compress.CompressedXContent;
3128
import org.elasticsearch.core.Strings;
3229
import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -319,9 +316,7 @@ private void createReroutePipeline(String destination) {
319316
}
320317

321318
private void createPipeline(String processor) {
322-
String pipelineDefinition = Strings.format("{\"processors\": [{%s}]}", processor);
323-
BytesReference bytes = new BytesArray(pipelineDefinition);
324-
clusterAdmin().putPipeline(new PutPipelineRequest(pipeline, bytes, XContentType.JSON)).actionGet();
319+
putJsonPipeline(pipeline, Strings.format("{\"processors\": [{%s}]}", processor));
325320
}
326321

327322
private void indexDocs(String dataStream, int numDocs, String pipeline) {

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,11 @@
1515
import org.elasticsearch.action.bulk.BulkResponse;
1616
import org.elasticsearch.action.get.MultiGetResponse;
1717
import org.elasticsearch.action.index.IndexRequest;
18-
import org.elasticsearch.action.ingest.PutPipelineRequest;
1918
import org.elasticsearch.action.support.PlainActionFuture;
2019
import org.elasticsearch.action.support.WriteRequest;
2120
import org.elasticsearch.client.internal.Requests;
2221
import org.elasticsearch.cluster.block.ClusterBlockException;
2322
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
24-
import org.elasticsearch.common.bytes.BytesArray;
25-
import org.elasticsearch.common.bytes.BytesReference;
2623
import org.elasticsearch.common.settings.Settings;
2724
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2825
import org.elasticsearch.core.Strings;
@@ -79,7 +76,7 @@ public void testFailureInConditionalProcessor() {
7976
internalCluster().ensureAtLeastNumDataNodes(1);
8077
internalCluster().startMasterOnlyNode();
8178
final String pipelineId = "foo";
82-
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(Strings.format("""
79+
putJsonPipeline(pipelineId, Strings.format("""
8380
{
8481
"processors": [
8582
{
@@ -99,7 +96,7 @@ public void testFailureInConditionalProcessor() {
9996
}
10097
}
10198
]
102-
}""", MockScriptEngine.NAME)), XContentType.JSON).get();
99+
}""", MockScriptEngine.NAME));
103100

104101
Exception e = expectThrows(
105102
Exception.class,
@@ -126,22 +123,16 @@ public void testScriptDisabled() throws Exception {
126123
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
127124
internalCluster().startNode();
128125

129-
BytesReference pipelineWithScript = new BytesArray(Strings.format("""
126+
putJsonPipeline(pipelineIdWithScript, Strings.format("""
130127
{
131128
"processors": [ { "script": { "lang": "%s", "source": "my_script" } } ]
132129
}""", MockScriptEngine.NAME));
133-
BytesReference pipelineWithoutScript = new BytesArray("""
130+
putJsonPipeline(pipelineIdWithoutScript, """
134131
{
135132
"processors": [ { "set": { "field": "y", "value": 0 } } ]
136133
}""");
137134

138-
Consumer<String> checkPipelineExists = (id) -> assertThat(
139-
clusterAdmin().prepareGetPipeline(id).get().pipelines().get(0).getId(),
140-
equalTo(id)
141-
);
142-
143-
clusterAdmin().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
144-
clusterAdmin().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();
135+
Consumer<String> checkPipelineExists = (id) -> assertThat(getPipelines(id).pipelines().get(0).getId(), equalTo(id));
145136

146137
checkPipelineExists.accept(pipelineIdWithScript);
147138
checkPipelineExists.accept(pipelineIdWithoutScript);
@@ -197,14 +188,13 @@ public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exceptio
197188
putJsonStoredScript("1", Strings.format("""
198189
{"script": {"lang": "%s", "source": "my_script"} }
199190
""", MockScriptEngine.NAME));
200-
BytesReference pipeline = new BytesArray("""
191+
putJsonPipeline("_id", """
201192
{
202193
"processors" : [
203194
{"set" : {"field": "y", "value": 0}},
204195
{"script" : {"id": "1"}}
205196
]
206197
}""");
207-
clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get();
208198

209199
prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
210200

@@ -232,13 +222,12 @@ public void testWithDedicatedIngestNode() throws Exception {
232222
String node = internalCluster().startNode();
233223
String ingestNode = internalCluster().startNode(onlyRole(DiscoveryNodeRole.INGEST_ROLE));
234224

235-
BytesReference pipeline = new BytesArray("""
225+
putJsonPipeline("_id", """
236226
{
237227
"processors" : [
238228
{"set" : {"field": "y", "value": 0}}
239229
]
240230
}""");
241-
clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get();
242231

243232
prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
244233

@@ -264,7 +253,7 @@ public void testWithDedicatedIngestNode() throws Exception {
264253
public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
265254
internalCluster().startNode();
266255

267-
final var pipeline = new BytesArray("""
256+
putJsonPipeline("test_pipeline", """
268257
{
269258
"processors" : [
270259
{
@@ -275,8 +264,8 @@ public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
275264
}
276265
]
277266
}""");
267+
278268
final TimeValue timeout = TimeValue.timeValueSeconds(10);
279-
client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout);
280269
client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
281270
{
282271
"index" : {
@@ -357,16 +346,13 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
357346
// Create Bulk Request
358347
createIndex("index");
359348

360-
BytesReference source = new BytesArray("""
349+
putJsonPipeline("_id", """
361350
{
362351
"processors" : [
363352
{"set" : {"field": "y", "value": 0}}
364353
]
365354
}""");
366355

367-
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
368-
clusterAdmin().putPipeline(putPipelineRequest).get();
369-
370356
int numRequests = scaledRandomIntBetween(32, 128);
371357
BulkRequest bulkRequest = new BulkRequest();
372358
BulkResponse response;

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/ManyNestedPipelinesIT.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
1616
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
1717
import org.elasticsearch.action.ingest.SimulateProcessorResult;
18-
import org.elasticsearch.common.bytes.BytesArray;
1918
import org.elasticsearch.common.bytes.BytesReference;
2019
import org.elasticsearch.core.Strings;
2120
import org.elasticsearch.ingest.GraphStructureException;
@@ -166,7 +165,7 @@ private void createChainedPipelines(String prefix, int count) {
166165
private void createChainedPipeline(String prefix, int number) {
167166
String pipelineId = prefix + "pipeline_" + number;
168167
String nextPipelineId = prefix + "pipeline_" + (number + 1);
169-
String pipelineTemplate = """
168+
putJsonPipeline(pipelineId, Strings.format("""
170169
{
171170
"processors": [
172171
{
@@ -176,9 +175,7 @@ private void createChainedPipeline(String prefix, int number) {
176175
}
177176
]
178177
}
179-
""";
180-
String pipeline = Strings.format(pipelineTemplate, nextPipelineId);
181-
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
178+
""", nextPipelineId));
182179
}
183180

184181
private void createLastPipeline(String prefix, int number) {
@@ -195,6 +192,6 @@ private void createLastPipeline(String prefix, int number) {
195192
]
196193
}
197194
""";
198-
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
195+
putJsonPipeline(pipelineId, pipeline);
199196
}
200197
}

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@
1010

1111
import org.elasticsearch.action.DocWriteRequest;
1212
import org.elasticsearch.action.index.IndexRequest;
13-
import org.elasticsearch.action.ingest.PutPipelineRequest;
14-
import org.elasticsearch.common.bytes.BytesArray;
15-
import org.elasticsearch.common.bytes.BytesReference;
1613
import org.elasticsearch.index.mapper.MapperService;
1714
import org.elasticsearch.index.mapper.ParsedDocument;
1815
import org.elasticsearch.ingest.common.IngestCommonPlugin;
@@ -21,7 +18,6 @@
2118
import org.elasticsearch.test.ESIntegTestCase;
2219
import org.elasticsearch.xcontent.FilterXContentParserWrapper;
2320
import org.elasticsearch.xcontent.XContentParser;
24-
import org.elasticsearch.xcontent.XContentType;
2521

2622
import java.io.IOException;
2723
import java.util.Collection;
@@ -44,7 +40,7 @@ public class XContentMeteringParserDecoratorWithPipelinesIT extends ESIntegTestC
4440
public void testDocumentIsReportedWithPipelines() throws Exception {
4541
hasWrappedParser = false;
4642
// pipeline adding fields, changing destination is not affecting reporting
47-
final BytesReference pipelineBody = new BytesArray("""
43+
putJsonPipeline("pipeline", """
4844
{
4945
"processors": [
5046
{
@@ -62,7 +58,6 @@ public void testDocumentIsReportedWithPipelines() throws Exception {
6258
]
6359
}
6460
""");
65-
clusterAdmin().putPipeline(new PutPipelineRequest("pipeline", pipelineBody, XContentType.JSON)).actionGet();
6661

6762
client().index(
6863
new IndexRequest(TEST_INDEX_NAME).setPipeline("pipeline")

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
import org.elasticsearch.action.get.GetRequest;
2020
import org.elasticsearch.action.get.GetResponse;
2121
import org.elasticsearch.action.index.IndexRequest;
22-
import org.elasticsearch.action.ingest.PutPipelineRequest;
2322
import org.elasticsearch.action.search.SearchRequest;
2423
import org.elasticsearch.action.search.SearchResponse;
25-
import org.elasticsearch.common.bytes.BytesReference;
2624
import org.elasticsearch.common.settings.MockSecureSettings;
2725
import org.elasticsearch.common.settings.Settings;
2826
import org.elasticsearch.common.util.CollectionUtils;
@@ -36,9 +34,7 @@
3634
import org.elasticsearch.rest.RestStatus;
3735
import org.elasticsearch.test.ESIntegTestCase;
3836
import org.elasticsearch.transport.RemoteTransportException;
39-
import org.elasticsearch.xcontent.XContentBuilder;
4037
import org.elasticsearch.xcontent.XContentType;
41-
import org.elasticsearch.xcontent.json.JsonXContent;
4238
import org.junit.ClassRule;
4339

4440
import java.io.IOException;
@@ -47,7 +43,6 @@
4743

4844
import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
4945
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING;
50-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5146
import static org.hamcrest.Matchers.equalTo;
5247

5348
public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {
@@ -155,31 +150,24 @@ private void configureDatabase(String databaseType) throws Exception {
155150
}
156151

157152
private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException {
158-
final BytesReference bytes;
159-
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
160-
builder.startObject();
153+
putJsonPipeline(pipelineName, (builder, params) -> {
154+
builder.field("description", "test");
155+
builder.startArray("processors");
161156
{
162-
builder.field("description", "test");
163-
builder.startArray("processors");
157+
builder.startObject();
164158
{
165-
builder.startObject();
159+
builder.startObject("geoip");
166160
{
167-
builder.startObject("geoip");
168-
{
169-
builder.field("field", sourceField);
170-
builder.field("target_field", targetField);
171-
builder.field("database_file", databaseType + ".mmdb");
172-
}
173-
builder.endObject();
161+
builder.field("field", sourceField);
162+
builder.field("target_field", targetField);
163+
builder.field("database_file", databaseType + ".mmdb");
174164
}
175165
builder.endObject();
176166
}
177-
builder.endArray();
167+
builder.endObject();
178168
}
179-
builder.endObject();
180-
bytes = BytesReference.bytes(builder);
181-
}
182-
assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineName, bytes, XContentType.JSON)).actionGet());
169+
return builder.endArray();
170+
});
183171
}
184172

185173
private String ingestDocument(String indexName, String pipelineName, String sourceField) {

0 commit comments

Comments
 (0)