@@ -28,6 +28,12 @@ export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'desce
2828 } )
2929}
3030
31+ interface VirtualRowGroup { // a contiguous slice of at most 1000 rows that never crosses a parquet row group boundary
32+ groupStart : number // index of the first row of the slice (inclusive), counted from the start of the file
33+ groupEnd : number // index one past the last row of the slice (exclusive)
34+ fetching : boolean // set to true once a fetch for this slice has been started, so the slice is only requested once
35+ }
36+
3137/**
3238 * Convert a parquet file into a dataframe.
3339 */
@@ -37,33 +43,40 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
3743 const sortCache = new Map < string , Promise < number [ ] > > ( )
3844 const columnRanksCache = new Map < string , Promise < number [ ] > > ( )
3945 const data = new Array < ResolvableRow | undefined > ( Number ( metadata . num_rows ) )
40- const groups = new Array ( metadata . row_groups . length ) . fill ( false )
46+
47+ // virtual row groups are up to 1000 rows within row group boundaries
48+ const groups : VirtualRowGroup [ ] = [ ]
4149 let groupStart = 0
42- const groupEnds = metadata . row_groups . map ( group => groupStart += Number ( group . num_rows ) )
50+ for ( const rg of metadata . row_groups ) {
51+ // make virtual row groups of size 1000
52+ for ( let j = 0 ; j < rg . num_rows ; j += 1000 ) {
53+ const groupSize = Math . min ( 1000 , Number ( rg . num_rows ) - j )
54+ const groupEnd = groupStart + groupSize
55+ groups . push ( { groupStart, groupEnd, fetching : false } )
56+ groupStart = groupEnd
57+ }
58+ }
4359
44- function fetchRowGroup ( groupIndex : number ) {
45- if ( ! groups [ groupIndex ] ) {
46- const rowStart = groupEnds [ groupIndex - 1 ] ?? 0
47- const rowEnd = groupEnds [ groupIndex ]
48- if ( rowEnd === undefined ) {
49- throw new Error ( `Missing groupEnd for groupIndex: ${ groupIndex } ` )
50- }
60+ function fetchVirtualRowGroup ( virtualGroupIndex : number ) {
61+ const group = groups [ virtualGroupIndex ]
62+ if ( group && ! group . fetching ) {
63+ group . fetching = true
64+ const { groupStart, groupEnd } = group
5165 // Initialize with resolvable promises
52- for ( let i = rowStart ; i < rowEnd ; i ++ ) {
66+ for ( let i = groupStart ; i < groupEnd ; i ++ ) {
5367 data [ i ] = resolvableRow ( header )
5468 data [ i ] ?. index . resolve ( i )
5569 }
56- parquetQueryWorker ( { from, metadata, rowStart, rowEnd } )
57- . then ( ( groupData ) => {
58- for ( let i = rowStart ; i < rowEnd ; i ++ ) {
59- const dataRow = data [ i ]
70+ parquetQueryWorker ( { from, metadata, rowStart : groupStart , rowEnd : groupEnd } )
71+ . then ( groupData => {
72+ for ( let rowIndex = groupStart ; rowIndex < groupEnd ; rowIndex ++ ) {
73+ const dataRow = data [ rowIndex ]
6074 if ( dataRow === undefined ) {
61- throw new Error ( `Missing data row for index ${ i } ` )
75+ throw new Error ( `Missing data row for index ${ rowIndex } ` )
6276 }
63- const j = i - rowStart
64- const row = groupData [ j ]
77+ const row = groupData [ rowIndex - groupStart ]
6578 if ( row === undefined ) {
66- throw new Error ( `Missing row in groupData for index: ${ i - rowStart } ` )
79+ throw new Error ( `Missing row in groupData for index ${ rowIndex } ` )
6780 }
6881 for ( const [ key , value ] of Object . entries ( row ) ) {
6982 const cell = dataRow . cells [ key ]
@@ -75,13 +88,10 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
7588 }
7689 } )
7790 . catch ( ( error : unknown ) => {
78- const prefix = `Error fetching row group ${ groupIndex } (${ rowStart } -${ rowEnd } ).`
79- console . error ( prefix , error )
80- const reason = `${ prefix } ${ error } `
91+ const reason = `Error fetching rows ${ groupStart } -${ groupEnd } : ${ error } `
8192 // reject the index of the first row (it's enough to trigger the error bar)
82- data [ rowStart ] ?. index . reject ( reason )
93+ data [ groupStart ] ?. index . reject ( reason )
8394 } )
84- groups [ groupIndex ] = true
8595 }
8696 }
8797
@@ -110,16 +120,16 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
110120 return {
111121 header,
112122 numRows : Number ( metadata . num_rows ) ,
113- rows ( { start, end, orderBy } : { start : number , end : number , orderBy ?: OrderBy } ) {
123+ rows ( { start, end, orderBy } ) {
114124 if ( orderBy ?. length ) {
115125 const numRows = end - start
116126 const wrapped = new Array ( numRows ) . fill ( null ) . map ( ( ) => resolvableRow ( header ) )
117127
118128 getSortIndex ( orderBy ) . then ( indices => {
119129 // Compute row groups to fetch
120130 for ( const index of indices . slice ( start , end ) ) {
121- const groupIndex = groupEnds . findIndex ( end => index < end )
122- fetchRowGroup ( groupIndex )
131+ const groupIndex = groups . findIndex ( ( { groupEnd } ) => index < groupEnd )
132+ fetchVirtualRowGroup ( groupIndex )
123133 }
124134
125135 // Re-assemble data in sorted order into wrapped
@@ -158,24 +168,16 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
158168 }
159169 }
160170 } ) . catch ( ( error : unknown ) => {
161- console . error (
162- 'Error fetching sort index or resolving sorted rows' ,
163- error
164- )
171+ console . error ( 'Error fetching sort index or resolving sorted rows' , error )
165172 } )
166173
167174 return wrapped
168175 } else {
169- for ( let i = 0 ; i < groups . length ; i ++ ) {
170- const groupStart = groupEnds [ i - 1 ] ?? 0
171- const groupEnd = groupEnds [ i ]
172- if ( groupEnd === undefined ) {
173- throw new Error ( `Missing group end at index ${ i } ` )
174- }
175- if ( start < groupEnd && end > groupStart ) {
176- fetchRowGroup ( i )
176+ groups . forEach ( ( { groupStart, groupEnd } , i ) => {
177+ if ( groupStart < end && groupEnd > start ) {
178+ fetchVirtualRowGroup ( i )
177179 }
178- }
180+ } )
179181 const wrapped = data . slice ( start , end )
180182 if ( wrapped . some ( row => row === undefined ) ) {
181183 throw new Error ( 'Row not fetched' )
0 commit comments