Skip to content

Commit da4663f

Browse files
authored
Revert "remove the concept of virtual row groups (#281)" (#283)
This reverts commit f9b7ac0.
1 parent 5968073 commit da4663f

File tree

1 file changed

+22
-17
lines changed

1 file changed

+22
-17
lines changed

src/lib/tableProvider.ts

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type GroupStatus = {
1212
} | {
1313
kind: 'fetched'
1414
}
15-
interface RowGroup {
15+
interface VirtualRowGroup {
1616
groupStart: number
1717
groupEnd: number
1818
state: Map<string, GroupStatus>
@@ -28,24 +28,29 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
2828

2929
const cellCache = new Map<string, ResolvedValue<unknown>[]>(header.map(name => [name, []]))
3030

31-
const rowGroups: RowGroup[] = []
31+
// virtual row groups are up to 1000 rows within row group boundaries
32+
const groups: VirtualRowGroup[] = []
3233
let groupStart = 0
3334
for (const rg of metadata.row_groups) {
34-
const groupEnd = groupStart + Number(rg.num_rows)
35-
rowGroups.push({
36-
groupStart,
37-
groupEnd,
38-
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
39-
})
40-
groupStart = groupEnd
35+
// split this row group into virtual row groups of at most 1000 rows
36+
for (let j = 0; j < rg.num_rows; j += 1000) {
37+
const groupSize = Math.min(1000, Number(rg.num_rows) - j)
38+
const groupEnd = groupStart + groupSize
39+
groups.push({
40+
groupStart,
41+
groupEnd,
42+
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
43+
})
44+
groupStart = groupEnd
45+
}
4146
}
4247

43-
async function fetchRowGroup({ rowGroup, columns }: {
44-
rowGroup: RowGroup, columns: string[]
48+
async function fetchVirtualRowGroup({ group, columns }: {
49+
group: VirtualRowGroup, columns: string[]
4550
}): Promise<void> {
46-
const { groupStart, groupEnd, state } = rowGroup
51+
const { groupStart, groupEnd, state } = group
4752
const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched')
48-
const promises = [...rowGroup.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)
53+
const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)
4954

5055
// TODO(SL): pass AbortSignal to the worker?
5156
if (columnsToFetch.length > 0) {
@@ -102,12 +107,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
102107

103108
const promises: Promise<void>[] = []
104109

105-
rowGroups.forEach((rowGroup) => {
106-
const { groupStart, groupEnd } = rowGroup
110+
groups.forEach((group) => {
111+
const { groupStart, groupEnd } = group
107112
if (groupStart < rowEnd && groupEnd > rowStart) {
108113
promises.push(
109-
fetchRowGroup({
110-
rowGroup,
114+
fetchVirtualRowGroup({
115+
group,
111116
columns,
112117
}).then(() => {
113118
checkSignal(signal)

0 commit comments

Comments
 (0)