diff --git a/_data-prepper/pipelines/configuration/processors/key-value.md b/_data-prepper/pipelines/configuration/processors/key-value.md index 7d2a553d5f..26b09245de 100644 --- a/_data-prepper/pipelines/configuration/processors/key-value.md +++ b/_data-prepper/pipelines/configuration/processors/key-value.md @@ -11,6 +11,346 @@ nav_order: 170 You can use the `key_value` processor to parse the specified field into key-value pairs. You can customize the `key_value` processor to parse field information with the following options. The type for each of the following options is `string`. +## Examples + +The following examples demonstrate some of the configuration you can use with this processor. + +The examples don't use security and are for demonstration purposes only. We strongly recommend configuring SSL before using these examples in production. +{: .warning} + +### KV parsing, cleanup and de-duplication + +The following example parses `message` into `key=value` pairs, normalizes and cleans the keys, prefixes them with `meta_`, deduplicates values, and drops keys without values into `parsed_kv`: + +```yaml +kv-basic-pipeline: + source: + http: + path: /logs + ssl: false + + processor: + - key_value: + # Read key=value pairs from the "message" field (default anyway) + source: "message" + # Write parsed pairs into a nested object "parsed_kv" + destination: "parsed_kv" + + # Split pairs on '&' and split key vs value on '=' + field_split_characters: "&" + value_split_characters: "=" + + # Normalize keys and trim garbage whitespace around keys/values + transform_key: "lowercase" + delete_key_regex: "\\s+" # remove spaces from keys + delete_value_regex: "^\\s+|\\s+$" # trim leading/trailing spaces + + # Add a prefix to every key (after normalization + delete_key_regex) + prefix: "meta_" + + # Keep a single unique value for duplicate keys + skip_duplicate_values: true + + # Drop keys whose value is empty/absent (e.g., `empty=` or `novalue`) + drop_keys_with_no_value: true + + sink: + - opensearch: + hosts: ["https://opensearch:9200"] + insecure: true + username: admin + password: "admin_pass" + index_type: custom + index: "kv-basic-%{yyyy.MM.dd}" +``` +{% include copy.html %} + +You can test this pipeline using the following command: + +```bash +curl -sS -X POST "http://localhost:2021/logs" \ + -H "Content-Type: application/json" \ + -d '[ + {"message":"key1=value1&key1=value1&Key Two = value two & empty=&novalue"}, + {"message":"ENV = prod & TEAM = core & owner = alice "} + ]' +``` +{% include copy.html %} + +The documents stored in OpenSearch contain the following information: + +```json +{ + ... + "hits": { + "total": { + "value": 2, + "relation": "eq" + }, + "max_score": 1, + "hits": [ + { + "_index": "kv-basic-2025.10.14", + "_id": "M6d84pkB3P3jd6EROH_f", + "_score": 1, + "_source": { + "message": "key1=value1&key1=value1&Key Two = value two & empty=&novalue", + "parsed_kv": { + "meta_key1": "value1", + "meta_empty": "", + "meta_keytwo": "value two" + } + } + }, + { + "_index": "kv-basic-2025.10.14", + "_id": "NKd84pkB3P3jd6EROH_f", + "_score": 1, + "_source": { + "message": "ENV = prod & TEAM = core & owner = alice ", + "parsed_kv": { + "meta_owner": "alice", + "meta_team": "core", + "meta_env": "prod" + } + } + } + ] + } +} +``` + +### Grouped values to root + +The following example parses `payload` using `&&` between pairs and `==` between key and value, preserves bracketed groups as single values, writes results at the event root without overwriting existing fields, and records unmatched tokens as `null`: + +```yaml +kv-grouping-pipeline: + source: + http: + path: /logs + ssl: false + + processor: + - key_value: + source: "payload" + destination: null + + field_split_characters: "&&" # pair delimiter (OK with grouping) + value_split_characters: null # disable the default "=" + key_value_delimiter_regex: "==" # exact '==' for key/value + + value_grouping: true + remove_brackets: false + overwrite_if_destination_exists: false + non_match_value: null + + sink: + - opensearch: + hosts: ["https://opensearch:9200"] + insecure: true + username: admin + password: "admin_pass" + index_type: custom + index: "kv-regex-%{yyyy.MM.dd}" +``` +{% include copy.html %} + +You can test this pipeline using the following command: + +```bash +curl -sS -X POST "http://localhost:2021/logs" \ + -H "Content-Type: application/json" \ + -d '[ + { + "payload":"a==1&&b==[x=y,z=w]&&c==(inner=thing)&&http==http://example.com path", + "a":"keep-me" + }, + { + "payload":"good==yes&&broken-token&&url==https://opensearch.org home", + "note":"second doc" + } + ]' +``` +{% include copy.html %} + +The documents stored in OpenSearch contain the following information: + +```json +{ + ... + "hits": { + "total": { + "value": 2, + "relation": "eq" + }, + "max_score": 1, + "hits": [ + { + "_index": "kv-regex-2025.10.14", + "_id": "FuCX4pkB344hN2Iu62oT", + "_score": 1, + "_source": { + "payload": "a==1&&b==[x=y,z=w]&&c==(inner=thing)&&http==http://example.com path", + "a": "keep-me", + "b": "[x=y,z=w]", + "c": "(inner=thing)", + "http": "http://example.com path" + } + }, + { + "_index": "kv-regex-2025.10.14", + "_id": "F-CX4pkB344hN2Iu62oT", + "_score": 1, + "_source": { + "payload": "good==yes&&broken-token&&url==https://opensearch.org home", + "note": "second doc", + "broken-token": null, + "good": "yes", + "url": "https://opensearch.org home" + } + } + ] + } +} +``` + +### Conditional recursive KV parsing + +The following example parses bracketed nested `key=value` structures from `body` into `parsed.*` only when `/type == "nested"`, honoring grouping, enforcing strict nesting, and applying default fields while leaving non-nested events untouched: + +```yaml +kv-conditional-recursive-pipeline: + source: + http: + path: /logs + ssl: false + + processor: + - key_value: + source: "body" + destination: "parsed" + + key_value_when: '/type == "nested"' + recursive: true + + # Split rules (per docs; not regex) + field_split_characters: "&" + value_split_characters: "=" + + # Grouping & quoting (per docs) + value_grouping: true + string_literal_character: "\"" + remove_brackets: false + + # Keep only some top-level keys; then set defaults + include_keys: ["item1","item2","owner"] + default_values: + owner: "unknown" + region: "eu-west-1" + + strict_grouping: true + tags_on_failure: ["keyvalueprocessor_failure"] + + sink: + - opensearch: + hosts: ["https://opensearch:9200"] + insecure: true + username: admin + password: "admin_pass" + index_type: custom + index: "kv-recursive-%{yyyy.MM.dd}" +``` +{% include copy.html %} + +You can test this pipeline using the following command: + +```bash +curl -sS -X POST "http://localhost:2021/logs" \ + -H "Content-Type: application/json" \ + -d '[ + { + "type":"nested","body":"item1=[a=1&b=(c=3&d=)]&item2=2&owner=alice" + }, + { + "type":"flat","body":"item1=[should=not&be=parsed]&item2=42" + }, + { + "type":"nested","body":"item1=[desc=\"a=b + c=d\"&x=1]&item2=2" + } +]' +``` +{% include copy.html %} + +The documents stored in OpenSearch contain the following information: + +```json +{ + ... + "hits": { + "total": { + "value": 3, + "relation": "eq" + }, + "max_score": 1, + "hits": [ + { + "_index": "kv-recursive-2025.10.14", + "_id": "Q7fC4pkBc0UY8I7pZ6vZ", + "_score": 1, + "_source": { + "type": "nested", + "body": "item1=[a=1&b=(c=3&d=)]&item2=2&owner=alice", + "parsed": { + "owner": "alice", + "item2": "2", + "item1": { + "a": "1", + "b": { + "c": "3", + "d": { + "e": "5" + } + } + }, + "region": "eu-west-1" + } + } + }, + { + "_index": "kv-recursive-2025.10.14", + "_id": "RLfC4pkBc0UY8I7pZ6vZ", + "_score": 1, + "_source": { + "type": "flat", + "body": "item1=[should=not&be=parsed]&item2=42" + } + }, + { + "_index": "kv-recursive-2025.10.14", + "_id": "RbfC4pkBc0UY8I7pZ6vZ", + "_score": 1, + "_source": { + "type": "nested", + "body": """item1=[desc="a=b + c=d"&x=1]&item2=2""", + "parsed": { + "owner": "unknown", + "item2": "2", + "item1": { + "desc": "\"a=b + c=d\"", + "x": "1" + }, + "region": "eu-west-1" + } + } + } + ] + } +} +``` + +## Configuration + Option | Description | Example :--- | :--- | :--- `source` | The message field to be parsed. Optional. Default value is `message`. | If `source` is `"message1"`, `{"message1": {"key1=value1"}, "message2": {"key2=value2"}}` parses into `{"message1": {"key1=value1"}, "message2": {"key2=value2"}, "parsed_message": {"key1": "value1"}}`.