Commit 1a4d206

ingest pipeline docs "ingest lag" (#1672)
#1381 relates to this, we pulled this file out as it's standalone for better review --------- Co-authored-by: Fabrizio Ferri-Benedetti <[email protected]>
1 parent 85eccf4 commit 1a4d206

3 files changed (+329 -2 lines)


manage-data/ingest/transform-enrich.md

Lines changed: 5 additions & 2 deletions
@@ -25,14 +25,14 @@ Note that you can also perform transforms on existing {{es}} indices to pivot da
: You can use [{{agent}} processors](/reference/fleet/agent-processors.md) to sanitize or enrich raw data at the source. Use {{agent}} processors if you need to control what data is sent across the wire, or if you need to enrich the raw data with information available on the host.

{{es}} ingest pipelines
-: You can use [{{es}} ingest pipelines](transform-enrich/ingest-pipelines.md) to enrich incoming data or normalize field data before the data is indexed. {{es}} ingest pipelines enable you to manipulate the data as it comes in. This approach helps you avoid adding processing overhead to the hosts from which you’re collecting data.
+: You can use [{{es}} ingest pipelines](/manage-data/ingest/transform-enrich/ingest-pipelines.md) to enrich incoming data or normalize field data before the data is indexed. {{es}} ingest pipelines enable you to manipulate the data as it comes in. This approach helps you avoid adding processing overhead to the hosts from which you’re collecting data.

: When you define a pipeline, you can configure one or more processors to operate on the incoming data. A typical use case is to transform specific strings to lowercase, or to sort the elements of incoming arrays into a given order. This section describes:
  * How to create, view, edit, and delete an ingest pipeline
  * How to set up processors to transform the data
  * How to test a pipeline before putting it into production.

-: You can try out the [Parse logs](transform-enrich/example-parse-logs.md) example which shows you how to set up in ingest pipeline to transform incoming server logs into a standard format.
+: You can try out the [Parse logs](/manage-data/ingest/transform-enrich/example-parse-logs.md) example which shows you how to set up an ingest pipeline to transform incoming server logs into a standard format.

: The {{es}} enrich processor enables you to add data from existing indices to your incoming data, based on an enrich policy. The enrich policy contains a set of rules to match incoming documents to the fields containing the data to add. Refer to [Data enrichment](transform-enrich/data-enrichment.md) to learn how to set up an enrich processor. You can also try out a few examples that show how to enrich data based on geographic location, exact values such as email addresses or IDs, or a range of values such as a date or set of IP addresses.

@@ -41,6 +41,9 @@ Note that you can also perform transforms on existing {{es}} indices to pivot da

: If you're ingesting using {{agent}} with Elastic {{integrations}}, you can use the {{ls}} [`elastic_integration filter`](logstash://reference/index.md) and other [{{ls}} filters](logstash-docs-md://lsr/filter-plugins.md) to [extend Elastic integrations](logstash://reference/using-logstash-with-elastic-integrations.md) by transforming data before it goes to {{es}}.

+Ingest lag
+: Calculate the time it takes for data to travel from its source to {{es}}. This is key for monitoring performance and finding bottlenecks in your data pipelines. Learn how in [Calculate ingest lag](https://www.elastic.co/blog/calculating-ingest-lag-and-storing-ingest-time-in-elasticsearch-to-improve-observability).
+
Index mapping
: Index mapping lets you control the structure that incoming data has within an {{es}} index. You can define all of the fields that are included in the index and their respective data types. For example, you can set fields for dates, numbers, or geolocations, and define the fields to have specific formats.

manage-data/ingest/transform-enrich/ingest-lag.md

Lines changed: 323 additions & 0 deletions
@@ -0,0 +1,323 @@
---
mapped_pages:
  - https://www.elastic.co/docs/manage-data/ingest/transform-enrich/calculate-ingest-lag.html
applies_to:
  stack: ga
  serverless: ga
---

# Calculate ingest lag

Ingest lag is the time it takes from when a document is read to when it is received by {{es}}. Store this value in minutes, seconds, or milliseconds, and use it to create visualizations and alerts.

The basic calculation is:

`event.ingested - @timestamp`

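Once the pipelines described later on this page store the result in `event.ingestion.latency`, you can aggregate that field for dashboards or alerting rules. The following ES|QL query is a minimal sketch; the `logs-*` pattern and the `avg_latency_seconds` name are illustrative, not part of the original example:

```esql
FROM logs-*
| WHERE event.ingestion.latency IS NOT NULL
| STATS avg_latency_seconds = AVG(event.ingestion.latency) BY data_stream.dataset
| SORT avg_latency_seconds DESC
```
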
## Understanding `event.ingested`

You can obtain the `event.ingested` timestamp in two ways:

- `_ingest.timestamp`
  Available through the mustache notation `{{_ingest.timestamp}}` in all processors except `script`.

- `metadata().now`
  Available only in the `script` processor. Use this instead of `_ingest.timestamp` when working with scripts.

::::{note}
The `event.ingested` option is typically set in the Fleet final pipeline, which runs as the last step in the ingest process. Calculating the latency in seconds is sufficient for most use cases.
::::

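For example, a `set` processor can copy the ingest timestamp into `event.ingested` using the mustache notation. This is a minimal sketch for pipelines where the field is not already populated by the Fleet final pipeline; the processor itself is illustrative and not part of the original example:

```json
{
  "set": {
    "description": "Record when the document reached the ingest pipeline",
    "field": "event.ingested",
    "value": "{{_ingest.timestamp}}"
  }
}
```
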
## Calculating ingestion latency

The following script is the core of the solution. It creates a new field, `event.ingestion.latency`, which you can use to monitor ingestion performance across your pipelines.

```json
{
  "script": {
    "description": "Calculates entire ingestion flow latency",
    "if": "ctx['@timestamp'] != null",
    "source": """
      ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
      ctx.putIfAbsent("event", [:]);
      ctx.event.putIfAbsent("ingestion", [:]);
      ctx.event.ingestion.latency = ChronoUnit.SECONDS.between(start, metadata().now);
    """
  }
}
```

## @timestamp

The value of `@timestamp` can vary depending on the data source. It might represent the time the Elastic Agent read the document, or it might be the actual timestamp extracted from the document itself after parsing.

This distinction affects how ingest lag is calculated. For example, when Elastic Agent reads Windows Event Logs, it sets `@timestamp` based on the log's original timestamp. However, this behavior does not apply to all sources, such as syslog messages or Linux log files, where `@timestamp` is often set later in the pipeline, after parsing.

This inconsistency can lead to inaccurate latency measurements if not accounted for.

```json
POST _ingest/pipeline/_simulate
{
  "docs": [{
    "_source": {
      "@timestamp": "2025-04-03T10:00:00.000Z",
      "message": "2025-03-01T09:00:00.000Z user: philipp has logged in"
    }
  }],
  "pipeline": {
    "processors": [
      {"script": {
        "if": "ctx['@timestamp'] != null",
        "source": """
          ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
          ctx.latency = ChronoUnit.SECONDS.between(start, metadata().now);
        """
      }}
    ]
  }
}
```

In the previous example, the read timestamp is `3rd April at 10:00`, while the actual log message on storage is from `1st March`. If you calculate the difference at the first step, before any parsing, the result will be accurate. However, if you calculate it as the final step in the pipeline, which is typically the case with Elastic Integrations that use `@custom` pipelines, the timestamp of `2025-03-01` will be used as `@timestamp`, leading to an erroneous latency calculation.

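To see this pitfall in action, you can extend the simulation so that a `date` processor overwrites `@timestamp` with the parsed log timestamp before the latency script runs. The `dissect` pattern and the `log_timestamp` field below are illustrative additions, not part of the original example; the reported latency now reflects the age of the log line rather than the transport time:

```json
POST _ingest/pipeline/_simulate
{
  "docs": [{
    "_source": {
      "@timestamp": "2025-04-03T10:00:00.000Z",
      "message": "2025-03-01T09:00:00.000Z user: philipp has logged in"
    }
  }],
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "description": "Extract the original log timestamp (illustrative)",
          "field": "message",
          "pattern": "%{log_timestamp} %{message}"
        }
      },
      {
        "date": {
          "description": "Overwrites @timestamp with the parsed log timestamp",
          "field": "log_timestamp",
          "formats": ["ISO8601"]
        }
      },
      {
        "script": {
          "description": "Now measures the age of the log line, not the transport time",
          "if": "ctx['@timestamp'] != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
            ctx.latency = ChronoUnit.SECONDS.between(start, metadata().now);
          """
        }
      }
    ]
  }
}
```
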
For many use cases, simply using `@timestamp` is sufficient, as we expect the Elastic Agent to pick up logs as quickly as possible. During the initial onboarding of new data sources, there might be higher latency due to the ingestion of historical or older data.

## The importance of `event.created`

As discussed above, `@timestamp` is set to the timestamp from within the collected log. When you need a true measure of pipeline latency, this value can be misleading, because it can take Elastic Agent seconds, minutes, hours, or even days to pick up a document. For example, if you onboard an old server for the first time, the calculated latency will be enormous, because you might be collecting year-old data. The `event.created` field is not automatically added to any log by Elastic Agent. To add it, include the following processor in the advanced settings of an integration:

```yaml
- script:
    lang: javascript
    source: >
      function process(event) {
          // Beats JavaScript processor: stores the current epoch time in milliseconds
          event.Put("event.created", Date.now());
      }
```

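Once `event.created` is populated, you can base the latency calculation on it instead of `@timestamp`, so that old log entries don't inflate the result. The following processor is a minimal sketch, assuming `event.created` holds the epoch-millisecond value written by the script above; the `latency_from_created` field name is illustrative:

```json
{
  "script": {
    "description": "Latency from agent pickup (event.created) to ingestion",
    "if": "ctx.event?.created != null",
    "source": """
      // event.created is stored as epoch milliseconds by the agent-side script
      Instant start = Instant.ofEpochMilli((long) ctx.event.created);
      ctx.event.putIfAbsent("ingestion", [:]);
      ctx.event.ingestion.latency_from_created = ChronoUnit.SECONDS.between(start, metadata().now.toInstant());
    """
  }
}
```
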
## Architectures

Regardless of the chosen architecture, add a `remove` processor at the end of the pipeline to drop the `_tmp` field. The raw timestamps from the various processing steps are not needed, as the latency in seconds should be sufficient. For additional pipeline architectures, refer to [Ingest architectures](../ingest-reference-architectures.md).

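A minimal sketch of such a cleanup processor, placed as the last step of the pipeline (`ignore_missing` keeps it from failing on documents that never passed through Logstash):

```json
{
  "remove": {
    "description": "Drop the temporary timestamps once latency has been calculated",
    "field": "_tmp",
    "ignore_missing": true
  }
}
```
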
## Logstash

When Logstash is part of the architecture, it must add its own timestamp. This can only be done with Ruby, and the simplest form is the following filter:

```
ruby {
  code => "event.set('[_tmp][logstash_seen]', Time.now());"
}
```

### Elastic Agent to Elasticsearch

Use `@timestamp` and `event.ingested` to calculate the difference. This gives you the following document, where `event.ingestion.latency` is in seconds.

```json
{
  "event": {
    "ingestion": {
      "latency": 443394
    }
  }
}
```

The following request contains the processors needed for an ingest pipeline to calculate the ingest lag.

```json
POST _ingest/pipeline/_simulate
{
  "docs": [{
    "_source": {
      "@timestamp": "2025-04-03T10:00:00.000Z",
      "message": "user: philipp has logged in"
    }
  }],
  "pipeline": {
    "processors": [
      {
        "script": {
          "description": "Calculates entire ingestion flow latency",
          "if": "ctx['@timestamp'] != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
            ctx.putIfAbsent("event", [:]);
            ctx.event.putIfAbsent("ingestion", [:]);
            ctx.event.ingestion.latency = ChronoUnit.SECONDS.between(start, metadata().now);
          """
        }
      }
    ]
  }
}
```

### Elastic Agent => Logstash => Elasticsearch

Elastic Agent populates the `@timestamp` field, but Logstash doesn't add any timestamp by default. Add a temporary timestamp, for example by setting `_tmp.logstash_seen`. With this, you can calculate the following latency values:

- Total latency: (`event.ingested - @timestamp`)
- Elastic Agent to Logstash: (`_tmp.logstash_seen - @timestamp`)
- Logstash to Elasticsearch: (`event.ingested - _tmp.logstash_seen`)

These values can be especially helpful for debugging, as they allow you to quickly determine where the lag is introduced, and whether the delay is caused by the transfer from Elastic Agent to Logstash or from Logstash to Elasticsearch.

Calculating these differences produces latency values for each of the stages previously mentioned:

```json
{
  "event": {
    "ingestion": {
      "latency_logstash_to_elasticsearch": 443091,
      "latency": 443093,
      "latency_elastic_agent_to_logstash": 1
    }
  }
}
```

The following request contains the processors needed for an ingest pipeline to calculate the ingest lag. If you remove the first calculation, make sure that the `event.ingestion` object still gets created, because the later scripts write into it.

```json
POST _ingest/pipeline/_simulate
{
  "docs": [{
    "_source": {
      "@timestamp": "2025-04-03T10:00:00.000Z",
      "message": "user: philipp has logged in",
      "_tmp": {
        "logstash_seen": "2025-04-03T10:00:02.456Z"
      }
    }
  }],
  "pipeline": {
    "processors": [
      {
        "script": {
          "description": "Calculates entire ingestion flow latency",
          "if": "ctx['@timestamp'] != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
            ctx.putIfAbsent("event", [:]);
            ctx.event.putIfAbsent("ingestion", [:]);
            ctx.event.ingestion.latency = ChronoUnit.SECONDS.between(start, metadata().now);
          """
        }
      },
      {
        "script": {
          "description": "Calculates Logstash to Elasticsearch latency",
          "if": "ctx._tmp?.logstash_seen != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx._tmp.logstash_seen);
            ctx.event.ingestion.latency_logstash_to_elasticsearch = ChronoUnit.SECONDS.between(start, metadata().now);
          """
        }
      },
      {
        "script": {
          "description": "Calculates Elastic Agent to Logstash latency",
          "if": "ctx._tmp?.logstash_seen != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
            ZonedDateTime end = ZonedDateTime.parse(ctx._tmp.logstash_seen);
            ctx.event.ingestion.latency_elastic_agent_to_logstash = ChronoUnit.SECONDS.between(start, end);
          """
        }
      }
    ]
  }
}
```

### Elastic Agent => Logstash => Kafka => Logstash => Elasticsearch

As with the previous scenario, adding an additional hop introduces another point where latency can occur. The recommendation is to add another temporary timestamp field. For more details, refer to the previous section.

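For example, each Logstash pipeline can stamp the event with its own temporary field, using the same Ruby filter approach as above. This is a minimal sketch; the field names match the simulate example that follows, but are otherwise illustrative:

```
# In the Logstash pipeline that produces to Kafka
ruby {
  code => "event.set('[_tmp][logstash_pre_kafka]', Time.now());"
}

# In the Logstash pipeline that consumes from Kafka
ruby {
  code => "event.set('[_tmp][logstash_post_kafka]', Time.now());"
}
```
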
The following request calculates the latency for each step in the pipeline and generates these values:

```json
{
  "event": {
    "ingestion": {
      "latency_logstash_to_elasticsearch": 443091,
      "latency_logstash_to_logstash": 1,
      "latency": 443093,
      "latency_elastic_agent_to_logstash": 1
    }
  }
}
```

The following request contains the processors needed for an ingest pipeline to calculate the ingest lag. If you remove the first calculation, make sure that the `event.ingestion` object still gets created, because the later scripts write into it. You can also merge all of the steps into one larger script.

```json
POST _ingest/pipeline/_simulate
{
  "docs": [{
    "_source": {
      "@timestamp": "2025-04-03T10:00:00.000Z",
      "message": "user: philipp has logged in",
      "_tmp": {
        "logstash_pre_kafka": "2025-04-03T10:00:01.233Z",
        "logstash_post_kafka": "2025-04-03T10:00:02.456Z"
      }
    }
  }],
  "pipeline": {
    "processors": [
      {
        "script": {
          "description": "Calculates entire ingestion flow latency",
          "if": "ctx['@timestamp'] != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
            ctx.putIfAbsent("event", [:]);
            ctx.event.putIfAbsent("ingestion", [:]);
            ctx.event.ingestion.latency = ChronoUnit.SECONDS.between(start, metadata().now);
          """
        }
      },
      {
        "script": {
          "description": "Calculates Logstash to Logstash latency",
          "if": "ctx._tmp?.logstash_pre_kafka != null && ctx._tmp?.logstash_post_kafka != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx._tmp.logstash_pre_kafka);
            ZonedDateTime end = ZonedDateTime.parse(ctx._tmp.logstash_post_kafka);
            ctx.event.ingestion.latency_logstash_to_logstash = ChronoUnit.SECONDS.between(start, end);
          """
        }
      },
      {
        "script": {
          "description": "Calculates Logstash post Kafka to Elasticsearch latency",
          "if": "ctx._tmp?.logstash_post_kafka != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx._tmp.logstash_post_kafka);
            ctx.event.ingestion.latency_logstash_to_elasticsearch = ChronoUnit.SECONDS.between(start, metadata().now);
          """
        }
      },
      {
        "script": {
          "description": "Calculates Elastic Agent to pre-Kafka Logstash latency",
          "if": "ctx._tmp?.logstash_pre_kafka != null",
          "source": """
            ZonedDateTime start = ZonedDateTime.parse(ctx['@timestamp']);
            ZonedDateTime end = ZonedDateTime.parse(ctx._tmp.logstash_pre_kafka);
            ctx.event.ingestion.latency_elastic_agent_to_logstash = ChronoUnit.SECONDS.between(start, end);
          """
        }
      }
    ]
  }
}
```

manage-data/toc.yml

Lines changed: 1 addition & 0 deletions
@@ -99,6 +99,7 @@ toc:
  - file: ingest/upload-data-files.md
  - file: ingest/transform-enrich.md
    children:
+     - file: ingest/transform-enrich/ingest-lag.md
      - file: ingest/transform-enrich/ingest-pipelines.md
        children:
          - file: ingest/transform-enrich/example-parse-logs.md
