4949import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
5050import org .elasticsearch .common .Priority ;
5151import org .elasticsearch .common .TriConsumer ;
52- import org .elasticsearch .common .bytes .BytesReference ;
5352import org .elasticsearch .common .collect .ImmutableOpenMap ;
5453import org .elasticsearch .common .logging .DeprecationCategory ;
5554import org .elasticsearch .common .logging .DeprecationLogger ;
7877import org .elasticsearch .script .ScriptService ;
7978import org .elasticsearch .threadpool .Scheduler ;
8079import org .elasticsearch .threadpool .ThreadPool ;
81- import org .elasticsearch .xcontent .XContentBuilder ;
8280
83- import java .io .IOException ;
8481import java .time .Instant ;
82+ import java .time .InstantSource ;
83+ import java .time .format .DateTimeFormatter ;
84+ import java .time .format .DateTimeFormatterBuilder ;
8585import java .time .temporal .ChronoUnit ;
8686import java .util .ArrayList ;
8787import java .util .Collection ;
104104import java .util .function .Function ;
105105import java .util .function .IntConsumer ;
106106import java .util .function .Predicate ;
107- import java .util .function .Supplier ;
108107import java .util .stream .Collectors ;
109108
110109import static org .elasticsearch .core .Strings .format ;
@@ -545,9 +544,7 @@ public void putPipeline(
545544 ActionListener <AcknowledgedResponse > listener ,
546545 Consumer <ActionListener <NodesInfoResponse >> nodeInfoListener
547546 ) throws Exception {
548- Map <String , Object > newPipelineConfig = readPipelineConfig (request );
549- validateNoSystemPropertiesInPipelineConfig (newPipelineConfig );
550- if (isNoOpPipelineUpdate (state .metadata ().getProject (projectId ), request , () -> newPipelineConfig )) {
547+ if (isNoOpPipelineUpdate (state .metadata ().getProject (projectId ), request )) {
551548 // existing pipeline matches request pipeline -- no need to update
552549 listener .onResponse (AcknowledgedResponse .TRUE );
553550 return ;
@@ -558,7 +555,7 @@ public void putPipeline(
558555
559556 taskQueue .submitTask (
560557 "put-pipeline-" + request .getId (),
561- new PutPipelineClusterStateUpdateTask (projectId , l , request ),
558+ new PutPipelineClusterStateUpdateTask (projectId , l , request , Instant :: now ),
562559 request .masterNodeTimeout ()
563560 );
564561 }));
@@ -574,10 +571,6 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ
574571 validatePipeline (ingestInfos , projectId , request .getId (), config );
575572 }
576573
577- public static Map <String , Object > readPipelineConfig (PutPipelineRequest request ) {
578- return XContentHelper .convertToMap (request .getSource (), false , request .getXContentType ()).v2 ();
579- }
580-
581574 public static void validateNoSystemPropertiesInPipelineConfig (final Map <String , Object > pipelineConfig ) {
582575 if (pipelineConfig .containsKey (Pipeline .CREATED_DATE_KEY )) {
583576 throw new ElasticsearchParseException ("Provided a pipeline property which is managed by the system: created_date." );
@@ -586,24 +579,23 @@ public static void validateNoSystemPropertiesInPipelineConfig(final Map<String,
586579 }
587580 }
588581
589- public static boolean isNoOpPipelineUpdate (
590- ProjectMetadata metadata ,
591- PutPipelineRequest request ,
592- Supplier <Map <String , Object >> newPipelineConfigSupplier
593- ) {
582+ /** Check whether updating a potentially existing pipeline will be a NOP.
583+ * Will return <code>false</code> if request contains system-properties like `{created,modified}_date,
584+ * these should be rejected later.`*/
585+ public static boolean isNoOpPipelineUpdate (ProjectMetadata metadata , PutPipelineRequest request ) {
594586 IngestMetadata currentIngestMetadata = metadata .custom (IngestMetadata .TYPE );
595587 if (request .getVersion () == null
596588 && currentIngestMetadata != null
597589 && currentIngestMetadata .getPipelines ().containsKey (request .getId ())) {
598590
591+ var newPipelineConfig = XContentHelper .convertToMap (request .getSource (), false , request .getXContentType ()).v2 ();
592+
599593 Map <String , Object > currentConfigWithoutSystemProps = new HashMap <>(
600594 currentIngestMetadata .getPipelines ().get (request .getId ()).getConfig ()
601595 );
602596 currentConfigWithoutSystemProps .remove (Pipeline .CREATED_DATE_KEY );
603597 currentConfigWithoutSystemProps .remove (Pipeline .MODIFIED_DATE_KEY );
604598
605- Map <String , Object > newPipelineConfig = newPipelineConfigSupplier .get ();
606-
607599 return newPipelineConfig .equals (currentConfigWithoutSystemProps );
608600 }
609601
@@ -701,26 +693,42 @@ private static void collectProcessorMetrics(
701693 * Used in this class and externally by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
702694 */
703695 public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
696+ // always output millis even if instantSource returns millis == 0
697+ private static final DateTimeFormatter ISO8601_WITH_MILLIS_FORMATTER = new DateTimeFormatterBuilder ().appendInstant (3 )
698+ .toFormatter (Locale .ROOT );
699+
704700 private final PutPipelineRequest request ;
701+ private final InstantSource instantSource ;
705702
706- PutPipelineClusterStateUpdateTask (ProjectId projectId , ActionListener <AcknowledgedResponse > listener , PutPipelineRequest request ) {
703+ PutPipelineClusterStateUpdateTask (
704+ final ProjectId projectId ,
705+ final ActionListener <AcknowledgedResponse > listener ,
706+ final PutPipelineRequest request ,
707+ final InstantSource instantSource
708+ ) {
707709 super (projectId , listener );
708710 this .request = request ;
711+ this .instantSource = instantSource ;
709712 }
710713
711714 /**
712715 * Used by {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
713716 */
714717 public PutPipelineClusterStateUpdateTask (ProjectId projectId , PutPipelineRequest request ) {
715- this (projectId , null , request );
718+ this (projectId , null , request , Instant :: now );
716719 }
717720
718721 @ Override
719722 public IngestMetadata execute (IngestMetadata currentIngestMetadata , Collection <IndexMetadata > allIndexMetadata ) {
720- BytesReference pipelineSource = request .getSource ();
723+ final Map <String , PipelineConfiguration > existingPipelines = currentIngestMetadata == null
724+ ? new HashMap <>(1 )
725+ : new HashMap <>(currentIngestMetadata .getPipelines ());
726+ final PipelineConfiguration existingPipeline = existingPipelines .get (request .getId ());
727+ final Map <String , Object > newPipelineConfig = XContentHelper .convertToMap (request .getSource (), true , request .getXContentType ())
728+ .v2 ();
729+
721730 if (request .getVersion () != null ) {
722- var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata .getPipelines ().get (request .getId ()) : null ;
723- if (currentPipeline == null ) {
731+ if (existingPipeline == null ) {
724732 throw new IllegalArgumentException (
725733 String .format (
726734 Locale .ROOT ,
@@ -731,7 +739,7 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
731739 );
732740 }
733741
734- final Integer currentVersion = currentPipeline .getVersion ();
742+ final Integer currentVersion = existingPipeline .getVersion ();
735743 if (Objects .equals (request .getVersion (), currentVersion ) == false ) {
736744 throw new IllegalArgumentException (
737745 String .format (
@@ -744,9 +752,8 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
744752 );
745753 }
746754
747- var pipelineConfig = XContentHelper .convertToMap (request .getSource (), false , request .getXContentType ()).v2 ();
748- final Integer specifiedVersion = (Integer ) pipelineConfig .get ("version" );
749- if (pipelineConfig .containsKey ("version" ) && Objects .equals (specifiedVersion , currentVersion )) {
755+ final Integer specifiedVersion = (Integer ) newPipelineConfig .get ("version" );
756+ if (newPipelineConfig .containsKey ("version" ) && Objects .equals (specifiedVersion , currentVersion )) {
750757 throw new IllegalArgumentException (
751758 String .format (
752759 Locale .ROOT ,
@@ -759,28 +766,15 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
759766
760767 // if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
761768 if (specifiedVersion == null ) {
762- pipelineConfig .put ("version" , request .getVersion () == null ? 1 : request .getVersion () + 1 );
763- try {
764- var builder = XContentBuilder .builder (request .getXContentType ().xContent ()).map (pipelineConfig );
765- pipelineSource = BytesReference .bytes (builder );
766- } catch (IOException e ) {
767- throw new IllegalStateException (e );
768- }
769+ newPipelineConfig .put ("version" , request .getVersion () == null ? 1 : request .getVersion () + 1 );
769770 }
770771 }
771772
772- Map <String , PipelineConfiguration > pipelines ;
773- if (currentIngestMetadata != null ) {
774- pipelines = new HashMap <>(currentIngestMetadata .getPipelines ());
775- } else {
776- pipelines = new HashMap <>();
777- }
778-
779- Instant now = Instant .now ().truncatedTo (ChronoUnit .MILLIS );
780- Map <String , Object > newPipelineConfig = XContentHelper .convertToMap (pipelineSource , true , request .getXContentType ()).v2 ();
781- PipelineConfiguration existingPipeline = pipelines .get (request .getId ());
773+ final String iso8601WithMillisNow = ISO8601_WITH_MILLIS_FORMATTER .format (
774+ instantSource .instant ().truncatedTo (ChronoUnit .MILLIS )
775+ );
782776 if (existingPipeline == null ) {
783- newPipelineConfig .put (Pipeline .CREATED_DATE_KEY , now . toString () );
777+ newPipelineConfig .put (Pipeline .CREATED_DATE_KEY , iso8601WithMillisNow );
784778 } else {
785779 Object existingCreatedAt = existingPipeline .getConfig ().get (Pipeline .CREATED_DATE_KEY );
786780 // only set/carry over `created_date` if existing pipeline already has it.
@@ -789,10 +783,10 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
789783 newPipelineConfig .put (Pipeline .CREATED_DATE_KEY , existingCreatedAt );
790784 }
791785 }
792- newPipelineConfig .put (Pipeline .MODIFIED_DATE_KEY , now . toString () );
786+ newPipelineConfig .put (Pipeline .MODIFIED_DATE_KEY , iso8601WithMillisNow );
793787
794- pipelines .put (request .getId (), new PipelineConfiguration (request .getId (), newPipelineConfig ));
795- return new IngestMetadata (pipelines );
788+ existingPipelines .put (request .getId (), new PipelineConfiguration (request .getId (), newPipelineConfig ));
789+ return new IngestMetadata (existingPipelines );
796790 }
797791 }
798792
@@ -803,6 +797,7 @@ void validatePipeline(
803797 String pipelineId ,
804798 Map <String , Object > pipelineConfig
805799 ) throws Exception {
800+ validateNoSystemPropertiesInPipelineConfig (pipelineConfig );
806801 if (ingestInfos .isEmpty ()) {
807802 throw new IllegalStateException ("Ingest info is empty" );
808803 }
0 commit comments