diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index ebcedcd5..db1a3ded 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -107,80 +107,88 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): return sortIndex } + function sortedRows({ start, end, orderBy }: { start: number, end: number, orderBy: OrderBy}) { + const numRows = end - start + const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header)) + + getSortIndex(orderBy).then(indices => { + // Compute row groups to fetch + for (const index of indices.slice(start, end)) { + const groupIndex = groupEnds.findIndex(end => index < end) + fetchRowGroup(groupIndex) + } + + // Re-assemble data in sorted order into wrapped + for (let i = start; i < end; i++) { + const index = indices[i] + if (index === undefined) { + throw new Error(`index ${i} not found in indices`) + } + const row = data[index] + if (row === undefined) { + throw new Error('Row not fetched') + } + const { cells } = row + const wrappedRow = wrapped[i - start] + if (wrappedRow === undefined) { + throw new Error(`Wrapped row missing at index ${i - start}`) + } + wrappedRow.index.resolve(index) + for (const key of header) { + const cell = cells[key] + if (cell) { + // TODO(SL): should we remove this check? It makes sense only if header change + // but if so, I guess we will have more issues + cell + .then((value: unknown) => { + const wrappedCell = wrappedRow.cells[key] + if (wrappedCell === undefined) { + throw new Error(`Wrapped cell not found for column ${key}`) + } + wrappedCell.resolve(value) + }) + .catch((error: unknown) => { + console.error('Error resolving sorted row', error) + }) + } + } + } + }).catch((error: unknown) => { + console.error( + 'Error fetching sort index or resolving sorted rows', + error + ) + }) + + return wrapped + } + + function unsortedRows({ start, end }: { start: number, end: number }) { + for (let i = 0; i < groups.length; i++) { + const groupStart = groupEnds[i - 1] ?? 0 + const groupEnd = groupEnds[i] + if (groupEnd === undefined) { + throw new Error(`Missing group end at index ${i}`) + } + if (start < groupEnd && end > groupStart) { + fetchRowGroup(i) + } + } + const wrapped = data.slice(start, end) + if (wrapped.some(row => row === undefined)) { + throw new Error('Row not fetched') + } + return wrapped as ResolvableRow[] + } + return { header, numRows: Number(metadata.num_rows), rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) { if (orderBy?.length) { - const numRows = end - start - const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header)) - - getSortIndex(orderBy).then(indices => { - // Compute row groups to fetch - for (const index of indices.slice(start, end)) { - const groupIndex = groupEnds.findIndex(end => index < end) - fetchRowGroup(groupIndex) - } - - // Re-assemble data in sorted order into wrapped - for (let i = start; i < end; i++) { - const index = indices[i] - if (index === undefined) { - throw new Error(`index ${i} not found in indices`) - } - const row = data[index] - if (row === undefined) { - throw new Error('Row not fetched') - } - const { cells } = row - const wrappedRow = wrapped[i - start] - if (wrappedRow === undefined) { - throw new Error(`Wrapped row missing at index ${i - start}`) - } - wrappedRow.index.resolve(index) - for (const key of header) { - const cell = cells[key] - if (cell) { - // TODO(SL): should we remove this check? It makes sense only if header change - // but if so, I guess we will have more issues - cell - .then((value: unknown) => { - const wrappedCell = wrappedRow.cells[key] - if (wrappedCell === undefined) { - throw new Error(`Wrapped cell not found for column ${key}`) - } - wrappedCell.resolve(value) - }) - .catch((error: unknown) => { - console.error('Error resolving sorted row', error) - }) - } - } - } - }).catch((error: unknown) => { - console.error( - 'Error fetching sort index or resolving sorted rows', - error - ) - }) - - return wrapped + return sortedRows({ start, end, orderBy }) } else { - for (let i = 0; i < groups.length; i++) { - const groupStart = groupEnds[i - 1] ?? 0 - const groupEnd = groupEnds[i] - if (groupEnd === undefined) { - throw new Error(`Missing group end at index ${i}`) - } - if (start < groupEnd && end > groupStart) { - fetchRowGroup(i) - } - } - const wrapped = data.slice(start, end) - if (wrapped.some(row => row === undefined)) { - throw new Error('Row not fetched') - } - return wrapped as ResolvableRow[] + return unsortedRows({ start, end }) } }, sortable: true, diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 6940bd16..aa85e0d5 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -11,20 +11,22 @@ export function cn(...names: (string | undefined | false)[]): string { /** * Convert AsyncBufferFromUrl to AsyncBuffer. */ -export function asyncBufferFrom(from: AsyncBufferFrom): Promise { +export async function asyncBufferFrom(from: AsyncBufferFrom): Promise { if ('url' in from) { // Cached asyncBuffer for urls only const key = JSON.stringify(from) const cached = cache.get(key) if (cached) return cached - const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer) - cache.set(key, asyncBuffer) - return asyncBuffer + const buffer = await asyncBufferFromUrl(from) + const newCached = cachedAsyncBuffer(buffer) + // only cache if no error, otherwise let the error bubble up + cache.set(key, newCached) + return newCached } else { return from.file.arrayBuffer() } } -const cache = new Map>() +const cache = new Map() // TODO(SL): do we really want a singleton? export function getFileDateShort(file?: { lastModified?: string }): string { diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index cfc06dc6..45d641c5 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,4 +1,4 @@ -import { ColumnData, parquetQuery } from 'hyparquet' +import { ColumnData, parquetRead, parquetReadObjects } from 'hyparquet' import { compressors } from 'hyparquet-compressors' import { getParquetColumn } from '../getParquetColumn.js' import { asyncBufferFrom } from '../utils.js' @@ -51,11 +51,18 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => { } catch (error) { postErrorMessage({ error: error as Error, queryId }) } + } else if (data.chunks) { + function onChunk(chunk: ColumnData) { postChunkMessage({ chunk, queryId }) } + const { rowStart, rowEnd } = data + try { + await parquetRead({ metadata, file, rowStart, rowEnd, compressors, onChunk }) + } catch (error) { + postErrorMessage({ error: error as Error, queryId }) + } } else { - const { rowStart, rowEnd, chunks } = data - const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined + const { rowStart, rowEnd } = data try { - const result = await parquetQuery({ metadata, file, rowStart, rowEnd, compressors, onChunk }) + const result = await parquetReadObjects({ metadata, file, rowStart, rowEnd, compressors }) postResultMessage({ result, queryId }) } catch (error) { postErrorMessage({ error: error as Error, queryId })