diff --git a/eslint.config.js b/eslint.config.js index 0f089139..857a7f7c 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -48,6 +48,7 @@ export default typescript.config( eqeqeq: 'error', 'func-style': ['error', 'declaration'], indent: ['error', 2, { SwitchCase: 1 }], + 'key-spacing': 'error', 'no-constant-condition': 'off', 'no-extra-parens': 'error', 'no-multi-spaces': 'error', diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index ebcedcd5..9164700b 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -28,6 +28,12 @@ export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'desce }) } +interface VirtualRowGroup { + groupStart: number + groupEnd: number + fetching: boolean +} + /** * Convert a parquet file into a dataframe. */ @@ -37,33 +43,40 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): const sortCache = new Map>() const columnRanksCache = new Map>() const data = new Array(Number(metadata.num_rows)) - const groups = new Array(metadata.row_groups.length).fill(false) + + // virtual row groups are up to 1000 rows within row group boundaries + const groups: VirtualRowGroup[] = [] let groupStart = 0 - const groupEnds = metadata.row_groups.map(group => groupStart += Number(group.num_rows)) + for (const rg of metadata.row_groups) { + // make virtual row groups of size 1000 + for (let j = 0; j < rg.num_rows; j += 1000) { + const groupSize = Math.min(1000, Number(rg.num_rows) - j) + const groupEnd = groupStart + groupSize + groups.push({ groupStart, groupEnd, fetching: false }) + groupStart = groupEnd + } + } - function fetchRowGroup(groupIndex: number) { - if (!groups[groupIndex]) { - const rowStart = groupEnds[groupIndex - 1] ?? 0 - const rowEnd = groupEnds[groupIndex] - if (rowEnd === undefined) { - throw new Error(`Missing groupEnd for groupIndex: ${groupIndex}`) - } + function fetchVirtualRowGroup(virtualGroupIndex: number) { + const group = groups[virtualGroupIndex] + if (group && !group.fetching) { + group.fetching = true + const { groupStart, groupEnd } = group // Initialize with resolvable promises - for (let i = rowStart; i < rowEnd; i++) { + for (let i = groupStart; i < groupEnd; i++) { data[i] = resolvableRow(header) data[i]?.index.resolve(i) } - parquetQueryWorker({ from, metadata, rowStart, rowEnd }) - .then((groupData) => { - for (let i = rowStart; i < rowEnd; i++) { - const dataRow = data[i] + parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd }) + .then(groupData => { + for (let rowIndex = groupStart; rowIndex < groupEnd; rowIndex++) { + const dataRow = data[rowIndex] if (dataRow === undefined) { - throw new Error(`Missing data row for index ${i}`) + throw new Error(`Missing data row for index ${rowIndex}`) } - const j = i - rowStart - const row = groupData[j] + const row = groupData[rowIndex - groupStart] if (row === undefined) { - throw new Error(`Missing row in groupData for index: ${i - rowStart}`) + throw new Error(`Missing row in groupData for index ${rowIndex}`) } for (const [key, value] of Object.entries(row)) { const cell = dataRow.cells[key] @@ -75,13 +88,10 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): } }) .catch((error: unknown) => { - const prefix = `Error fetching row group ${groupIndex} (${rowStart}-${rowEnd}).` - console.error(prefix, error) - const reason = `${prefix} ${error}` + const reason = `Error fetching rows ${groupStart}-${groupEnd}: ${error}` // reject the index of the first row (it's enough to trigger the error bar) - data[rowStart]?.index.reject(reason) + data[groupStart]?.index.reject(reason) }) - groups[groupIndex] = true } } @@ -110,7 +120,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): return { header, numRows: Number(metadata.num_rows), - rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) { + rows({ start, end, orderBy }) { if (orderBy?.length) { const numRows = end - start const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header)) @@ -118,8 +128,8 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): getSortIndex(orderBy).then(indices => { // Compute row groups to fetch for (const index of indices.slice(start, end)) { - const groupIndex = groupEnds.findIndex(end => index < end) - fetchRowGroup(groupIndex) + const groupIndex = groups.findIndex(({ groupEnd }) => index < groupEnd) + fetchVirtualRowGroup(groupIndex) } // Re-assemble data in sorted order into wrapped @@ -158,24 +168,16 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): } } }).catch((error: unknown) => { - console.error( - 'Error fetching sort index or resolving sorted rows', - error - ) + console.error('Error fetching sort index or resolving sorted rows', error) }) return wrapped } else { - for (let i = 0; i < groups.length; i++) { - const groupStart = groupEnds[i - 1] ?? 0 - const groupEnd = groupEnds[i] - if (groupEnd === undefined) { - throw new Error(`Missing group end at index ${i}`) - } - if (start < groupEnd && end > groupStart) { - fetchRowGroup(i) + groups.forEach(({ groupStart, groupEnd }, i) => { + if (groupStart < end && groupEnd > start) { + fetchVirtualRowGroup(i) } - } + }) const wrapped = data.slice(start, end) if (wrapped.some(row => row === undefined)) { throw new Error('Row not fetched')