Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type GroupStatus = {
} | {
kind: 'fetched'
}
interface RowGroup {
interface VirtualRowGroup {
groupStart: number
groupEnd: number
state: Map<string, GroupStatus>
Expand All @@ -28,24 +28,29 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):

const cellCache = new Map<string, ResolvedValue<unknown>[]>(header.map(name => [name, []]))

const rowGroups: RowGroup[] = []
// virtual row groups are up to 1000 rows within row group boundaries
const groups: VirtualRowGroup[] = []
let groupStart = 0
for (const rg of metadata.row_groups) {
const groupEnd = groupStart + Number(rg.num_rows)
rowGroups.push({
groupStart,
groupEnd,
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
})
groupStart = groupEnd
// make virtual row groups of size 1000
for (let j = 0; j < rg.num_rows; j += 1000) {
const groupSize = Math.min(1000, Number(rg.num_rows) - j)
const groupEnd = groupStart + groupSize
groups.push({
groupStart,
groupEnd,
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
})
groupStart = groupEnd
}
}

async function fetchRowGroup({ rowGroup, columns }: {
rowGroup: RowGroup, columns: string[]
async function fetchVirtualRowGroup({ group, columns }: {
group: VirtualRowGroup, columns: string[]
}): Promise<void> {
const { groupStart, groupEnd, state } = rowGroup
const { groupStart, groupEnd, state } = group
const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched')
const promises = [...rowGroup.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)
const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)

// TODO(SL): pass AbortSignal to the worker?
if (columnsToFetch.length > 0) {
Expand Down Expand Up @@ -102,12 +107,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):

const promises: Promise<void>[] = []

rowGroups.forEach((rowGroup) => {
const { groupStart, groupEnd } = rowGroup
groups.forEach((group) => {
const { groupStart, groupEnd } = group
if (groupStart < rowEnd && groupEnd > rowStart) {
promises.push(
fetchRowGroup({
rowGroup,
fetchVirtualRowGroup({
group,
columns,
}).then(() => {
checkSignal(signal)
Expand Down