@@ -23,6 +23,7 @@ type TransformConfig struct {
2323 Reader parquet.ReaderAtSeeker
2424 Writer io.Writer
2525 Compression * compress.Compression
26+ RowGroupLength int
2627 TransformSchema SchemaTransformer
2728 TransformColumn ColumnTransformer
2829 BeforeClose func (* file.Reader , * file.Writer ) error
@@ -50,6 +51,10 @@ func getWriterProperties(config *TransformConfig, fileReader *file.Reader) (*par
5051 }
5152 }
5253
54+ if config .RowGroupLength > 0 {
55+ writerProperties = append (writerProperties , parquet .WithMaxRowGroupLength (int64 (config .RowGroupLength )))
56+ }
57+
5358 return parquet .NewWriterProperties (writerProperties ... ), nil
5459}
5560
@@ -104,34 +109,85 @@ func TransformByColumn(config *TransformConfig) error {
104109
105110 ctx := pqarrow .NewArrowWriteContext (context .Background (), nil )
106111
107- numRowGroups := fileReader .NumRowGroups ()
108- for rowGroupIndex := 0 ; rowGroupIndex < numRowGroups ; rowGroupIndex += 1 {
109- rowGroupReader := arrowReader .RowGroup (rowGroupIndex )
110- rowGroupWriter := fileWriter .AppendRowGroup ()
112+ if config .RowGroupLength > 0 {
113+ columnReaders := make ([]* pqarrow.ColumnReader , numFields )
111114 for fieldNum := 0 ; fieldNum < numFields ; fieldNum += 1 {
112- arr , readErr := rowGroupReader . Column ( fieldNum ). Read ( ctx )
113- if readErr != nil {
114- return readErr
115+ colReader , err := arrowReader . GetColumn ( ctx , fieldNum )
116+ if err != nil {
117+ return err
115118 }
116- if config .TransformColumn != nil {
117- inputField := inputManifest .Fields [fieldNum ].Field
118- outputField := outputManifest .Fields [fieldNum ].Field
119- transformed , err := config .TransformColumn (inputField , outputField , arr )
120- if err != nil {
121- return err
119+ columnReaders [fieldNum ] = colReader
120+ }
121+
122+ numRows := fileReader .NumRows ()
123+ numRowsWritten := int64 (0 )
124+ for {
125+ rowGroupWriter := fileWriter .AppendRowGroup ()
126+ for fieldNum := 0 ; fieldNum < numFields ; fieldNum += 1 {
127+ colReader := columnReaders [fieldNum ]
128+ arr , readErr := colReader .NextBatch (int64 (config .RowGroupLength ))
129+ if readErr != nil {
130+ return readErr
122131 }
123- if transformed .DataType () != outputField .Type {
124- return fmt .Errorf ("transform generated an unexpected type, got %s, expected %s" , transformed .DataType ().Name (), outputField .Type .Name ())
132+ if config .TransformColumn != nil {
133+ inputField := inputManifest .Fields [fieldNum ].Field
134+ outputField := outputManifest .Fields [fieldNum ].Field
135+ transformed , err := config .TransformColumn (inputField , outputField , arr )
136+ if err != nil {
137+ return err
138+ }
139+ if transformed .DataType () != outputField .Type {
140+ return fmt .Errorf ("transform generated an unexpected type, got %s, expected %s" , transformed .DataType ().Name (), outputField .Type .Name ())
141+ }
142+ arr = transformed
143+ }
144+ colWriter , colWriterErr := pqarrow .NewArrowColumnWriter (arr , 0 , int64 (arr .Len ()), outputManifest , rowGroupWriter , fieldNum )
145+ if colWriterErr != nil {
146+ return colWriterErr
147+ }
148+ if err := colWriter .Write (ctx ); err != nil {
149+ return err
125150 }
126- arr = transformed
127- }
128- colWriter , colWriterErr := pqarrow .NewArrowColumnWriter (arr , 0 , int64 (arr .Len ()), outputManifest , rowGroupWriter , fieldNum )
129- if colWriterErr != nil {
130- return colWriterErr
131151 }
132- if err := colWriter .Write (ctx ); err != nil {
152+ numRowsInGroup , err := rowGroupWriter .NumRows ()
153+ if err != nil {
133154 return err
134155 }
156+ numRowsWritten += int64 (numRowsInGroup )
157+ if numRowsWritten >= numRows {
158+ break
159+ }
160+ }
161+ } else {
162+ numRowGroups := fileReader .NumRowGroups ()
163+ for rowGroupIndex := 0 ; rowGroupIndex < numRowGroups ; rowGroupIndex += 1 {
164+ rowGroupReader := arrowReader .RowGroup (rowGroupIndex )
165+ rowGroupWriter := fileWriter .AppendRowGroup ()
166+ for fieldNum := 0 ; fieldNum < numFields ; fieldNum += 1 {
167+ arr , readErr := rowGroupReader .Column (fieldNum ).Read (ctx )
168+ if readErr != nil {
169+ return readErr
170+ }
171+ if config .TransformColumn != nil {
172+ inputField := inputManifest .Fields [fieldNum ].Field
173+ outputField := outputManifest .Fields [fieldNum ].Field
174+ transformed , err := config .TransformColumn (inputField , outputField , arr )
175+ if err != nil {
176+ return err
177+ }
178+ if transformed .DataType () != outputField .Type {
179+ return fmt .Errorf ("transform generated an unexpected type, got %s, expected %s" , transformed .DataType ().Name (), outputField .Type .Name ())
180+ }
181+ arr = transformed
182+ }
183+ colWriter , colWriterErr := pqarrow .NewArrowColumnWriter (arr , 0 , int64 (arr .Len ()), outputManifest , rowGroupWriter , fieldNum )
184+ if colWriterErr != nil {
185+ return colWriterErr
186+ }
187+ if err := colWriter .Write (ctx ); err != nil {
188+ return err
189+ }
190+ }
135191 }
136192 }
137193
0 commit comments