Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified solutions/images/logs-streams-patterns.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 1 addition & 3 deletions solutions/observability/streams/management/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@ navigation_title: Configure advanced settings
---
# Configure advanced settings for streams [streams-advanced-settings]

The **Advanced** tab on the **Manage stream** page shows the underlying configuration details of your stream. While Streams simplifies many configurations, it doesn't support modifying all pipelines and templates. From the **Advanced** tab, you can manually interact with the index or component templates or modify other ingest pipelines that used by the stream.

This UI is intended for advanced users.
The **Advanced** tab shows the underlying {{es}} configuration details of your stream. While Streams simplifies many configurations, it doesn't support modifying all pipelines and templates. From the **Advanced** tab, you can manually interact with the index or component templates or modify other ingest pipelines that used by the stream.
15 changes: 15 additions & 0 deletions solutions/observability/streams/management/data-quality.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
applies_to:
serverless: preview
stack: preview 9.1, ga 9.2
---

# Manage data quality [streams-data-retention]

Use the **Data quality** tab to find failed and degraded documents in your stream. The **Data quality** tab is made up of the following components:

- **Degraded documents**: Documents with the `ignored` property usually because of malformed fields or exceeding the limit of total fields when `ignore_above:false`. This component shows the total number of degraded documents, the percentage, and status (**Good**, **Degraded**, **Poor**).
- **Failed documents**: Documents that were rejected during ingestion.
- **Issues**: {applies_to}`stack: preview 9.2`Find issues with specific fields, how often they've occurred, and when they've occurred.

For more information on data quality, refer to the [data set quality](../../data-set-quality-monitoring.md) documentation.
95 changes: 57 additions & 38 deletions solutions/observability/streams/management/extract.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,61 @@ applies_to:
---
# Extract fields [streams-extract-fields]

Unstructured log messages must be parsed into meaningful fields before you can filter and analyze them effectively. Commonly extracted fields include `@timestamp` and the `log.level`, but you can also extract information like IP addresses, usernames, and ports.
Extracting meaningful fields from your log messages lets you filter and analyze them effectively. For example, you might want to use [Discover](../../../../explore-analyze/discover.md) to filter for log messages with a `WARNING` or `ERROR` log level that occurred during a certain time period to diagnose an issue. If you haven't extracted log level and timestamp fields from your messages, you won't get meaningful results.

