Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 65 additions & 13 deletions _data-prepper/pipelines/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,83 @@ If a pipeline component fails to process and send an event, then the source rece

Pipelines also support conditional routing, which enables the routing of events to different sinks based on specific conditions. To add conditional routing, specify a list of named routes using the `route` component and assign specific routes to sinks using the `routes` property. Any sink with the `routes` property only accepts events matching at least one of the routing conditions.

In the following example pipeline, `application-logs` is a named route with a condition set to `/log_type == "application"`. The route uses [Data Prepper expressions](https://github.com/opensearch-project/data-prepper/tree/main/examples) to define the condition. Data Prepper routes events satisfying this condition to the first OpenSearch sink. By default, Data Prepper routes all events to sinks without a defined route, as shown in the third OpenSearch sink of the given pipeline:
In the following pipeline, routes are defined at the pipeline level under route. The route uses [Data Prepper expressions](https://github.com/opensearch-project/data-prepper/tree/main/examples) to define the condition. Two named routes are declared:

- `errors: /level == "ERROR"`

- `slow_requests: /latency_ms != null and /latency_ms >= 1000`

Each OpenSearch sink can opt in to one or more routes using the `routes:` setting. Events that satisfy a route’s condition are delivered to the sinks that reference that route, for example, the first sink receives events matching errors, and the second sink receives events matching slow_requests.

By default, any sink without a `routes:` list receives all events, regardless of whether they matched other routes. In the following example, the third sink has no `routes:` setting, so it acts as a catch-all and receives every event, including those already routed to the first two sinks:

```yml
conditional-routing-sample-pipeline:
routes-demo-pipeline:
source:
http:
processor:
path: /logs
ssl: false

route:
- application-logs: '/log_type == "application"'
- http-logs: '/log_type == "apache"'
- errors: '/level == "ERROR"'
- slow_requests: '/latency_ms != null and /latency_ms >= 1000'

sink:
# 1) Only events matching the "errors" route
- opensearch:
hosts: [ "https://opensearch:9200" ]
index: application_logs
routes: [application-logs]
hosts: ["https://opensearch:9200"]
insecure: true
username: admin
password: "admin_pass"
index_type: custom
index: "routed-errors-%{yyyy.MM.dd}"
routes: [errors]

# 2) Only events matching the "slow_requests" route
- opensearch:
hosts: [ "https://opensearch:9200" ]
index: http_logs
routes: [http-logs]
hosts: ["https://opensearch:9200"]
insecure: true
username: admin
password: "admin_pass"
index_type: custom
index: "routed-slow-%{yyyy.MM.dd}"
routes: [slow_requests]

# 3) All events
- opensearch:
hosts: [ "https://opensearch:9200" ]
index: all_logs
hosts: ["https://opensearch:9200"]
insecure: true
username: admin
password: "admin_pass"
index_type: custom
index: "routed-other-%{yyyy.MM.dd}"
```
{% include copy.html %}

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2021/logs" \
-H "Content-Type: application/json" \
-d '[
{"level":"ERROR","message":"DB connection failed","latency_ms":120},
{"level":"INFO","message":"GET /api/items","latency_ms":1500},
{"level":"INFO","message":"health check ok","latency_ms":42}
]'
```
{% include copy.html %}

The documents are stored in the corresponding indexes:

```
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
...
green open routed-other-2025.10.14 IBZTXO3ySBGky0tIHRaRmg 1 1 3 0 5.4kb 5.4kb
green open routed-slow-2025.10.14 J-hzZ9m8RkWvpMKC_oQLVQ 1 1 1 0 5kb 5kb
green open routed-errors-2025.10.14 v3r7JzPfQVOS8dWOBF1o2w 1 1 1 0 5kb 5kb
...
```

## Next steps

- See [Common uses cases]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/common-use-cases/) for example configurations.