Skip to content

Commit 91e45c5

Browse files
committed
fix logic in groups fetching
1 parent efd2b3d commit 91e45c5

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

src/lib/tableProvider.ts

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,18 @@ import { FileMetaData, parquetSchema } from 'hyparquet'
44
import { parquetQueryWorker } from './workers/parquetWorkerClient.js'
55
import type { AsyncBufferFrom } from './workers/types.d.ts'
66

7+
type GroupStatus = {
8+
kind: 'unfetched'
9+
} | {
10+
kind: 'fetching'
11+
promise: Promise<void>
12+
} | {
13+
kind: 'fetched'
14+
}
715
interface VirtualRowGroup {
816
groupStart: number
917
groupEnd: number
10-
fetching: Map<string, boolean>
11-
fetched: Map<string, boolean>
18+
state: Map<string, GroupStatus>
1219
}
1320

1421
/**
@@ -32,8 +39,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
3239
groups.push({
3340
groupStart,
3441
groupEnd,
35-
fetching: new Map(header.map(name => [name, false])),
36-
fetched: new Map(header.map(name => [name, false])),
42+
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
3743
})
3844
groupStart = groupEnd
3945
}
@@ -42,24 +48,22 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
4248
async function fetchVirtualRowGroup({ group, columns }: {
4349
group: VirtualRowGroup, columns: string[]
4450
}): Promise<void> {
45-
const { groupStart, groupEnd, fetching, fetched } = group
46-
const columnsToFetch = columns.filter(column => !fetching.get(column) && !fetched.get(column))
47-
48-
if (columnsToFetch.length === 0) {
49-
// nothing to fetch
50-
return
51-
}
52-
53-
columnsToFetch.forEach(column => {
54-
fetching.set(column, true)
55-
})
51+
const { groupStart, groupEnd, state } = group
52+
const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched')
53+
const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)
5654

5755
// TODO(SL): pass AbortSignal to the worker?
58-
await parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk })
56+
if (columnsToFetch.length > 0) {
57+
const commonPromise = parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk })
58+
columnsToFetch.forEach(column => {
59+
state.set(column, { kind: 'fetching', promise: commonPromise })
60+
})
61+
promises.push(commonPromise)
62+
}
63+
await Promise.all(promises)
5964

6065
columnsToFetch.forEach(column => {
61-
fetching.set(column, false)
62-
fetched.set(column, true)
66+
state.set(column, { kind: 'fetched' })
6367
})
6468

6569
}

0 commit comments

Comments
 (0)