Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default typescript.config(
eqeqeq: 'error',
'func-style': ['error', 'declaration'],
indent: ['error', 2, { SwitchCase: 1 }],
'key-spacing': 'error',
'no-constant-condition': 'off',
'no-extra-parens': 'error',
'no-multi-spaces': 'error',
Expand Down
67 changes: 36 additions & 31 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'desce
})
}

interface VirtualRowGroup {
groupStart: number
groupEnd: number
fetching: boolean
}

/**
* Convert a parquet file into a dataframe.
*/
Expand All @@ -37,33 +43,41 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
const sortCache = new Map<string, Promise<number[]>>()
const columnRanksCache = new Map<string, Promise<number[]>>()
const data = new Array<ResolvableRow | undefined>(Number(metadata.num_rows))
const groups = new Array(metadata.row_groups.length).fill(false)

// virtual row groups are up to 1000 rows within row group boundaries
const groups: VirtualRowGroup[] = []
let groupStart = 0
const groupEnds = metadata.row_groups.map(group => groupStart += Number(group.num_rows))
for (const rg of metadata.row_groups) {
// make virtual row groups of size 1000
for (let j = 0; j < rg.num_rows; j += 1000) {
const groupSize = Math.min(1000, Number(rg.num_rows) - j)
const groupEnd = groupStart + groupSize
groups.push({ groupStart, groupEnd, fetching: false })
groupStart = groupEnd
}
}

function fetchRowGroup(groupIndex: number) {
if (!groups[groupIndex]) {
const rowStart = groupEnds[groupIndex - 1] ?? 0
const rowEnd = groupEnds[groupIndex]
if (rowEnd === undefined) {
throw new Error(`Missing groupEnd for groupIndex: ${groupIndex}`)
}
const group = groups[groupIndex]
if (group && !group.fetching) {
group.fetching = true
const { groupStart, groupEnd } = group
// Initialize with resolvable promises
for (let i = rowStart; i < rowEnd; i++) {
for (let i = groupStart; i < groupEnd; i++) {
data[i] = resolvableRow(header)
data[i]?.index.resolve(i)
}
parquetQueryWorker({ from, metadata, rowStart, rowEnd })
.then((groupData) => {
for (let i = rowStart; i < rowEnd; i++) {
parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd })
.then(groupData => {
for (let i = groupStart; i < groupEnd; i++) {
const dataRow = data[i]
if (dataRow === undefined) {
throw new Error(`Missing data row for index ${i}`)
}
const j = i - rowStart
const j = i - groupStart
const row = groupData[j]
if (row === undefined) {
throw new Error(`Missing row in groupData for index: ${i - rowStart}`)
throw new Error(`Missing row in groupData for index: ${i - groupStart}`)
}
for (const [key, value] of Object.entries(row)) {
const cell = dataRow.cells[key]
Expand All @@ -75,13 +89,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
}
})
.catch((error: unknown) => {
const prefix = `Error fetching row group ${groupIndex} (${rowStart}-${rowEnd}).`
const prefix = `Error fetching row group ${groupIndex} (${groupStart}-${groupEnd}).`
console.error(prefix, error)
const reason = `${prefix} ${error}`
// reject the index of the first row (it's enough to trigger the error bar)
data[rowStart]?.index.reject(reason)
data[groupStart]?.index.reject(reason)
})
groups[groupIndex] = true
}
}

Expand Down Expand Up @@ -110,15 +123,15 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
return {
header,
numRows: Number(metadata.num_rows),
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) {
rows({ start, end, orderBy }) {
if (orderBy?.length) {
const numRows = end - start
const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))

getSortIndex(orderBy).then(indices => {
// Compute row groups to fetch
for (const index of indices.slice(start, end)) {
const groupIndex = groupEnds.findIndex(end => index < end)
const groupIndex = groups.findIndex(({ groupEnd }) => index < groupEnd)
fetchRowGroup(groupIndex)
}

Expand Down Expand Up @@ -158,24 +171,16 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
}
}
}).catch((error: unknown) => {
console.error(
'Error fetching sort index or resolving sorted rows',
error
)
console.error('Error fetching sort index or resolving sorted rows', error)
})

return wrapped
} else {
for (let i = 0; i < groups.length; i++) {
const groupStart = groupEnds[i - 1] ?? 0
const groupEnd = groupEnds[i]
if (groupEnd === undefined) {
throw new Error(`Missing group end at index ${i}`)
}
if (start < groupEnd && end > groupStart) {
groups.forEach(({ groupStart, groupEnd }, i) => {
if (groupStart < end && groupEnd > start) {
fetchRowGroup(i)
}
}
})
const wrapped = data.slice(start, end)
if (wrapped.some(row => row === undefined)) {
throw new Error('Row not fetched')
Expand Down