@@ -12,13 +12,11 @@ package streaming

 import (
 	"bytes"
-	"encoding/binary"
 	"errors"
 	"fmt"

 	"github.com/parquet-go/parquet-go"
 	"github.com/parquet-go/parquet-go/format"
-	"github.com/segmentio/encoding/thrift"

 	"github.com/redpanda-data/benthos/v4/public/service"
 )
@@ -66,41 +64,40 @@ func objectMessageToRow(msg *service.Message, out []any, nameToPosition map[stri
 	return nil
 }

-func constructRowGroupFromObject(
+// writeRowGroupFromObject writes a batch of object messages directly to a concurrent row group's column writers,
+// then flushes (compresses) the row group. Values are written directly to the column writers as they are converted.
+func writeRowGroupFromObject(
 	batch service.MessageBatch,
 	schema *parquet.Schema,
 	transformers []*dataTransformer,
 	mode SchemaMode,
-) ([]parquet.Row, []*statsBuffer, error) {
-	// We write all of our data in a columnar fashion, but need to pivot that data so that we can feed it into
-	// out parquet library (which sadly will redo the pivot - maybe we need a lower level abstraction...).
-	// So create a massive matrix that we will write stuff in columnar form, but then we don't need to move any
-	// data to create rows of the data via an in-place transpose operation.
-	//
-	// TODO: Consider caching/pooling this matrix as I expect many are similarily sized.
+	rg *parquet.ConcurrentRowGroupWriter,
+) ([]*statsBuffer, error) {
 	rowWidth := len(schema.Fields())
-	matrix := make([]parquet.Value, len(batch)*rowWidth)
 	nameToPosition := make(map[string]int, rowWidth)
 	stats := make([]*statsBuffer, rowWidth)
 	buffers := make([]typedBuffer, rowWidth)
+	columnWriters := rg.ColumnWriters()
+
 	for idx, t := range transformers {
 		leaf, ok := schema.Lookup(t.name)
 		if !ok {
-			return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
+			return nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
 		}
 		buffers[idx] = t.bufferFactory()
-		buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth)
+		buffers[idx].Reset(columnWriters[leaf.ColumnIndex], leaf.ColumnIndex)
 		stats[idx] = &statsBuffer{}
 		nameToPosition[t.name] = idx
 	}
-	// First we need to shred our record into columns, snowflake's data model
-	// is thankfully a flat list of columns, so no dremel style record shredding
-	// is needed
+
+	// Shred records into columns - snowflake's data model is a flat list of columns,
+	// so no dremel style record shredding is needed. Values are written directly
+	// to column writers as they are converted.
 	row := make([]any, rowWidth)
 	for _, msg := range batch {
 		err := objectMessageToRow(msg, row, nameToPosition, mode)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 		for i, v := range row {
 			t := transformers[i]
@@ -109,24 +106,21 @@ func constructRowGroupFromObject(
 			err = t.converter.ValidateAndConvert(s, v, b)
 			if err != nil {
 				if errors.Is(err, errNullValue) {
-					return nil, nil, &NonNullColumnError{msg, t.column.Name}
+					return nil, &NonNullColumnError{msg, t.column.Name}
 				}
-				// There is not special typed error for a validation error, there really isn't
-				// anything we can do about it.
-				return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
+				return nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
 			}
 			// reset the column as nil for the next row
 			row[i] = nil
 		}
 	}
-	// Now all our values have been written to each buffer - here is where we do our matrix
-	// transpose mentioned above
-	rows := make([]parquet.Row, len(batch))
-	for i := range rows {
-		rowStart := i * rowWidth
-		rows[i] = matrix[rowStart : rowStart+rowWidth]
+
+	// Flush compresses the row group data
+	if err := rg.Flush(); err != nil {
+		return nil, fmt.Errorf("failed to flush row group: %w", err)
 	}
-	return rows, stats, nil
+
+	return stats, nil
 }

 // arrayMessageToRow converts a message into columnar form using the provided name to index mapping.
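For orientation, here is a minimal sketch of how the new signature might be driven from a caller inside this package. The wrapper function is hypothetical (not part of this diff); it assumes the `parquetWriter.BeginRowGroup` helper introduced further down in this change:

```go
// Hypothetical caller sketch: one concurrent row group per message batch.
// BeginRowGroup is the parquetWriter helper added later in this diff.
func writeObjectBatch(
	w *parquetWriter,
	batch service.MessageBatch,
	schema *parquet.Schema,
	transformers []*dataTransformer,
	mode SchemaMode,
) ([]*statsBuffer, error) {
	rg := w.BeginRowGroup()
	// Converts and writes values straight into the group's column writers,
	// then flushes (compresses) the finished row group.
	return writeRowGroupFromObject(batch, schema, transformers, mode, rg)
}
```

Since each batch gets its own `ConcurrentRowGroupWriter`, several row groups can presumably be built in parallel before the file is closed, which is the point of the refactor; the ordering/commit semantics live in the parquet-go fork and aren't visible in this hunk.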
@@ -161,31 +155,35 @@ func arrayMessageToRow(msg *service.Message, out []any, mode SchemaMode) error {
 	return nil
 }

-func constructRowGroupFromArray(
+// writeRowGroupFromArray writes a batch of array messages directly to a concurrent row group's column writers,
+// then flushes (compresses) the row group. Values are written directly to the column writers as they are converted.
+func writeRowGroupFromArray(
 	batch service.MessageBatch,
 	schema *parquet.Schema,
 	transformers []*dataTransformer,
 	mode SchemaMode,
-) ([]parquet.Row, []*statsBuffer, error) {
-	// TODO: Switch to using concurrent row groups to write this stuff
+	rg *parquet.ConcurrentRowGroupWriter,
+) ([]*statsBuffer, error) {
 	rowWidth := len(schema.Fields())
-	matrix := make([]parquet.Value, len(batch)*rowWidth)
 	stats := make([]*statsBuffer, rowWidth)
 	buffers := make([]typedBuffer, rowWidth)
+	columnWriters := rg.ColumnWriters()
+
 	for idx, t := range transformers {
 		leaf, ok := schema.Lookup(t.name)
 		if !ok {
-			return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
+			return nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
 		}
 		buffers[idx] = t.bufferFactory()
-		buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth)
+		buffers[idx].Reset(columnWriters[leaf.ColumnIndex], leaf.ColumnIndex)
 		stats[idx] = &statsBuffer{}
 	}
+
 	row := make([]any, rowWidth)
 	for _, msg := range batch {
 		err := arrayMessageToRow(msg, row, mode)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 		for i, v := range row {
 			t := transformers[i]
@@ -194,29 +192,27 @@ func constructRowGroupFromArray(
 			err = t.converter.ValidateAndConvert(s, v, b)
 			if err != nil {
 				if errors.Is(err, errNullValue) {
-					return nil, nil, &NonNullColumnError{msg, t.column.Name}
+					return nil, &NonNullColumnError{msg, t.column.Name}
 				}
-				// There is not special typed error for a validation error, there really isn't
-				// anything we can do about it.
-				return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
+				return nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
 			}
 			// reset the column as nil for the next row
 			row[i] = nil
 		}
 	}
-	// Now all our values have been written to each buffer - here is where we do our matrix
-	// transpose mentioned above
-	rows := make([]parquet.Row, len(batch))
-	for i := range rows {
-		rowStart := i * rowWidth
-		rows[i] = matrix[rowStart : rowStart+rowWidth]
+
+	// Flush compresses the row group data
+	if err := rg.Flush(); err != nil {
+		return nil, fmt.Errorf("failed to flush row group: %w", err)
 	}
-	return rows, stats, nil
+
+	return stats, nil
 }

 type parquetWriter struct {
-	b *bytes.Buffer
-	w *parquet.GenericWriter[any]
+	b      *bytes.Buffer
+	w      *parquet.GenericWriter[any]
+	schema *parquet.Schema
 }

 func newParquetWriter(rpcnVersion string, schema *parquet.Schema) *parquetWriter {
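Taken with the methods in the next hunk, the writer's lifecycle changes from a single `WriteFile(rows, metadata)` call to an explicit `Reset` / `BeginRowGroup` / `Close` sequence. A rough per-file flow, again within this package; the wrapper and the metadata key are placeholders, and the sticky-metadata caveat from the old `WriteFile` doc comment presumably still applies to `Reset`:

```go
// Hypothetical single-file flow under the refactored writer API.
func buildFile(
	w *parquetWriter,
	batch service.MessageBatch,
	schema *parquet.Schema,
	transformers []*dataTransformer,
	mode SchemaMode,
) ([]byte, error) {
	// Metadata keys appear to be sticky across files; reuse the same keys
	// each time (set a value to "" to clear it). Key name is illustrative.
	w.Reset(map[string]string{"createdBy": "rpcn-example"})

	rg := w.BeginRowGroup()
	if _, err := writeRowGroupFromArray(batch, schema, transformers, mode, rg); err != nil {
		return nil, err
	}

	out, md, err := w.Close()
	if err != nil {
		return nil, err
	}
	_ = totalUncompressedSize(md) // e.g. report uncompressed size upstream
	return out, nil
}
```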
@@ -230,53 +226,32 @@ func newParquetWriter(rpcnVersion string, schema *parquet.Schema) *parquetWriter
 		parquet.Compression(&parquet.Zstd),
 		parquet.WriteBufferSize(0),
 	)
-	return &parquetWriter{b, w}
+	return &parquetWriter{b, w, schema}
 }

-// WriteFile writes a new parquet file using the rows and metadata.
-//
-// NOTE: metadata is sticky - if you want the next file to remove metadata you need to set the value to the empty string
-// to actually remove it. In the usage of this method in this package, the metadata keys are all always the same.
-func (w *parquetWriter) WriteFile(rows []parquet.Row, metadata map[string]string) (out []byte, err error) {
+// BeginRowGroup creates a new concurrent row group for parallel construction.
+func (w *parquetWriter) BeginRowGroup() *parquet.ConcurrentRowGroupWriter {
+	return w.w.BeginRowGroup()
+}
+
+// Reset prepares the writer for a new file with the given metadata.
+func (w *parquetWriter) Reset(metadata map[string]string) {
 	for k, v := range metadata {
 		w.w.SetKeyValueMetadata(k, v)
 	}
 	w.b.Reset()
 	w.w.Reset(w.b)
-	defer func() {
-		if r := recover(); r != nil {
-			err = fmt.Errorf("encoding panic: %v", r)
-		}
-	}()
-	_, err = w.w.WriteRows(rows)
-	if err != nil {
-		return
-	}
-	err = w.w.Close()
-	out = w.b.Bytes()
-	return
 }

-func readParquetMetadata(parquetFile []byte) (metadata format.FileMetaData, err error) {
-	if len(parquetFile) < 8 {
-		return format.FileMetaData{}, fmt.Errorf("too small of parquet file: %d", len(parquetFile))
-	}
-	trailingBytes := parquetFile[len(parquetFile)-8:]
-	if string(trailingBytes[4:]) != "PAR1" {
-		return metadata, fmt.Errorf("missing magic bytes, got: %q", trailingBytes[4:])
-	}
-	footerSize := int(binary.LittleEndian.Uint32(trailingBytes))
-	if len(parquetFile) < footerSize+8 {
-		return metadata, fmt.Errorf("too small of parquet file: %d, footer size: %d", len(parquetFile), footerSize)
-	}
-	footerBytes := parquetFile[len(parquetFile)-(footerSize+8) : len(parquetFile)-8]
-	if err := thrift.Unmarshal(new(thrift.CompactProtocol), footerBytes, &metadata); err != nil {
-		return metadata, fmt.Errorf("unable to extract parquet metadata: %w", err)
+// Close finalizes the parquet file and returns the bytes.
+func (w *parquetWriter) Close() ([]byte, *format.FileMetaData, error) {
+	if err := w.w.Close(); err != nil {
+		return nil, nil, err
 	}
-	return
+	return w.b.Bytes(), w.w.File().Metadata(), nil
 }

-func totalUncompressedSize(metadata format.FileMetaData) int32 {
+func totalUncompressedSize(metadata *format.FileMetaData) int32 {
 	var size int64
 	for _, rowGroup := range metadata.RowGroups {
 		size += rowGroup.TotalByteSize
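With `Close` returning the `*format.FileMetaData` the writer already holds (via `w.w.File().Metadata()`), the deleted `readParquetMetadata` no longer needs to re-parse the serialized footer, and the `encoding/binary` and thrift imports go with it. The footer layout that helper decoded is standard parquet, not specific to this repo; for reference, a condensed, standalone sketch of the removed logic (the package and function name are mine):

```go
package parquetfooter

import (
	"encoding/binary"
	"fmt"
)

// footerBounds mirrors the checks the removed readParquetMetadata performed:
// a parquet file ends with the thrift-compact FileMetaData, a 4-byte
// little-endian footer length, and the "PAR1" magic.
func footerBounds(file []byte) (start, end int, err error) {
	if len(file) < 8 {
		return 0, 0, fmt.Errorf("too small of parquet file: %d", len(file))
	}
	trailer := file[len(file)-8:]
	if string(trailer[4:]) != "PAR1" {
		return 0, 0, fmt.Errorf("missing magic bytes, got: %q", trailer[4:])
	}
	footerSize := int(binary.LittleEndian.Uint32(trailer))
	if len(file) < footerSize+8 {
		return 0, 0, fmt.Errorf("file size %d too small for footer size %d", len(file), footerSize)
	}
	// The metadata bytes sit immediately before the 8-byte trailer.
	return len(file) - (footerSize + 8), len(file) - 8, nil
}
```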