Skip to content

Commit 8a7f522

Browse files
authored
Pipelines: Add created_date and modified_date (#130847)
Add new system-managed properties to pipelines to allow for better tracking of changes.
1 parent d2e75cc commit 8a7f522

File tree

14 files changed

+422
-44
lines changed

14 files changed

+422
-44
lines changed

docs/changelog/130847.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130847
2+
summary: "Pipelines: Add `created_date` and `modified_date`"
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
setup:
2+
- requires:
3+
test_runner_features: capabilities
4+
capabilities:
5+
- method: PUT
6+
path: /_ingest/pipeline/{id}
7+
capabilities: [ pipeline_tracking_info ]
8+
reason: "Pipelines have tracking info: modified_date and created_date"
9+
10+
---
11+
"Test creating and getting pipeline returns created_date and modified_date":
12+
- do:
13+
ingest.put_pipeline:
14+
id: "my_pipeline"
15+
body: >
16+
{
17+
"processors": []
18+
}
19+
- match: { acknowledged: true }
20+
21+
- do:
22+
ingest.get_pipeline:
23+
human: true
24+
id: "my_pipeline"
25+
- gte: { my_pipeline.created_date_millis: 0 }
26+
- gte: { my_pipeline.modified_date_millis: 0 }
27+
- match: { my_pipeline.created_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" }
28+
- match: { my_pipeline.modified_date: "/^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$/" }
29+
30+
---
31+
"Test PUT setting created_date":
32+
- do:
33+
catch: bad_request
34+
ingest.put_pipeline:
35+
id: "my_pipeline"
36+
body: >
37+
{
38+
"processors": [],
39+
"created_date": "2025-07-04T12:50:48.415Z"
40+
}
41+
- match: { status: 400 }
42+
- match: { error.reason: "Provided a pipeline property which is managed by the system: created_date." }
43+
44+
---
45+
"Test PUT setting created_date_millis":
46+
- do:
47+
catch: bad_request
48+
ingest.put_pipeline:
49+
id: "my_pipeline"
50+
body: >
51+
{
52+
"processors": [],
53+
"created_date_millis": 0
54+
}
55+
- match: { status: 400 }
56+
- match: { error.reason: "Provided a pipeline property which is managed by the system: created_date_millis." }
57+
58+
---
59+
"Test PUT setting modified_date_millis":
60+
- do:
61+
catch: bad_request
62+
ingest.put_pipeline:
63+
id: "my_pipeline"
64+
body: >
65+
{
66+
"processors": [],
67+
"modified_date_millis": 0
68+
}
69+
- match: { status: 400 }
70+
- match: { error.reason: "Provided a pipeline property which is managed by the system: modified_date_millis." }
71+
72+
---
73+
"Test PUT setting modified_date":
74+
- do:
75+
catch: bad_request
76+
ingest.put_pipeline:
77+
id: "my_pipeline"
78+
body: >
79+
{
80+
"processors": [],
81+
"modified_date": "2025-07-04T12:50:48.415Z"
82+
}
83+
- match: { status: 400 }
84+
- match: { error.reason: "Provided a pipeline property which is managed by the system: modified_date." }

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,3 +748,42 @@ setup:
748748
- match: { docs.1.doc._index: "index-2" }
749749
- match: { docs.1.doc._source.foo: "rab" }
750750
- match: { docs.1.doc.executed_pipelines: ["my-pipeline"] }
751+
752+
---
753+
"Test simulate with pipeline with created_date":
754+
- requires:
755+
test_runner_features: capabilities
756+
capabilities:
757+
- method: PUT
758+
path: /_ingest/pipeline/{id}
759+
capabilities: [ pipeline_tracking_info ]
760+
reason: "Pipelines have tracking info: modified_date and created_date"
761+
- requires:
762+
test_runner_features: contains
763+
- skip:
764+
features: headers
765+
- do:
766+
catch: request
767+
headers:
768+
Content-Type: application/json
769+
simulate.ingest:
770+
pipeline: "my_pipeline"
771+
body: >
772+
{
773+
"docs": [
774+
{
775+
"_index": "index-1",
776+
"_source": {
777+
"foo": "bar"
778+
}
779+
}
780+
],
781+
"pipeline_substitutions": {
782+
"my_pipeline": {
783+
"processors": [],
784+
"created_date": "asd"
785+
}
786+
}
787+
}
788+
- match: { status: 500 }
789+
- contains: { error.reason: "Provided a pipeline property which is managed by the system: created_date." }

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ static TransportVersion def(int id) {
352352
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00);
353353
public static final TransportVersion NODE_WEIGHTS_ADDED_TO_NODE_BALANCE_STATS = def(9_129_0_00);
354354
public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00);
355+
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
355356

356357
/*
357358
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.core.UpdateForV10;
16+
import org.elasticsearch.ingest.Pipeline;
1617
import org.elasticsearch.ingest.PipelineConfiguration;
1718
import org.elasticsearch.rest.RestStatus;
1819
import org.elasticsearch.xcontent.ToXContentObject;
@@ -74,7 +75,25 @@ public RestStatus status() {
7475
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
7576
builder.startObject();
7677
for (PipelineConfiguration pipeline : pipelines) {
77-
builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig());
78+
builder.startObject(pipeline.getId());
79+
for (final Map.Entry<String, Object> configProperty : (summary ? Map.<String, Object>of() : pipeline.getConfig()).entrySet()) {
80+
if (Pipeline.CREATED_DATE_MILLIS.equals(configProperty.getKey())) {
81+
builder.timestampFieldsFromUnixEpochMillis(
82+
Pipeline.CREATED_DATE_MILLIS,
83+
Pipeline.CREATED_DATE,
84+
(Long) configProperty.getValue()
85+
);
86+
} else if (Pipeline.MODIFIED_DATE_MILLIS.equals(configProperty.getKey())) {
87+
builder.timestampFieldsFromUnixEpochMillis(
88+
Pipeline.MODIFIED_DATE_MILLIS,
89+
Pipeline.MODIFIED_DATE,
90+
(Long) configProperty.getValue()
91+
);
92+
} else {
93+
builder.field(configProperty.getKey(), configProperty.getValue());
94+
}
95+
}
96+
builder.endObject();
7897
}
7998
builder.endObject();
8099
return builder;

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ static void executeDocument(
4949
pipeline.getMetadata(),
5050
verbosePipelineProcessor,
5151
pipeline.getFieldAccessPattern(),
52-
pipeline.getDeprecated()
52+
pipeline.getDeprecated(),
53+
pipeline.getCreatedDateMillis().orElse(null),
54+
pipeline.getModifiedDateMillis().orElse(null)
5355
);
5456
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
5557
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 69 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
5050
import org.elasticsearch.common.Priority;
5151
import org.elasticsearch.common.TriConsumer;
52-
import org.elasticsearch.common.bytes.BytesReference;
5352
import org.elasticsearch.common.collect.ImmutableOpenMap;
5453
import org.elasticsearch.common.logging.DeprecationCategory;
5554
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -78,9 +77,9 @@
7877
import org.elasticsearch.script.ScriptService;
7978
import org.elasticsearch.threadpool.Scheduler;
8079
import org.elasticsearch.threadpool.ThreadPool;
81-
import org.elasticsearch.xcontent.XContentBuilder;
8280

83-
import java.io.IOException;
81+
import java.time.Instant;
82+
import java.time.InstantSource;
8483
import java.util.ArrayList;
8584
import java.util.Collection;
8685
import java.util.Collections;
@@ -569,16 +568,36 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ
569568
validatePipeline(ingestInfos, projectId, request.getId(), config);
570569
}
571570

571+
public static void validateNoSystemPropertiesInPipelineConfig(final Map<String, Object> pipelineConfig) {
572+
if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_MILLIS)) {
573+
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date_millis.");
574+
} else if (pipelineConfig.containsKey(Pipeline.CREATED_DATE)) {
575+
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date.");
576+
} else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_MILLIS)) {
577+
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date_millis.");
578+
} else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE)) {
579+
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date.");
580+
}
581+
}
582+
583+
/** Check whether updating a potentially existing pipeline will be a NOP.
584+
* Will return <code>false</code> if request contains system-properties like created or modified_date,
585+
* these should be rejected later.*/
572586
public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) {
573587
IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE);
574588
if (request.getVersion() == null
575589
&& currentIngestMetadata != null
576590
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
577-
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
578-
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
579-
if (currentPipeline.getConfig().equals(pipelineConfig)) {
580-
return true;
581-
}
591+
592+
var newPipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
593+
594+
Map<String, Object> currentConfigWithoutSystemProps = new HashMap<>(
595+
currentIngestMetadata.getPipelines().get(request.getId()).getConfig()
596+
);
597+
currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_MILLIS);
598+
currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_MILLIS);
599+
600+
return newPipelineConfig.equals(currentConfigWithoutSystemProps);
582601
}
583602

