Commit f679813

DOC-1242 Handling logical types in parquet_encode and parquet_decode processors (#222)
1 parent d54a31f commit f679813

2 files changed: +89 -69 lines changed

modules/components/pages/processors/parquet_decode.adoc

Lines changed: 46 additions & 16 deletions
@@ -17,42 +17,72 @@ Introduced in version 4.4.0.
 endif::[]
 
 ```yml
-# Config fields, showing default values
+# Configuration fields, showing default values
 label: ""
-parquet_decode: {}
+parquet_decode:
+  handle_logical_types: v1
 ```
 
-This processor uses https://github.com/parquet-go/parquet-go[https://github.com/parquet-go/parquet-go^], which is itself experimental. Therefore changes could be made into how this processor functions outside of major version releases.
+== Fields
+
+=== `handle_logical_types`
+
+Set to `v2` to enable enhanced decoding of logical types, or keep the default value (`v1`) to ignore logical type metadata when decoding values.
+
+In Parquet format, logical types are represented using standard physical types along with metadata that provides additional context. For example, UUIDs are stored as a `FIXED_LEN_BYTE_ARRAY` physical type, but the schema metadata identifies them as UUIDs. By enabling `v2`, this processor uses the metadata descriptions of logical types to produce more meaningful values during decoding.
+
+NOTE: For backward compatibility, this field enables logical-type handling for the specified Parquet format version, and all earlier versions. When creating new pipelines, Redpanda recommends that you use the newest documented version.
+
+*Type*: `string`
+
+*Default*: `v1`
+
+Options:
+
+[cols="2,8"]
+|===
+| Option | Description
+
+| `v1`
+| No special handling of logical types.
+
+| `v2`
+a| Logical types with enhanced decoding:
+
+* `TIMESTAMP`: Decodes as an RFC3339 string describing the time. If the `isAdjustedToUTC` flag is set to `true` in the Parquet file, the time zone is set to UTC. If the flag is set to `false`, the time zone is set to local time.
+
+* `UUID`: Decodes as a string: `00112233-4455-6677-8899-aabbccddeeff`.
+
+|===
+
+```yml
+# Examples
+
+handle_logical_types: v2
+```
 
 == Examples
 
-[tabs]
-======
-Reading Parquet Files from AWS S3::
-+
---
+=== Reading Parquet files from AWS S3
 
-In this example we consume files from AWS S3 as they're written by listening onto an SQS queue for upload events. We make sure to use the `to_the_end` scanner which means files are read into memory in full, which then allows us to use a `parquet_decode` processor to expand each file into a batch of messages. Finally, we write the data out to local files as newline delimited JSON.
+In this example, a pipeline consumes Parquet files as soon as they are uploaded to an AWS S3 bucket. The pipeline listens to an SQS queue for upload events, and uses the `to_the_end` scanner to read the files into memory in full. The `parquet_decode` processor then decodes each file into a batch of structured messages. Finally, the data is written to local files in newline-delimited JSON format.
 
 ```yaml
 input:
   aws_s3:
     bucket: TODO
-    prefix: foos/
+    prefix: files/
     scanner:
       to_the_end: {}
     sqs:
       url: TODO
   processors:
-    - parquet_decode: {}
-
+    - parquet_decode:
+        handle_logical_types: v2
 output:
   file:
     codec: lines
-    path: './foos/${! meta("s3_key") }.jsonl'
+    path: './files/${! meta("s3_key") }.jsonl'
 ```
 
---
-======
-
 // end::single-source[]
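
To see what `v2` changes in practice, here is a minimal sketch that is not part of this commit: it decodes a local Parquet file instead of S3 objects, and the `./events.parquet` path and the `created_at`/`user_id` column names are hypothetical. The commented output assumes the file's schema marks those columns with the `TIMESTAMP` and `UUID` logical types.

```yaml
# Hypothetical sketch: decoding a local Parquet file that contains logical types.
input:
  file:
    paths: [ ./events.parquet ]
    scanner:
      to_the_end: {}
  processors:
    - parquet_decode:
        handle_logical_types: v2

output:
  stdout: {}

# With handle_logical_types: v2, a decoded row might look like:
#   {"created_at":"2024-05-01T12:34:56Z","user_id":"00112233-4455-6677-8899-aabbccddeeff"}
# With the default v1, the same columns surface as raw physical values instead.
```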

modules/components/pages/processors/parquet_encode.adoc

Lines changed: 43 additions & 53 deletions
@@ -12,6 +12,7 @@ component_type_dropdown::[]
 
 Encodes https://parquet.apache.org/docs/[Parquet files^] from a batch of structured messages.
 
+
 ifndef::env-cloud[]
 Introduced in version 4.4.0.
 endif::[]
@@ -23,7 +24,7 @@ Common::
 --
 
 ```yml
-# Common config fields, showing default values
+# Common configuration fields, showing default values
 label: ""
 parquet_encode:
   schema: [] # No default (required)
@@ -36,7 +37,7 @@ Advanced::
 --
 
 ```yml
-# All config fields, showing default values
+# All configuration fields, showing default values
 label: ""
 parquet_encode:
   schema: [] # No default (required)
@@ -47,64 +48,25 @@ parquet_encode:
 --
 ======
 
-This processor uses https://github.com/parquet-go/parquet-go[https://github.com/parquet-go/parquet-go^], which is itself experimental. Therefore changes could be made into how this processor functions outside of major version releases.
-
-
-== Examples
-
-[tabs]
-======
-Writing Parquet Files to AWS S3::
-+
---
-
-In this example we use the batching mechanism of an `aws_s3` output to collect a batch of messages in memory, which then converts it to a parquet file and uploads it.
-
-```yaml
-output:
-  aws_s3:
-    bucket: TODO
-    path: 'stuff/${! timestamp_unix() }-${! uuid_v4() }.parquet'
-    batching:
-      count: 1000
-      period: 10s
-      processors:
-        - parquet_encode:
-            schema:
-              - name: id
-                type: INT64
-              - name: weight
-                type: DOUBLE
-              - name: content
-                type: BYTE_ARRAY
-            default_compression: zstd
-```
-
---
-======
-
 == Fields
 
 === `schema`
 
 Parquet schema.
 
-
 *Type*: `array`
 
-
 === `schema[].name`
 
-The name of the column.
+The name of the column you want to encode.
 
 
 *Type*: `string`
 
 
 === `schema[].type`
 
-The type of the column, only applicable for leaf columns with no child fields. Some logical types can be specified here such as UTF8.
-
+The data type of the column to encode. This field is only applicable for leaf columns with no child fields. The following options include logical types.
 
 *Type*: `string`
 
@@ -116,12 +78,15 @@ Options:
 , `FLOAT`
 , `DOUBLE`
 , `BYTE_ARRAY`
-, `UTF8`
-.
+, `TIMESTAMP`
+, `BSON`
+, `ENUM`
+, `JSON`
+, `UUID`
 
 === `schema[].repeated`
 
-Whether the field is repeated.
+Whether a field is repeated.
 
 
 *Type*: `bool`
@@ -130,7 +95,7 @@ Whether the field is repeated.
 
 === `schema[].optional`
 
-Whether the field is optional.
+Whether a field is optional.
 
 
 *Type*: `bool`
@@ -162,7 +127,7 @@ The default compression type to use for fields.
 
 *Type*: `string`
 
-*Default*: `"uncompressed"`
+*Default*: `uncompressed`
 
 Options:
 `uncompressed`
@@ -171,16 +136,14 @@ Options:
 , `brotli`
 , `zstd`
 , `lz4raw`
-.
 
 === `default_encoding`
 
-The default encoding type to use for fields. A custom default encoding is only necessary when consuming data with libraries that do not support `DELTA_LENGTH_BYTE_ARRAY` and is therefore best left unset where possible.
-
+The default encoding type to use for fields. A custom default encoding is only necessary when consuming data with libraries that do not support `DELTA_LENGTH_BYTE_ARRAY`.
 
 *Type*: `string`
 
-*Default*: `"DELTA_LENGTH_BYTE_ARRAY"`
+*Default*: `DELTA_LENGTH_BYTE_ARRAY`
 
 ifndef::env-cloud[]
 Requires version 4.11.0 or newer
@@ -189,6 +152,33 @@ endif::[]
 Options:
 `DELTA_LENGTH_BYTE_ARRAY`
 , `PLAIN`
-.
+
+== Examples
+
+=== Writing Parquet files to AWS S3
+
+In this example, a pipeline uses an `aws_s3` output as a batching mechanism. Messages are collected in memory and encoded into a Parquet file, which is then uploaded to an AWS S3 bucket.
+
+```yaml
+output:
+  aws_s3:
+    bucket: TODO
+    path: 'stuff/${! timestamp_unix() }-${! uuid_v4() }.parquet'
+    batching:
+      count: 1000
+      period: 10s
+      processors:
+        - parquet_encode:
+            schema:
+              - name: id
+                type: INT64
+              - name: weight
+                type: DOUBLE
+              - name: content
+                type: BYTE_ARRAY
+            default_compression: zstd
+```
+
+
 
 // end::single-source[]
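
Because the `schema[].type` options now list logical types, an encode schema can reference them directly. The following sketch is not part of this commit; the column names are hypothetical, and it simply assumes the type names are accepted exactly as listed in the options above.

```yaml
# Hypothetical sketch: an encode schema using the newly documented logical types.
pipeline:
  processors:
    - parquet_encode:
        schema:
          - name: id
            type: UUID
          - name: created_at
            type: TIMESTAMP
          - name: payload
            type: JSON
        default_compression: zstd
```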
