@@ -555,7 +555,7 @@ public void putPipeline(
555
555
556
556
taskQueue .submitTask (
557
557
"put-pipeline-" + request .getId (),
558
- new PutPipelineClusterStateUpdateTask (projectId , l , request , Instant :: now ),
558
+ new PutPipelineClusterStateUpdateTask (projectId , l , request ),
559
559
request .masterNodeTimeout ()
560
560
);
561
561
}));
@@ -580,8 +580,8 @@ public static void validateNoSystemPropertiesInPipelineConfig(final Map<String,
580
580
}
581
581
582
582
/** Check whether updating a potentially existing pipeline will be a NOP.
583
- * Will return <code>false</code> if request contains system-properties like `{ created,modified}_date ,
584
- * these should be rejected later.` */
583
+ * Will return <code>false</code> if request contains system-properties like created or modified_date ,
584
+ * these should be rejected later.*/
585
585
public static boolean isNoOpPipelineUpdate (ProjectMetadata metadata , PutPipelineRequest request ) {
586
586
IngestMetadata currentIngestMetadata = metadata .custom (IngestMetadata .TYPE );
587
587
if (request .getVersion () == null
@@ -700,6 +700,7 @@ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterSta
700
700
private final PutPipelineRequest request ;
701
701
private final InstantSource instantSource ;
702
702
703
+ // constructor allowing for injection of InstantSource/time for testing
703
704
PutPipelineClusterStateUpdateTask (
704
705
final ProjectId projectId ,
705
706
final ActionListener <AcknowledgedResponse > listener ,
@@ -711,19 +712,27 @@ public static class PutPipelineClusterStateUpdateTask extends PipelineClusterSta
711
712
this .instantSource = instantSource ;
712
713
}
713
714
715
+ PutPipelineClusterStateUpdateTask (
716
+ final ProjectId projectId ,
717
+ final ActionListener <AcknowledgedResponse > listener ,
718
+ final PutPipelineRequest request
719
+ ) {
720
+ this (projectId , listener , request , Instant ::now );
721
+ }
722
+
714
723
/**
715
724
* Used by {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
716
725
*/
717
726
public PutPipelineClusterStateUpdateTask (ProjectId projectId , PutPipelineRequest request ) {
718
- this (projectId , null , request , Instant :: now );
727
+ this (projectId , null , request );
719
728
}
720
729
721
730
@ Override
722
731
public IngestMetadata execute (IngestMetadata currentIngestMetadata , Collection <IndexMetadata > allIndexMetadata ) {
723
- final Map <String , PipelineConfiguration > existingPipelines = currentIngestMetadata == null
732
+ final Map <String , PipelineConfiguration > pipelines = currentIngestMetadata == null
724
733
? new HashMap <>(1 )
725
734
: new HashMap <>(currentIngestMetadata .getPipelines ());
726
- final PipelineConfiguration existingPipeline = existingPipelines .get (request .getId ());
735
+ final PipelineConfiguration existingPipeline = pipelines .get (request .getId ());
727
736
final Map <String , Object > newPipelineConfig = XContentHelper .convertToMap (request .getSource (), true , request .getXContentType ())
728
737
.v2 ();
729
738
@@ -785,8 +794,8 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
785
794
}
786
795
newPipelineConfig .put (Pipeline .MODIFIED_DATE_KEY , iso8601WithMillisNow );
787
796
788
- existingPipelines .put (request .getId (), new PipelineConfiguration (request .getId (), newPipelineConfig ));
789
- return new IngestMetadata (existingPipelines );
797
+ pipelines .put (request .getId (), new PipelineConfiguration (request .getId (), newPipelineConfig ));
798
+ return new IngestMetadata (pipelines );
790
799
}
791
800
}
792
801
0 commit comments