Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 76 additions & 68 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,80 +107,88 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
return sortIndex
}

// Build placeholder rows for [start, end) and resolve them asynchronously in
// sorted order. The returned rows are usable immediately: each cell is a
// resolvable promise that settles once the underlying data arrives.
function sortedRows({ start, end, orderBy }: { start: number, end: number, orderBy: OrderBy}) {
  const numRows = end - start
  // One resolvable placeholder row per requested output row.
  const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))

  getSortIndex(orderBy).then(indices => {
    // Compute row groups to fetch
    // NOTE(review): findIndex yields -1 when `index` is past every group end;
    // assumes fetchRowGroup tolerates (or never receives) -1 — confirm.
    for (const index of indices.slice(start, end)) {
      const groupIndex = groupEnds.findIndex(end => index < end)
      fetchRowGroup(groupIndex)
    }

    // Re-assemble data in sorted order into wrapped
    for (let i = start; i < end; i++) {
      const index = indices[i]
      if (index === undefined) {
        throw new Error(`index ${i} not found in indices`)
      }
      // data[index] is expected to be populated (as a row of pending cells)
      // by the fetchRowGroup calls above — presumably synchronously; verify.
      const row = data[index]
      if (row === undefined) {
        throw new Error('Row not fetched')
      }
      const { cells } = row
      const wrappedRow = wrapped[i - start]
      if (wrappedRow === undefined) {
        throw new Error(`Wrapped row missing at index ${i - start}`)
      }
      // Expose the original (pre-sort) row index to the caller.
      wrappedRow.index.resolve(index)
      for (const key of header) {
        const cell = cells[key]
        if (cell) {
          // TODO(SL): should we remove this check? It makes sense only if header change
          // but if so, I guess we will have more issues
          cell
            .then((value: unknown) => {
              const wrappedCell = wrappedRow.cells[key]
              if (wrappedCell === undefined) {
                throw new Error(`Wrapped cell not found for column ${key}`)
              }
              wrappedCell.resolve(value)
            })
            .catch((error: unknown) => {
              console.error('Error resolving sorted row', error)
            })
        }
      }
    }
  }).catch((error: unknown) => {
    // Fire-and-forget: errors surface on the console rather than to the
    // caller, so any still-unresolved cells in `wrapped` stay pending.
    console.error(
      'Error fetching sort index or resolving sorted rows',
      error
    )
  })

  return wrapped
}

function unsortedRows({ start, end }: { start: number, end: number }) {
for (let i = 0; i < groups.length; i++) {
const groupStart = groupEnds[i - 1] ?? 0
const groupEnd = groupEnds[i]
if (groupEnd === undefined) {
throw new Error(`Missing group end at index ${i}`)
}
if (start < groupEnd && end > groupStart) {
fetchRowGroup(i)
}
}
const wrapped = data.slice(start, end)
if (wrapped.some(row => row === undefined)) {
throw new Error('Row not fetched')
}
return wrapped as ResolvableRow[]
}

return {
header,
numRows: Number(metadata.num_rows),
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) {
if (orderBy?.length) {
const numRows = end - start
const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))

getSortIndex(orderBy).then(indices => {
// Compute row groups to fetch
for (const index of indices.slice(start, end)) {
const groupIndex = groupEnds.findIndex(end => index < end)
fetchRowGroup(groupIndex)
}

// Re-assemble data in sorted order into wrapped
for (let i = start; i < end; i++) {
const index = indices[i]
if (index === undefined) {
throw new Error(`index ${i} not found in indices`)
}
const row = data[index]
if (row === undefined) {
throw new Error('Row not fetched')
}
const { cells } = row
const wrappedRow = wrapped[i - start]
if (wrappedRow === undefined) {
throw new Error(`Wrapped row missing at index ${i - start}`)
}
wrappedRow.index.resolve(index)
for (const key of header) {
const cell = cells[key]
if (cell) {
// TODO(SL): should we remove this check? It makes sense only if header change
// but if so, I guess we will have more issues
cell
.then((value: unknown) => {
const wrappedCell = wrappedRow.cells[key]
if (wrappedCell === undefined) {
throw new Error(`Wrapped cell not found for column ${key}`)
}
wrappedCell.resolve(value)
})
.catch((error: unknown) => {
console.error('Error resolving sorted row', error)
})
}
}
}
}).catch((error: unknown) => {
console.error(
'Error fetching sort index or resolving sorted rows',
error
)
})

return wrapped
return sortedRows({ start, end, orderBy })
} else {
for (let i = 0; i < groups.length; i++) {
const groupStart = groupEnds[i - 1] ?? 0
const groupEnd = groupEnds[i]
if (groupEnd === undefined) {
throw new Error(`Missing group end at index ${i}`)
}
if (start < groupEnd && end > groupStart) {
fetchRowGroup(i)
}
}
const wrapped = data.slice(start, end)
if (wrapped.some(row => row === undefined)) {
throw new Error('Row not fetched')
}
return wrapped as ResolvableRow[]
return unsortedRows({ start, end })
}
},
sortable: true,
Expand Down
12 changes: 7 additions & 5 deletions src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ export function cn(...names: (string | undefined | false)[]): string {
/**
* Convert AsyncBufferFromUrl to AsyncBuffer.
*/
export function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
export async function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
if ('url' in from) {
// Cached asyncBuffer for urls only
const key = JSON.stringify(from)
const cached = cache.get(key)
if (cached) return cached
const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
cache.set(key, asyncBuffer)
return asyncBuffer
const buffer = await asyncBufferFromUrl(from)
const newCached = cachedAsyncBuffer(buffer)
// only cache if no error, otherwise let the error bubble up
cache.set(key, newCached)
return newCached
} else {
return from.file.arrayBuffer()
}
}
const cache = new Map<string, Promise<AsyncBuffer>>()
const cache = new Map<string, AsyncBuffer>()
// TODO(SL): do we really want a singleton?

export function getFileDateShort(file?: { lastModified?: string }): string {
Expand Down
15 changes: 11 additions & 4 deletions src/lib/workers/parquetWorker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ColumnData, parquetQuery } from 'hyparquet'
import { ColumnData, parquetRead, parquetReadObjects } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import { getParquetColumn } from '../getParquetColumn.js'
import { asyncBufferFrom } from '../utils.js'
Expand Down Expand Up @@ -51,11 +51,18 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => {
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}
} else if (data.chunks) {
function onChunk(chunk: ColumnData) { postChunkMessage({ chunk, queryId }) }
const { rowStart, rowEnd } = data
try {
await parquetRead({ metadata, file, rowStart, rowEnd, compressors, onChunk })
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}
} else {
const { rowStart, rowEnd, chunks } = data
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
const { rowStart, rowEnd } = data
try {
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, compressors, onChunk })
const result = await parquetReadObjects({ metadata, file, rowStart, rowEnd, compressors })
postResultMessage({ result, queryId })
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
Expand Down