Commit 6ab670d

[Streams] Update processors for 9.3 release (#4614)
1 parent 9efbc03 commit 6ab670d

14 files changed: +141 additions, −26 deletions

solutions/observability/streams/management/extract.md

Lines changed: 27 additions & 10 deletions
@@ -3,9 +3,9 @@ applies_to:
  serverless: ga
  stack: preview =9.1, ga 9.2+
  ---
- # Extract fields [streams-extract-fields]
+ # Process documents [streams-extract-fields]

- After selecting a stream, use the **Processing** tab to add [processors](#streams-extract-processors) that extract meaningful fields from your log messages. These fields let you filter and analyze your data more effectively.
+ After selecting a stream, use the **Processing** tab to add [processors](#streams-extract-processors) and [conditions](#streams-add-processor-conditions) that modify your documents and extract meaningful fields, so you can filter and analyze your data more effectively.

  For example, in [Discover](../../../../explore-analyze/discover.md), extracted fields might let you filter for log messages with an `ERROR` log level that occurred during a specific time period to help diagnose an issue. Without extracting the log level and timestamp fields from your messages, those filters wouldn't return meaningful results.

@@ -14,7 +14,7 @@ The **Processing** tab also:
  - Simulates your processors and provides an immediate [preview](#streams-preview-changes) that's tested end to end
  - Flags indexing issues, like [mapping conflicts](#streams-processing-mapping-conflicts), so you can address them before applying changes

- After creating your processor, all future data ingested into the stream is parsed into structured fields accordingly.
+ After creating your processor, Streams parses all future data ingested into the stream into structured fields accordingly.

  :::{note}
  Applied changes aren't retroactive and only affect *future ingested data*.
@@ -24,13 +24,28 @@ Applied changes aren't retroactive and only affect *future ingested data*.
  Streams supports the following processors:

+ - [**Drop**](./extract/drop.md): Drops the document without raising any errors. This is useful to prevent a document from being indexed based on a condition.
+ - [**Remove**](./extract/remove.md): Removes existing fields.
  - [**Date**](./extract/date.md): Converts date strings into timestamps, with options for timezone, locale, and output formatting.
+ - [**Convert**](./extract/convert.md): Converts a field in the currently ingested document to a different type, such as converting a string to an integer.
+ - [**Replace**](./extract/replace.md): Replaces parts of a string field that match a regular expression pattern with a replacement string.
  - [**Dissect**](./extract/dissect.md): Extracts fields from structured log messages using defined delimiters instead of patterns, making it faster than Grok and ideal for consistently formatted logs.
  - [**Grok**](./extract/grok.md): Extracts fields from unstructured log messages using predefined or custom patterns, supports multiple match attempts in sequence, and can automatically generate patterns with an [LLM connector](/explore-analyze/ai-features/llm-guides/llm-connectors.md).
  - [**Set**](./extract/set.md): Assigns a specific value to a field, creating the field if it doesn't exist or overwriting its value if it does.
+ - [**Math**](./extract/math.md): Evaluates arithmetic or logical expressions.
  - [**Rename**](./extract/rename.md): Changes the name of a field, moving its value to a new field name and removing the original.
  - [**Append**](./extract/append.md): Adds a value to an existing array field, or creates the field as an array if it doesn't exist.

+ ### Processor limitations and inconsistencies [streams-processor-inconsistencies]
+
+ Streams exposes a Streamlang configuration, but internally it relies on {{es}} ingest pipeline processors and ES|QL. Streamlang doesn't always have 1:1 parity with the ingest processors because it needs to support options that work in both ingest pipelines and ES|QL. In most cases, you won't need to worry about these details, but the underlying design decisions still affect the UI and available configuration options. The following are some limitations and inconsistencies when using Streamlang processors:
+
+ - **Consistently typed fields**: ES|QL requires one consistent type per column, so workflows that produce mixed types across documents won't transpile.
+ - **Conversion of types**: ES|QL and ingest pipelines accept different conversion combinations and strictness (especially for strings), so `convert` can behave differently across targets.
+ - **Multi-value commands/functions**: Fields can contain one or multiple values. ES|QL and ingest processors don't always handle these cases the same way. For example, grok in ES|QL handles multiple values automatically, while the grok processor does not.
+ - **Conditional execution**: ES|QL's enforced table shape limits conditional casting, parsing, and wildcard field operations that ingest pipelines can do per-document.
+ - **Arrays of objects / flattening**: Ingest pipelines preserve nested JSON arrays, while ES|QL flattens them to columns, so operations like rename and delete on parent objects can differ or fail.

  ## Add a processor [streams-add-processors]

  Streams uses [{{es}} ingest pipelines](../../../../manage-data/ingest/transform-enrich/ingest-pipelines.md) made up of processors to transform your data, without requiring you to switch interfaces and manually update pipelines.
@@ -49,7 +64,7 @@ Refer to individual [supported processors](#streams-extract-processors) for more
  Editing processors with JSON is planned for a future release, and additional processors may be supported over time.
  :::

- ### Add conditions to processors [streams-add-processor-conditions]
+ ### Add conditions [streams-add-processor-conditions]

  You can add conditions to processors so they only run on data that meets those conditions. Each condition is a boolean expression that's evaluated for every document.
@@ -76,6 +91,8 @@ Streams processors support the following comparators:
  - not exists
  :::

+ After creating a condition, add a processor or another condition to it by selecting the {icon}`plus_in_circle` icon.
+
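Conceptually, each condition behaves like the `if` clause on an {{es}} ingest processor: a Boolean expression evaluated against every document. As a rough sketch of the idea only (the field names are illustrative, and Streams builds the actual condition from your UI selections):

```json
{
  "set": {
    "if": "ctx.log?.level == 'error'",
    "field": "event.severity",
    "value": 3
  }
}
```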
  ### Preview changes [streams-preview-changes]

  After you create processors, the **Data preview** tab simulates processor results with additional filtering options depending on the outcome of the simulation.
@@ -93,9 +110,9 @@ After making sure everything in the **Data preview** tab is correct, select **Sa
  If you edit the stream after saving your changes, keep the following in mind:

- - Adding processors to the end of the list will work as expected.
- - Editing or reordering existing processors can cause inaccurate results. Because the pipeline may have already processed the documents used for sampling, **Data preview** cannot accurately simulate changes to existing data.
- - Adding a new processor and moving it before an existing processor may cause inaccurate results. **Data preview** only simulates the new processor, not the existing ones, so the simulation may not accurately reflect changes to existing data.
+ - Adding processors to the end of the list works as expected.
+ - Editing or reordering existing processors can cause inaccurate results. Because the pipeline might have already processed the documents used for sampling, **Data preview** cannot accurately simulate changes to existing data.
+ - Adding a new processor and moving it before an existing processor can cause inaccurate results. **Data preview** only simulates the new processor, not the existing ones, so the simulation might not accurately reflect changes to existing data.

  ### Ignore failures [streams-ignore-failures]
@@ -122,7 +139,7 @@ Selecting **Failed** shows the documents that weren't parsed correctly:
  :screenshot:
  :::

- Failures are displayed at the bottom of the process editor. Some failures may require fixes, while others simply serve as a warning:
+ Streams displays failures at the bottom of the process editor. Some failures might require fixes, while others serve as a warning:

  :::{image} ../../../images/logs-streams-processor-failures.png
  :screenshot:
@@ -179,10 +196,10 @@ Streams then creates and manages the `<data_stream_name>@stream.processing` pipe
  ### User interaction with pipelines

  Do not manually modify the `<data_stream_name>@stream.processing` pipeline created by Streams.
- You can still add your own processors manually to the `@custom` pipeline if needed. Adding processors before the pipeline processor created by Streams may cause unexpected behavior.
+ You can still add your own processors manually to the `@custom` pipeline if needed. Adding processors before the pipeline processor created by Streams might cause unexpected behavior.

  ## Known limitations [streams-known-limitations]

  - Streams does not support all processors. More processors will be added in future versions.
- - The data preview simulation may not accurately reflect the changes to the existing data when editing existing processors or re-ordering them. Streams will allow proper simulations using original documents in a future version.
+ - The data preview simulation might not accurately reflect changes to existing data when you edit or reorder existing processors. Streams will allow proper simulations using original documents in a future version.
  - Streams can't properly handle arrays. While it supports basic actions like appending or renaming, it can't access individual array elements. For classic streams, the workaround is to use the [manual pipeline configuration](./extract/manual-pipeline-configuration.md), which supports Painless scripting and all ingest processors.
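For readers new to ingest pipelines, a minimal pipeline definition with two processors might look like the following sketch (the pattern and field names are illustrative; Streams generates and manages the real `<data_stream_name>@stream.processing` pipeline for you):

```json
{
  "description": "Example only: parse a log line, then set the timestamp",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{TIMESTAMP_ISO8601:ts} %{LOGLEVEL:log.level} %{GREEDYDATA:msg}"]
      }
    },
    {
      "date": {
        "field": "ts",
        "formats": ["ISO8601"]
      }
    }
  ]
}
```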

solutions/observability/streams/management/extract/append.md

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@ applies_to:
  # Append processor [streams-append-processor]
  % Need use cases

- Use the append processor to add a value to an existing array field, or create the field as an array if it doesn't exist.
+ Use the **Append** processor to add a value to an existing array field, or create the field as an array if it doesn't exist.

  To use an append processor:
@@ -15,4 +15,4 @@ To use an append processor:
  1. Set **Source Field** to the field you want to append values to.
  1. Set **Target field** to the values you want to append to the **Source Field**.

- This functionality uses the {{es}} rename pipeline processor. Refer to the [rename processor](elasticsearch://reference/enrich-processor/rename-processor.md) {{es}} documentation for more information.
+ This functionality uses the {{es}} [append processor](elasticsearch://reference/enrich-processor/append-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
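For reference, the underlying ingest processor configuration that Streamlang compiles to might resemble this sketch (the `tags` field and value are illustrative):

```json
{
  "append": {
    "field": "tags",
    "value": ["production"]
  }
}
```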
solutions/observability/streams/management/extract/convert.md

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+ ---
+ applies_to:
+   serverless: ga
+   stack: ga 9.3+
+ ---
+
+ # Convert processor [streams-convert-processor]
+ The **Convert** processor converts a field to a different data type. For example, you could convert a string to an integer.
+
+ To convert a field to a different data type:
+
+ 1. Select **Create** → **Create processor**.
+ 1. Select **Convert** from the **Processor** menu.
+ 1. Set the **Source Field** to the field you want to convert.
+ 1. (Optional) Set **Target field** to write the converted value to a different field.
+ 1. Set **Type** to the output data type.
+
+ ::::{note}
+ If you add a **Convert** processor inside a condition group (a **WHERE** block), you must set a **Target field**.
+ ::::
+
+ This functionality uses the {{es}} [Convert processor](elasticsearch://reference/enrich-processor/convert-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
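A rough sketch of the equivalent ingest processor configuration (field names are illustrative):

```json
{
  "convert": {
    "field": "http.response.status_code",
    "target_field": "http.response.status_code_int",
    "type": "integer",
    "ignore_missing": true
  }
}
```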

solutions/observability/streams/management/extract/date.md

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@ applies_to:
  # Date processor [streams-date-processor]

- The date processor parses dates from fields, and then uses the date or timestamp as the timestamp for the document.
+ The **Date** processor parses dates from fields, and then uses the date or timestamp as the timestamp for the document.

  To extract a timestamp field using the date processor:
@@ -15,7 +15,7 @@ To extract a timestamp field using the date processor:
  1. Set the **Source Field** to the field containing the timestamp.
  1. Set the **Format** field to one of the accepted date formats (ISO8601, UNIX, UNIX_MS, or TAI64N) or use a Java time pattern. Refer to the [example formats](#streams-date-examples) for more information.

- This functionality uses the {{es}} date pipeline processor. Refer to the [date processor](elasticsearch://reference/enrich-processor/date-processor.md) {{es}} documentation for more information.
+ This functionality uses the {{es}} [Date processor](elasticsearch://reference/enrich-processor/date-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
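A rough sketch of the equivalent ingest processor configuration (field names and timezone are illustrative):

```json
{
  "date": {
    "field": "event_time",
    "target_field": "@timestamp",
    "formats": ["ISO8601"],
    "timezone": "Europe/Amsterdam"
  }
}
```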

  ## Example formats [streams-date-examples]
solutions/observability/streams/management/extract/dissect.md

Lines changed: 3 additions & 3 deletions
@@ -5,18 +5,18 @@ applies_to:
  ---
  # Dissect processor [streams-dissect-processor]

- The dissect processor parses structured log messages and extracts fields from them. It uses a set of delimiters to split the log message into fields instead of predefined patterns to match the log messages.
+ The **Dissect** processor parses structured log messages and extracts fields from them. It uses a set of delimiters to split the log message into fields instead of predefined patterns to match the log messages.

  Dissect is much faster than Grok, and is recommended for log messages that follow a consistent, structured format.

  To parse a log message with a dissect processor:

  1. Select **Create** → **Create processor**.
  1. Select **Dissect** from the **Processor** menu.
- 1. Set the **Source Field** to the field you want to dissect
+ 1. Set the **Source Field** to the field you want to dissect.
  1. Set the delimiters you want to use in the **Pattern** field. Refer to the [example pattern](#streams-dissect-example) for more information on setting delimiters.

- This functionality uses the {{es}} dissect pipeline processor. Refer to the [dissect processor](elasticsearch://reference/enrich-processor/dissect-processor.md) {{es}} documentation for more information.
+ This functionality uses the {{es}} [Dissect processor](elasticsearch://reference/enrich-processor/dissect-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
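A rough sketch of the equivalent ingest processor configuration, splitting a space-delimited line into three fields (the pattern and field names are illustrative):

```json
{
  "dissect": {
    "field": "message",
    "pattern": "%{client.ip} %{@timestamp} %{log.level}"
  }
}
```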

  ## Example dissect pattern [streams-dissect-example]
solutions/observability/streams/management/extract/drop.md

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+ ---
+ applies_to:
+   serverless: ga
+   stack: ga 9.3+
+ ---
+
+ # Drop document processor [streams-drop-processor]
+
+ The **Drop document** processor prevents documents from being indexed when they meet a specific condition, without raising an error.
+
+ To configure a condition for dropping documents:
+
+ 1. Select **Create** → **Create processor**.
+ 1. Select **Drop document** from the **Processor** menu.
+ 1. Set the **Condition** for when you want to drop a document.
+
+ :::{warning}
+ The default is the `always` condition. If you don't set a specific condition, every document is dropped from indexing.
+ :::
+
+ This functionality uses the {{es}} [Drop processor](elasticsearch://reference/enrich-processor/drop-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
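A rough sketch of the equivalent ingest processor configuration, dropping debug-level documents (the condition is illustrative):

```json
{
  "drop": {
    "if": "ctx.log?.level == 'debug'"
  }
}
```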

solutions/observability/streams/management/extract/grok.md

Lines changed: 2 additions & 2 deletions
@@ -5,7 +5,7 @@ applies_to:
  ---
  # Grok processor [streams-grok-processor]

- The grok processor parses unstructured log messages using a set of predefined patterns to match the log messages and extract the fields. The grok processor is very powerful and can parse a wide variety of log formats.
+ The **Grok** processor parses unstructured log messages using a set of predefined patterns to match the log messages and extract the fields. The grok processor is powerful and can parse a wide variety of log formats.

  You can provide multiple patterns to the grok processor. The grok processor tries to match the log message against each pattern in the order they are provided. If a pattern matches, it extracts the fields and the remaining patterns won't be used.
@@ -20,7 +20,7 @@ To parse a log message with a grok processor:
  1. Set the **Source Field** to the field you want to search for grok matches.
  1. Set the patterns you want to use in the **Grok patterns** field. Refer to the [example pattern](#streams-grok-example) for more information on patterns.

- This functionality uses the {{es}} Grok pipeline processor. Refer to the [Grok processor](elasticsearch://reference/enrich-processor/grok-processor.md) {{es}} documentation for more information.
+ This functionality uses the {{es}} [Grok processor](elasticsearch://reference/enrich-processor/grok-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
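A rough sketch of the equivalent ingest processor configuration (the pattern and field names are illustrative):

```json
{
  "grok": {
    "field": "message",
    "patterns": ["%{IP:client.ip} %{WORD:http.request.method} %{URIPATHPARAM:url.path}"]
  }
}
```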

  ## Example grok pattern [streams-grok-example]
solutions/observability/streams/management/extract/manual-pipeline-configuration.md

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ applies_to:
  # Manual pipeline configuration [streams-manual-pipeline-configuration]

  :::{note}
- The manual pipeline configuration processor is only available on [classic streams](../../streams.md#streams-classic-vs-wired).
+ The **manual pipeline configuration** processor is only available on [classic streams](../../streams.md#streams-classic-vs-wired).
  :::

  The **Manual pipeline configuration** lets you create a JSON-encoded array of ingest pipeline processors. This is helpful if you want to add more advanced processing that isn't currently available as part of the UI-based processors.
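For example, a JSON-encoded processor array might look like this sketch (the processors and field names are illustrative):

```json
[
  { "lowercase": { "field": "http.request.method" } },
  {
    "script": {
      "lang": "painless",
      "source": "ctx.bytes_total = ctx.bytes_in + ctx.bytes_out",
      "description": "Example only: derive a total from two numeric fields"
    }
  }
]
```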
solutions/observability/streams/management/extract/math.md

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+ ---
+ applies_to:
+   serverless: ga
+   stack: ga 9.3+
+ ---
+
+ # Math processor [streams-math-processor]
+
+ The **Math** processor evaluates arithmetic or logical expressions and stores the result in the target field.
+
+ To calculate a value using an expression and store the result in a target field:
+
+ 1. Select **Create** → **Create processor**.
+ 1. Select **Math** from the **Processor** menu.
+ 1. Set the **Target field** where you want to write the expression result.
+ 1. Set your expression in the **Expression** field. You can directly reference fields in your expression (for example, `bytes / duration`).
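As an illustration only (field names are assumed, and the exact Streamlang expression grammar may differ), a document like:

```json
{ "bytes": 1024, "duration": 2 }
```

with the expression `bytes / duration` and the target field `throughput` would become:

```json
{ "bytes": 1024, "duration": 2, "throughput": 512 }
```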
solutions/observability/streams/management/extract/remove.md

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+ ---
+ applies_to:
+   serverless: ga
+   stack: ga 9.3+
+ ---
+
+ # Remove processor [streams-remove-processor]
+
+ The **Remove** processor removes a field (**Remove**) or removes a field and all its nested fields (**Remove by prefix**) from your documents.
+
+ To remove a field:
+
+ 1. Select **Create** → **Create processor**.
+ 1. From the **Processor** menu, select **Remove** to remove a field or **Remove by prefix** to remove a field and all its nested fields.
+ 1. Set the **Source Field** to the field you want to remove.
+
+ This functionality uses the {{es}} [Remove processor](elasticsearch://reference/enrich-processor/remove-processor.md) internally, but you configure it in Streamlang. Streamlang doesn't always have 1:1 parity with the ingest processor options and behavior. Refer to [Processor limitations and inconsistencies](../extract.md#streams-processor-inconsistencies).
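A rough sketch of the equivalent ingest processor configuration (the field name is illustrative):

```json
{
  "remove": {
    "field": "user_agent",
    "ignore_missing": true
  }
}
```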
