Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type GroupStatus = {
} | {
kind: 'fetched'
}
interface VirtualRowGroup {
interface RowGroup {
groupStart: number
groupEnd: number
state: Map<string, GroupStatus>
Expand All @@ -28,29 +28,24 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):

const cellCache = new Map<string, ResolvedValue<unknown>[]>(header.map(name => [name, []]))

// virtual row groups are up to 1000 rows within row group boundaries
const groups: VirtualRowGroup[] = []
const rowGroups: RowGroup[] = []
let groupStart = 0
for (const rg of metadata.row_groups) {
// make virtual row groups of size 1000
for (let j = 0; j < rg.num_rows; j += 1000) {
const groupSize = Math.min(1000, Number(rg.num_rows) - j)
const groupEnd = groupStart + groupSize
groups.push({
groupStart,
groupEnd,
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
})
groupStart = groupEnd
}
const groupEnd = groupStart + Number(rg.num_rows)
rowGroups.push({
groupStart,
groupEnd,
state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
})
groupStart = groupEnd
}

async function fetchVirtualRowGroup({ group, columns }: {
group: VirtualRowGroup, columns: string[]
async function fetchRowGroup({ rowGroup, columns }: {
rowGroup: RowGroup, columns: string[]
}): Promise<void> {
const { groupStart, groupEnd, state } = group
const { groupStart, groupEnd, state } = rowGroup
const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched')
const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)
const promises = [...rowGroup.state.values()].filter((status): status is { kind: 'fetching', promise: Promise<void> } => status.kind === 'fetching').map(status => status.promise)

// TODO(SL): pass AbortSignal to the worker?
if (columnsToFetch.length > 0) {
Expand Down Expand Up @@ -107,12 +102,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):

const promises: Promise<void>[] = []

groups.forEach((group) => {
const { groupStart, groupEnd } = group
rowGroups.forEach((rowGroup) => {
const { groupStart, groupEnd } = rowGroup
if (groupStart < rowEnd && groupEnd > rowStart) {
promises.push(
fetchVirtualRowGroup({
group,
fetchRowGroup({
rowGroup,
columns,
}).then(() => {
checkSignal(signal)
Expand Down