Use the **Processing** tab on the **Manage stream** page to process your data. The UI simulates your changes and provides an immediate preview that's tested end-to-end.
From the **Processing** tab, you can add the [processors](#streams-extract-processors) you need to extract these structured fields. The UI then simulates your changes and provides an immediate [preview](#streams-preview-changes) that's tested end-to-end.

The UI also shows indexing problems, such as mapping conflicts, so you can address them before applying changes.
Streams also shows when you have indexing problems, such as [mapping conflicts](#streams-processing-mapping-conflicts), so you can address them before applying changes.

After creating your processor, all future ingested data will be parsed into structured fields accordingly.

:::{note}
Applied changes aren't retroactive and only affect *future ingested data*.
:::

## Supported processors [streams-extract-processors]

Streams supports the following processors:

- [Date](./extract/date.md): convert date strings into timestamps with options for timezone, locale, and output format settings.
- [Dissect](./extract/dissect.md): extract fields from structured log messages using defined delimiters instead of patterns, making it faster than Grok and ideal for consistently formatted logs.
- [Grok](./extract/grok.md): extract fields from unstructured log messages using predefined or custom patterns, supports multiple match attempts in sequence, and can automatically generate patterns with an LLM connector.
- [Set](./extract/set.md): assign a specific value to a field, creating the field if it doesn’t exist or overwriting its value if it does.
- [Rename](./extract/rename.md): change the name of a field, moving its value to a new field name and removing the original.
- [Append](./extract/append.md): add a value to an existing array field, or create the field as an array if it doesn’t exist.

## Add a processor [streams-add-processors]

Streams uses {{es}} ingest pipelines to process your data. Ingest pipelines are made up of processors that transform your data.
Streams uses [{{es}} ingest pipelines](../../../../manage-data/ingest/transform-enrich/ingest-pipelines.md) made up of processors to transform your data, without requiring you to switch interfaces and manually update pipelines.

To add a processor:
To add a processor from the **Processing** tab:

1. Select **Add processor** to open a list of supported processors.
1. Select a processor from the list:
- [Date](./extract/date.md)
- [Dissect](./extract/dissect.md)
- [Grok](./extract/grok.md)
- GeoIP
- Rename
- Set
- URL Decode
1. Select **Add Processor** to save the processor.
1. Select **Create** → **Create processor** to open a list of supported processors.
1. Select a processor from the **Processor** menu.
1. Configure the processor and select **Create** to save the processor.

After adding all desired processors and conditions, make sure to **Save changes**.

Refer to individual [supported processors](#streams-extract-processors) for more on configuring specific processors.

:::{note}
Editing processors with JSON is planned for a future release, and additional processors may be supported over time.
:::

### Add conditions to processors [streams-add-processor-conditions]

You can provide a condition for each processor under **Optional fields**. Conditions are boolean expressions that are evaluated for each document. Provide a field, a value, and a comparator.
Processors support these comparators:
You can provide a condition for each processor under **Optional fields**. Conditions are boolean expressions that are evaluated for each document.

To add a condition:
1. Select **Create** → **Create condition**.
1. Provide a **Field**, a **Value**, and a comparator. Expand the following dropdown for supported comparators.
1. Select **Create condition**.

After adding all desired processors and conditions, make sure to **Save changes**.

:::{dropdown} Supported comparators
Streams processors support the following comparators:

- equals
- not equals
- less than
Expand All @@ -51,10 +71,11 @@ Processors support these comparators:
- ends with
- exists
- not exists
:::

### Preview changes [streams-preview-changes]

Under **Processors for field extraction**, when you set pipeline processors to modify your documents, **Data preview** shows you a preview of the results with additional filtering options depending on the outcome of the simulation.
After creating processors, the **Data preview** tab shows a preview of the results with additional filtering options depending on the outcome of the simulation.

When you add or edit processors, the **Data preview** updates automatically.

Expand All @@ -63,27 +84,26 @@ To avoid unexpected results, we recommend adding processors rather than removing
:::

**Data preview** loads 100 documents from your existing data and runs your changes using them.
For any newly added processors, this simulation is reliable. You can save individual processors during the preview, and even reorder them.
Selecting **Save changes** applies your changes to the data stream.
For any newly created processors and conditions, the preview is reliable. You can create and reorder individual processors and conditions during the preview.

If you edit the stream again, note the following:
- Adding more processors to the end of the list will work as expected.
- Changing existing processors or re-ordering them may cause unexpected results. Because the pipeline may have already processed the documents used for sampling, the UI cannot accurately simulate changes to existing data.
- Adding a new processor and moving it before an existing processor may cause unexpected results. The UI only simulates the new processor, not the existing ones, so the simulation may not accurately reflect changes to existing data.
Select **Save changes** to apply your changes to the data stream.

![Screenshot of the Grok processor UI](<../../../images/logs-streams-grok.png>)
If you edit the stream after saving your changes, note the following:
- Adding more processors to the end of the list will work as expected.
- Editing or reordering existing processors may cause unexpected results. Because the pipeline may have already processed the documents used for sampling, **Data preview** cannot accurately simulate changes to existing data.
- Adding a new processor and moving it before an existing processor may cause unexpected results. **Data preview** only simulates the new processor, not the existing ones, so the simulation may not accurately reflect changes to existing data.

### Ignore failures [streams-ignore-failures]

Turn on **Ignore failure** to ignore the processor if it fails. This is useful if you want to continue processing the document even if the processor fails.
Each processor has the option to **Ignore failures**. When enabled, processing of the document continues when the processor fails.

### Ignore missing fields [streams-ignore-missing-fields]

Turn on **Ignore missing fields** to ignore the processor if the field is not present. This is useful if you want to continue processing the document even if the field is not present.
Dissect, grok, and rename processors include the **Ignore missing fields** option. When enabled, processing of the document continues when a source field is missing.

## Detect and handle failures [streams-detect-failures]

Documents fail processing for different reasons. Streams helps you to easily find and handle failures before deploying changes.
Documents fail processing for different reasons. Streams helps you to find and handle failures before deploying changes.

In the following screenshot, the **Failed** percentage shows that not all messages matched the provided Grok pattern:

Expand All @@ -97,11 +117,11 @@ Failures are displayed at the bottom of the process editor:

![Screenshot showing failure notifications](<../../../images/logs-streams-processor-failures.png>)

These failures may require action, but in some cases, they serve more as warnings.
These failures may require action, or serve as a warning.

### Mapping conflicts
### Mapping conflicts [streams-processing-mapping-conflicts]

As part of processing, Streams also checks for mapping conflicts by simulating the change end to end. If a mapping conflict is detected, Streams marks the processor as failed and displays a failure message like the following:
As part of processing, Streams also checks for mapping conflicts by simulating the change end-to-end. When Streams detects a mapping conflict, it marks the processor as failed and displays a failure message like the following:

![Screenshot showing mapping conflict notifications](<../../../images/logs-streams-mapping-conflicts.png>)

Expand All @@ -113,12 +133,14 @@ Once saved, the processor provides a quick look at the processor's success rate

![Screenshot showing field stats](<../../../images/logs-streams-field-stats.png>)

## Advanced: How and where do these changes get applied to the underlying datastream? [streams-applied-changes]
## Advanced: How and where do these changes get applied to the underlying data stream? [streams-applied-changes]

% make sure this is all still accurate.

When you save processors, Streams modifies the "best matching" ingest pipeline for the data stream. In short, Streams either chooses the best matching pipeline ending in `@custom` that is already part of your data stream, or it adds one for you.
When you save processors, Streams modifies the "best-matching" ingest pipeline for the data stream. In short, Streams either chooses the best-matching pipeline ending in `@custom` that is already part of your data stream, or it adds one for you.

Streams identifies the appropriate @custom pipeline (for example, `logs-myintegration@custom` or `logs@custom`).
It checks the default_pipeline that is set on the datastream.
It checks the `default_pipeline` that is set on the data stream.

You can view the default pipeline at **Manage stream** → **Advanced** under **Ingest pipeline**.
In this default pipeline, we locate the last processor that calls a pipeline ending in `@custom`. For integrations, this would result in a pipeline name like `logs-myintegration@custom`. Without an integration, the only `@custom` pipeline available may be `logs@custom`.
Expand Down Expand Up @@ -150,7 +172,4 @@ You can still add your own processors manually to the `@custom` pipeline if need
## Known limitations [streams-known-limitations]

- Streams does not support all processors. We are working on adding more processors in the future.
- Streams does not support all processor options. We are working on adding more options in the future.
- The data preview simulation may not accurately reflect the changes to the existing data when editing existing processors or re-ordering them.
- Dots in field names are not supported. You can use the dot expand processor in the `@custom` pipeline as a workaround. You need to manually add the dot expand processor.
- Providing any arbitrary JSON in the Streams UI is not supported. We are working on adding this in the future.
- The data preview simulation may not accurately reflect the changes to the existing data when editing existing processors or re-ordering them. We will allow proper simulations using original documents in a future version.
16 changes: 16 additions & 0 deletions solutions/observability/streams/management/extract/append.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
applies_to:
serverless: ga
stack: preview 9.1, ga 9.2
---
# Append processor [streams-append-processor]
% Need use cases

Use the append processor to add a value to an existing array field, or create the field as an array if it doesn’t exist.

To use an append processor:

1. Set **Source Field** to the field you want append values to.
1. Set **Target field** to the values you want to append to the **Source Field**.

This functionality uses the {{es}} rename pipeline processor. Refer to the [rename processor](elasticsearch://reference/enrich-processor/rename-processor.md) {{es}} documentation for more information.
12 changes: 8 additions & 4 deletions solutions/observability/streams/management/extract/date.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ applies_to:

# Date processor [streams-date-processor]

The date processor parses date strings and uses them as the timestamp of the document.
The date processor parses dates from fields, and then uses the date or timestamp as the timestamp for the document.

To extract a timestamp field using the date processor:

1. Set the **Source Field** to the field containing the timestamp.
1. Set the **Format** field to one of the accepted date formats (ISO8602, UNIX, UNIX_MS, or TAI64N) or use a Java time pattern. Refer to the [example formats](#streams-date-examples) for more information.

This functionality uses the {{es}} date pipeline processor. Refer to the [date processor](elasticsearch://reference/enrich-processor/date-processor.md) {{es}} documentation for more information.

## Examples
## Example formats [streams-date-examples]

The following list provides some common examples of date formats and how to parse them.

Expand All @@ -34,9 +39,8 @@ Sunday, October 15, 2023 => EEEE, MMMM dd, yyyy
2023-10-15 14:30:00 => yyyy-MM-dd HH:mm:ss
```


## Optional fields [streams-date-optional-fields]
The following fields are optional for the date processor:
You can set the following optional fields for the date processor in the **Advanced settings**:

| Field | Description|
| ------- | --------------- |
Expand Down
18 changes: 12 additions & 6 deletions solutions/observability/streams/management/extract/dissect.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ applies_to:
---
# Dissect processor [streams-dissect-processor]

The dissect processor parses structured log messages and extracts fields from them. Unlike Grok, it does not use a set of predefined patterns to match the log messages. Instead, it uses a set of delimiters to split the log message into fields.
Dissect is much faster than Grok and is ideal for log messages that follow a consistent, structured format.
The dissect processor parses structured log messages and extracts fields from them. It uses a set of delimiters to split the log message into fields instead of predefined patterns to match the log messages.

Dissect is much faster than Grok, and is recommend for log messages that follow a consistent, structured format.

To parse a log message with a dissect processor:
1. Set the **Source Field** to the field you want to dissect
1. Set the delimiters you want to use in the **Pattern** field. Refer to the [example pattern](#streams-dissect-example) for more information on setting delimiters.

This functionality uses the {{es}} dissect pipeline processor. Refer to the [dissect processor](elasticsearch://reference/enrich-processor/dissect-processor.md) {{es}} documentation for more information.

To parse a log message, simply name the field and list the delimiters you want to use. The dissect processor will then split the log message into fields based on the delimiters provided.
## Example dissect pattern [streams-dissect-example]

Example:
The following example shows the dissect pattern for an unstructured log message.

Log Message
**Log message:**
```
2025-04-04T09:04:45+00:00 ERROR 160.200.87.105 127.79.135.127 21582
```
Dissect Pattern

**Dissect Pattern:**
```
%{timestamp} %{log.level} %{source.ip} %{destination.ip} %{destination.port}
```
Loading
Loading