|
91 | 91 |
|
92 | 92 | import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful;
|
93 | 93 | import static org.elasticsearch.core.Tuple.tuple;
|
| 94 | +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; |
94 | 95 | import static org.elasticsearch.ingest.IngestService.NOOP_PIPELINE_NAME;
|
95 | 96 | import static org.elasticsearch.ingest.IngestService.hasPipeline;
|
96 | 97 | import static org.hamcrest.Matchers.containsInAnyOrder;
|
@@ -309,6 +310,68 @@ public void testInnerUpdatePipelines() {
|
309 | 310 | assertThat(ingestService.pipelines(), sameInstance(pipelines));
|
310 | 311 | }
|
311 | 312 |
|
| 313 | + public void testInnerUpdatePipelinesValidation() { |
| 314 | + Map<String, Processor.Factory> processors = new HashMap<>(); |
| 315 | + processors.put("fail_validation", (factories, tag, description, config) -> { |
| 316 | + // ordinary validation issues happen at processor construction time |
| 317 | + throw newConfigurationException("fail_validation", tag, "no_property_name", "validation failure reason"); |
| 318 | + }); |
| 319 | + processors.put("fail_extra_validation", (factories, tag, description, config) -> { |
| 320 | + // 'extra validation' issues happen post- processor construction time |
| 321 | + return new FakeProcessor("fail_extra_validation", tag, description, ingestDocument -> {}) { |
| 322 | + @Override |
| 323 | + public void extraValidation() throws Exception { |
| 324 | + throw newConfigurationException("fail_extra_validation", tag, "no_property_name", "extra validation failure reason"); |
| 325 | + } |
| 326 | + }; |
| 327 | + }); |
| 328 | + |
| 329 | + { |
| 330 | + // a processor that fails ordinary validation (i.e. the processor factory throws an exception while constructing it) |
| 331 | + // will result in a placeholder pipeline being substituted |
| 332 | + |
| 333 | + IngestService ingestService = createWithProcessors(processors); |
| 334 | + PipelineConfiguration config = new PipelineConfiguration("_id", new BytesArray(""" |
| 335 | + {"processors": [{"fail_validation" : {}}]}"""), XContentType.JSON); |
| 336 | + IngestMetadata ingestMetadata = new IngestMetadata(Map.of("_id", config)); |
| 337 | + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); |
| 338 | + ClusterState previousClusterState = clusterState; |
| 339 | + clusterState = ClusterState.builder(clusterState) |
| 340 | + .metadata(Metadata.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) |
| 341 | + .build(); |
| 342 | + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); |
| 343 | + |
| 344 | + Pipeline pipeline = ingestService.getPipeline("_id"); |
| 345 | + assertThat( |
| 346 | + pipeline.getDescription(), |
| 347 | + equalTo("this is a place holder pipeline, because pipeline with id [_id] could not be loaded") |
| 348 | + ); |
| 349 | + } |
| 350 | + |
| 351 | + { |
| 352 | + // a processor that fails extra validation (i.e. an exception is throw from `extraValidation`) |
| 353 | + // will be processed just fine -- extraValidation is for rest/transport validation, not for when |
| 354 | + // a processor is being created from a processor factory |
| 355 | + |
| 356 | + IngestService ingestService = createWithProcessors(processors); |
| 357 | + PipelineConfiguration config = new PipelineConfiguration("_id", new BytesArray(""" |
| 358 | + {"processors": [{"fail_extra_validation" : {}}]}"""), XContentType.JSON); |
| 359 | + IngestMetadata ingestMetadata = new IngestMetadata(Map.of("_id", config)); |
| 360 | + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); |
| 361 | + ClusterState previousClusterState = clusterState; |
| 362 | + clusterState = ClusterState.builder(clusterState) |
| 363 | + .metadata(Metadata.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) |
| 364 | + .build(); |
| 365 | + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); |
| 366 | + |
| 367 | + Pipeline pipeline = ingestService.getPipeline("_id"); |
| 368 | + assertThat(pipeline.getDescription(), nullValue()); |
| 369 | + assertThat(pipeline.getProcessors().size(), equalTo(1)); |
| 370 | + Processor processor = pipeline.getProcessors().get(0); |
| 371 | + assertThat(processor.getType(), equalTo("fail_extra_validation")); |
| 372 | + } |
| 373 | + } |
| 374 | + |
312 | 375 | public void testDelete() {
|
313 | 376 | IngestService ingestService = createWithProcessors();
|
314 | 377 | PipelineConfiguration config = new PipelineConfiguration("_id", new BytesArray("""
|
@@ -887,7 +950,7 @@ public void testGetPipelines() {
|
887 | 950 | assertThat(pipelines.get(1).getId(), equalTo("_id2"));
|
888 | 951 | }
|
889 | 952 |
|
890 |
| - public void testValidate() throws Exception { |
| 953 | + public void testValidateProcessorTypeOnAllNodes() throws Exception { |
891 | 954 | IngestService ingestService = createWithProcessors();
|
892 | 955 | PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("""
|
893 | 956 | {
|
@@ -928,6 +991,69 @@ public void testValidate() throws Exception {
|
928 | 991 | ingestService.validatePipeline(ingestInfos, putRequest.getId(), pipelineConfig2);
|
929 | 992 | }
|
930 | 993 |
|
| 994 | + public void testValidateConfigurationExceptions() { |
| 995 | + IngestService ingestService = createWithProcessors(Map.of("fail_validation", (factories, tag, description, config) -> { |
| 996 | + // ordinary validation issues happen at processor construction time |
| 997 | + throw newConfigurationException("fail_validation", tag, "no_property_name", "validation failure reason"); |
| 998 | + })); |
| 999 | + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" |
| 1000 | + { |
| 1001 | + "processors": [ |
| 1002 | + { |
| 1003 | + "fail_validation": { |
| 1004 | + } |
| 1005 | + } |
| 1006 | + ] |
| 1007 | + }"""), XContentType.JSON); |
| 1008 | + var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); |
| 1009 | + |
| 1010 | + // other validation actually consults this map, but this validation does not. however, it must not be empty. |
| 1011 | + DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), Map.of(), Set.of(), Version.CURRENT); |
| 1012 | + Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>(); |
| 1013 | + ingestInfos.put(node1, new IngestInfo(List.of())); |
| 1014 | + |
| 1015 | + ElasticsearchParseException e = expectThrows( |
| 1016 | + ElasticsearchParseException.class, |
| 1017 | + () -> ingestService.validatePipeline(ingestInfos, putRequest.getId(), pipelineConfig) |
| 1018 | + ); |
| 1019 | + assertEquals("[no_property_name] validation failure reason", e.getMessage()); |
| 1020 | + assertEquals("fail_validation", e.getMetadata("es.processor_type").get(0)); |
| 1021 | + } |
| 1022 | + |
| 1023 | + public void testValidateExtraValidationConfigurationExceptions() { |
| 1024 | + IngestService ingestService = createWithProcessors(Map.of("fail_extra_validation", (factories, tag, description, config) -> { |
| 1025 | + // 'extra validation' issues happen post- processor construction time |
| 1026 | + return new FakeProcessor("fail_extra_validation", tag, description, ingestDocument -> {}) { |
| 1027 | + @Override |
| 1028 | + public void extraValidation() throws Exception { |
| 1029 | + throw newConfigurationException("fail_extra_validation", tag, "no_property_name", "extra validation failure reason"); |
| 1030 | + } |
| 1031 | + }; |
| 1032 | + })); |
| 1033 | + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" |
| 1034 | + { |
| 1035 | + "processors": [ |
| 1036 | + { |
| 1037 | + "fail_extra_validation": { |
| 1038 | + } |
| 1039 | + } |
| 1040 | + ] |
| 1041 | + }"""), XContentType.JSON); |
| 1042 | + var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); |
| 1043 | + |
| 1044 | + // other validation actually consults this map, but this validation does not. however, it must not be empty. |
| 1045 | + DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), Map.of(), Set.of(), Version.CURRENT); |
| 1046 | + Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>(); |
| 1047 | + ingestInfos.put(node1, new IngestInfo(List.of())); |
| 1048 | + |
| 1049 | + ElasticsearchParseException e = expectThrows( |
| 1050 | + ElasticsearchParseException.class, |
| 1051 | + () -> ingestService.validatePipeline(ingestInfos, putRequest.getId(), pipelineConfig) |
| 1052 | + ); |
| 1053 | + assertEquals("[no_property_name] extra validation failure reason", e.getMessage()); |
| 1054 | + assertEquals("fail_extra_validation", e.getMetadata("es.processor_type").get(0)); |
| 1055 | + } |
| 1056 | + |
931 | 1057 | public void testExecuteIndexPipelineExistsButFailedParsing() {
|
932 | 1058 | IngestService ingestService = createWithProcessors(
|
933 | 1059 | Map.of("mock", (factories, tag, description, config) -> new AbstractProcessor("mock", "description") {
|
|
0 commit comments