584603
return false;
@@ -676,10 +695,26 @@ private static void collectProcessorMetrics(
676695
*/
677696
public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
678697
private final PutPipelineRequest request;
679-
680-
PutPipelineClusterStateUpdateTask(ProjectId projectId, ActionListener<AcknowledgedResponse> listener, PutPipelineRequest request) {
698+
private final InstantSource instantSource;
699+
700+
// constructor allowing for injection of InstantSource/time for testing
701+
PutPipelineClusterStateUpdateTask(
702+
final ProjectId projectId,
703+
final ActionListener<AcknowledgedResponse> listener,
704+
final PutPipelineRequest request,
705+
final InstantSource instantSource
706+
) {
681707
super(projectId, listener);
682708
this.request = request;
709+
this.instantSource = instantSource;
710+
}
711+
712+
PutPipelineClusterStateUpdateTask(
713+
final ProjectId projectId,
714+
final ActionListener<AcknowledgedResponse> listener,
715+
final PutPipelineRequest request
716+
) {
717+
this(projectId, listener, request, Instant::now);
683718
}
684719

685720
/**
@@ -691,10 +726,15 @@ public PutPipelineClusterStateUpdateTask(ProjectId projectId, PutPipelineRequest
691726

692727
@Override
693728
public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
694-
BytesReference pipelineSource = request.getSource();
729+
final Map<String, PipelineConfiguration> pipelines = currentIngestMetadata == null
730+
? new HashMap<>(1)
731+
: new HashMap<>(currentIngestMetadata.getPipelines());
732+
final PipelineConfiguration existingPipeline = pipelines.get(request.getId());
733+
final Map<String, Object> newPipelineConfig = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType())
734+
.v2();
735+
695736
if (request.getVersion() != null) {
696-
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
697-
if (currentPipeline == null) {
737+
if (existingPipeline == null) {
698738
throw new IllegalArgumentException(
699739
String.format(
700740
Locale.ROOT,
@@ -705,7 +745,7 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
705745
);
706746
}
707747

708-
final Integer currentVersion = currentPipeline.getVersion();
748+
final Integer currentVersion = existingPipeline.getVersion();
709749
if (Objects.equals(request.getVersion(), currentVersion) == false) {
710750
throw new IllegalArgumentException(
711751
String.format(
@@ -718,9 +758,8 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
718758
);
719759
}
720760

721-
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
722-
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
723-
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
761+
final Integer specifiedVersion = (Integer) newPipelineConfig.get("version");
762+
if (newPipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
724763
throw new IllegalArgumentException(
725764
String.format(
726765
Locale.ROOT,
@@ -733,24 +772,24 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
733772

734773
// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
735774
if (specifiedVersion == null) {
736-
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
737-
try {
738-
var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
739-
pipelineSource = BytesReference.bytes(builder);
740-
} catch (IOException e) {
741-
throw new IllegalStateException(e);
742-
}
775+
newPipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
743776
}
744777
}
745778

746-
Map<String, PipelineConfiguration> pipelines;
747-
if (currentIngestMetadata != null) {
748-
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
779+
final long nowMillis = instantSource.millis();
780+
if (existingPipeline == null) {
781+
newPipelineConfig.put(Pipeline.CREATED_DATE_MILLIS, nowMillis);
749782
} else {
750-
pipelines = new HashMap<>();
783+
Object existingCreatedAt = existingPipeline.getConfig().get(Pipeline.CREATED_DATE_MILLIS);
784+
// only set/carry over `created_date` if existing pipeline already has it.
785+
// would be confusing if existing pipelines were all updated to have `created_date` set to now.
786+
if (existingCreatedAt != null) {
787+
newPipelineConfig.put(Pipeline.CREATED_DATE_MILLIS, existingCreatedAt);
788+
}
751789
}
790+
newPipelineConfig.put(Pipeline.MODIFIED_DATE_MILLIS, nowMillis);
752791

753-
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
792+
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), newPipelineConfig));
754793
return new IngestMetadata(pipelines);
755794
}
756795
}
@@ -762,6 +801,7 @@ void validatePipeline(
762801
String pipelineId,
763802
Map<String, Object> pipelineConfig
764803
) throws Exception {
804+
validateNoSystemPropertiesInPipelineConfig(pipelineConfig);
765805
if (ingestInfos.isEmpty()) {
766806
throw new IllegalStateException("Ingest info is empty");
767807
}

0 commit comments

Comments
 (0)