From 763c1c71c05c71d78051d64b9400d24b4f2a253d Mon Sep 17 00:00:00 2001
From: Anton Rubin
Date: Mon, 10 Nov 2025 18:44:48 +0000
Subject: [PATCH 1/2] adding delete_source example

Signed-off-by: Anton Rubin
---
 .../pipelines/configuration/processors/csv.md | 114 +++++++++++++++++-
 1 file changed, 108 insertions(+), 6 deletions(-)

diff --git a/_data-prepper/pipelines/configuration/processors/csv.md b/_data-prepper/pipelines/configuration/processors/csv.md
index fb9fc6f9d6e..67ebc39d014 100644
--- a/_data-prepper/pipelines/configuration/processors/csv.md
+++ b/_data-prepper/pipelines/configuration/processors/csv.md
@@ -16,12 +16,13 @@ The following table describes the options you can use to configure the `csv` pro
 
 Option | Required | Type | Description
 :--- | :--- | :--- | :---
-source | No | String | The field in the event that will be parsed. Default value is `message`.
-quote_character | No | String | The character used as a text qualifier for a single column of data. Default value is `"`.
-delimiter | No | String | The character separating each column. Default value is `,`.
-delete_header | No | Boolean | If specified, the event header (`column_names_source_key`) is deleted after the event is parsed. If there is no event header, no action is taken. Default value is true.
-column_names_source_key | No | String | The field in the event that specifies the CSV column names, which will be automatically detected. If there need to be extra column names, the column names are automatically generated according to their index. If `column_names` is also defined, the header in `column_names_source_key` can also be used to generate the event fields. If too few columns are specified in this field, the remaining column names are automatically generated. If too many column names are specified in this field, the CSV processor omits the extra column names.
-column_names | No | List | User-specified names for the CSV columns. Default value is `[column1, column2, ..., columnN]` if there are no columns of data in the CSV record and `column_names_source_key` is not defined. If `column_names_source_key` is defined, the header in `column_names_source_key` generates the event fields. If too few columns are specified in this field, the remaining column names are automatically generated. If too many column names are specified in this field, the CSV processor omits the extra column names.
+`source` | No | String | The field in the event that will be parsed. Default value is `message`.
+`quote_character` | No | String | The character used as a text qualifier for a single column of data. Default value is `"`.
+`delimiter` | No | String | The character separating each column. Default value is `,`.
+`delete_header` | No | Boolean | If specified, the event header (`column_names_source_key`) is deleted after the event is parsed. If there is no event header, no action is taken. Default value is `true`.
+`column_names_source_key` | No | String | The field in the event that specifies the CSV column names, which will be automatically detected. If additional column names are needed, they are automatically generated according to their index. If `column_names` is also defined, the header in `column_names_source_key` can also be used to generate the event fields. If too few columns are specified in this field, the remaining column names are automatically generated. If too many column names are specified in this field, the CSV processor omits the extra column names.
+`column_names` | No | List | User-specified names for the CSV columns. Default value is `[column1, column2, ..., columnN]` if there are no columns of data in the CSV record and `column_names_source_key` is not defined. If `column_names_source_key` is defined, the header in `column_names_source_key` generates the event fields. If too few columns are specified in this field, the remaining column names are automatically generated. If too many column names are specified in this field, the CSV processor omits the extra column names.
+`delete_source` | No | Boolean | If `true`, deletes the configured `source` field (default `message`) after CSV parsing. Default is `false`.
 
 ## Usage
 
@@ -97,6 +98,107 @@ Then, the processor parses the event into the following output. Because `delete_
 {"message": "1,2,3", "a": "1", "b": "2", "c": "3"}
 ```
 
+### Delete the source field after parsing
+
+To remove the original `message` field after the columns have been extracted, enable `delete_source`, as shown in the following example:
+
+```yaml
+csv-pipeline-delete-source:
+  source:
+    file:
+      path: "/full/path/to/ingest.csv"
+      record_type: "event"
+  processor:
+    - csv:
+        column_names: ["col1", "col2"]
+        delete_source: true # default is false
+  sink:
+    - opensearch:
+        hosts: ["https://opensearch:9200"]
+        insecure: true
+        username: admin
+        password: admin_pass
+        index_type: custom
+        index: csv-demo-%{yyyy.MM.dd}
+```
+{% include copy.html %}
+
+The documents stored in OpenSearch contain the following information:
+
+```json
+{
+  ...
+  "hits" : {
+    "total" : {
+      "value" : 2,
+      "relation" : "eq"
+    },
+    "max_score" : 1.0,
+    "hits" : [
+      {
+        "_index" : "csv-demo-2025.11.10",
+        "_id" : "vTgDb5oBcoMYUXV6ocPH",
+        "_score" : 1.0,
+        "_source" : {
+          "col1" : "1",
+          "col2" : "2",
+          "column3" : "3"
+        }
+      },
+      {
+        "_index" : "csv-demo-2025.11.10",
+        "_id" : "vjgDb5oBcoMYUXV6ocPI",
+        "_score" : 1.0,
+        "_source" : {
+          "col1" : "4",
+          "col2" : "5",
+          "column3" : "6"
+        }
+      }
+    ]
+  }
+}
+```
+
+If `delete_source` is set to `false`, the documents also contain the original `message` field, as shown in the following example:
+
+```json
+{
+  ...
+  "hits" : {
+    "total" : {
+      "value" : 2,
+      "relation" : "eq"
+    },
+    "max_score" : 1.0,
+    "hits" : [
+      {
+        "_index" : "csv-demo-2025.11.10",
+        "_id" : "fpAKb5oB85vgu48rA-rD",
+        "_score" : 1.0,
+        "_source" : {
+          "message" : "1,2,3",
+          "col1" : "1",
+          "col2" : "2",
+          "column3" : "3"
+        }
+      },
+      {
+        "_index" : "csv-demo-2025.11.10",
+        "_id" : "f5AKb5oB85vgu48rA-rD",
+        "_score" : 1.0,
+        "_source" : {
+          "message" : "4,5,6",
+          "col1" : "4",
+          "col2" : "5",
+          "column3" : "6"
+        }
+      }
+    ]
+  }
+}
+```
+
 ## Metrics
 
 The following table describes common [Abstract processor](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/AbstractProcessor.java) metrics.
From e021232ed8a5b85af432291a739c80f40d109b45 Mon Sep 17 00:00:00 2001
From: Anton Rubin
Date: Wed, 12 Nov 2025 13:07:14 +0000
Subject: [PATCH 2/2] addressing the PR comments

Signed-off-by: Anton Rubin
---
 _data-prepper/pipelines/configuration/processors/csv.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/_data-prepper/pipelines/configuration/processors/csv.md b/_data-prepper/pipelines/configuration/processors/csv.md
index 67ebc39d014..c20946330bb 100644
--- a/_data-prepper/pipelines/configuration/processors/csv.md
+++ b/_data-prepper/pipelines/configuration/processors/csv.md
@@ -22,7 +22,7 @@ Option | Required | Type | Description
 `delete_header` | No | Boolean | If specified, the event header (`column_names_source_key`) is deleted after the event is parsed. If there is no event header, no action is taken. Default value is `true`.
 `column_names_source_key` | No | String | The field in the event that specifies the CSV column names, which will be automatically detected. If additional column names are needed, they are automatically generated according to their index. If `column_names` is also defined, the header in `column_names_source_key` can also be used to generate the event fields. If too few columns are specified in this field, the remaining column names are automatically generated. If too many column names are specified in this field, the CSV processor omits the extra column names.
 `column_names` | No | List | User-specified names for the CSV columns. Default value is `[column1, column2, ..., columnN]` if there are no columns of data in the CSV record and `column_names_source_key` is not defined. If `column_names_source_key` is defined, the header in `column_names_source_key` generates the event fields. If too few columns are specified in this field, the remaining column names are automatically generated. If too many column names are specified in this field, the CSV processor omits the extra column names.
-`delete_source` | No | Boolean | If `true`, deletes the configured `source` field (default `message`) after CSV parsing. Default is `false`.
+`delete_source` | No | Boolean | If `true`, deletes the configured `source` field (default `message`) after CSV parsing. Because events are processed in batches, deleting a `source` field that is no longer needed reduces memory pressure. Default is `false`.
 
 ## Usage
 
@@ -111,7 +111,7 @@ csv-pipeline-delete-source:
   processor:
     - csv:
         column_names: ["col1", "col2"]
-        delete_source: true # default is false
+        delete_source: true
   sink:
     - opensearch:
         hosts: ["https://opensearch:9200"]
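
A quick way to try the new option locally without an OpenSearch cluster is to swap the `opensearch` sink for `stdout`. The following YAML is a minimal sketch only and is not part of the patch: the pipeline name `csv-delete-source-check` is illustrative, and it assumes the standard Data Prepper `file` source and `stdout` sink plugins are available in the distribution being tested.

```yaml
# Minimal verification sketch (not part of the patch): parse a local CSV file
# and print the resulting events to the console so the effect of
# delete_source can be inspected directly.
csv-delete-source-check:
  source:
    file:
      path: "/full/path/to/ingest.csv"
      record_type: "event"
  processor:
    - csv:
        column_names: ["col1", "col2"]
        delete_source: true
  sink:
    - stdout:
```

With `delete_source: true`, the printed events should contain only the parsed columns (`col1`, `col2`, and the auto-generated `column3`); setting it to `false` should additionally keep the raw `message` field, matching the OpenSearch documents shown in the patch above.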