diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 2e12dad4..6e39f9cc 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -12,7 +12,7 @@ type GroupStatus = { } | { kind: 'fetched' } -interface RowGroup { +interface VirtualRowGroup { groupStart: number groupEnd: number state: Map @@ -28,24 +28,29 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): const cellCache = new Map[]>(header.map(name => [name, []])) - const rowGroups: RowGroup[] = [] + // virtual row groups are up to 1000 rows within row group boundaries + const groups: VirtualRowGroup[] = [] let groupStart = 0 for (const rg of metadata.row_groups) { - const groupEnd = groupStart + Number(rg.num_rows) - rowGroups.push({ - groupStart, - groupEnd, - state: new Map(header.map(name => [name, { kind: 'unfetched' }])), - }) - groupStart = groupEnd + // make virtual row groups of size 1000 + for (let j = 0; j < rg.num_rows; j += 1000) { + const groupSize = Math.min(1000, Number(rg.num_rows) - j) + const groupEnd = groupStart + groupSize + groups.push({ + groupStart, + groupEnd, + state: new Map(header.map(name => [name, { kind: 'unfetched' }])), + }) + groupStart = groupEnd + } } - async function fetchRowGroup({ rowGroup, columns }: { - rowGroup: RowGroup, columns: string[] + async function fetchVirtualRowGroup({ group, columns }: { + group: VirtualRowGroup, columns: string[] }): Promise { - const { groupStart, groupEnd, state } = rowGroup + const { groupStart, groupEnd, state } = group const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched') - const promises = [...rowGroup.state.values()].filter((status): status is { kind: 'fetching', promise: Promise } => status.kind === 'fetching').map(status => status.promise) + const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise } => status.kind === 'fetching').map(status => status.promise) // TODO(SL): pass AbortSignal to the worker? if (columnsToFetch.length > 0) { @@ -102,12 +107,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): const promises: Promise[] = [] - rowGroups.forEach((rowGroup) => { - const { groupStart, groupEnd } = rowGroup + groups.forEach((group) => { + const { groupStart, groupEnd } = group if (groupStart < rowEnd && groupEnd > rowStart) { promises.push( - fetchRowGroup({ - rowGroup, + fetchVirtualRowGroup({ + group, columns, }).then(() => { checkSignal(signal)