diff --git a/package.json b/package.json index 243db5ce..a80e73e0 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet" }, "dependencies": { - "hightable": "0.17.2", + "hightable": "0.18.1", "hyparquet": "1.17.1", "hyparquet-compressors": "1.1.1", "icebird": "0.3.0", @@ -67,8 +67,8 @@ "@storybook/react-vite": "9.0.18", "@testing-library/react": "16.3.0", "@types/node": "24.1.0", - "@types/react": "19.1.8", - "@types/react-dom": "19.1.6", + "@types/react": "19.1.9", + "@types/react-dom": "19.1.7", "@vitejs/plugin-react": "4.7.0", "@vitest/coverage-v8": "3.2.4", "eslint": "9.32.0", diff --git a/src/components/AvroView/AvroView.tsx b/src/components/AvroView/AvroView.tsx index 71ea3d7f..03b53892 100644 --- a/src/components/AvroView/AvroView.tsx +++ b/src/components/AvroView/AvroView.tsx @@ -8,7 +8,7 @@ import styles from '../Json/Json.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } /** @@ -43,7 +43,7 @@ export default function AvroView({ source, setError }: ViewerProps) { setContent({ fileSize }) setJson(json) } catch (error) { - setError(error as Error) + setError(error) } finally { setIsLoading(false) } diff --git a/src/components/Cell/Cell.tsx b/src/components/Cell/Cell.tsx index 0f31285e..2b7f7438 100644 --- a/src/components/Cell/Cell.tsx +++ b/src/components/Cell/Cell.tsx @@ -15,6 +15,8 @@ interface CellProps { col: number } +const UNLOADED_CELL_PLACEHOLDER = '' + /** * Cell viewer displays a single cell from a table. */ @@ -39,21 +41,14 @@ export default function CellView({ source, row, col }: CellProps) { const metadata = await parquetMetadataAsync(asyncBuffer) setProgress(0.75) const df = parquetDataFrame(from, metadata) - const asyncRows = df.rows({ start: row, end: row + 1 }) - if (asyncRows.length > 1 || !(0 in asyncRows)) { - throw new Error(`Expected 1 row, got ${asyncRows.length}`) - } - const asyncRow = asyncRows[0] - // Await cell data + const columnName = df.header[col] if (columnName === undefined) { throw new Error(`Column name missing at index col=${col}`) } - const asyncCell = asyncRow.cells[columnName] - if (asyncCell === undefined) { - throw new Error(`Cell missing at column ${columnName}`) - } - const text = await asyncCell.then(stringify) + await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) + const cell = df.getCell({ row, column: columnName }) + const text = cell === undefined ? UNLOADED_CELL_PLACEHOLDER : stringify(cell.value) setText(text) setError(undefined) } catch (error) { diff --git a/src/components/CellPanel/CellPanel.tsx b/src/components/CellPanel/CellPanel.tsx index a91006d4..6c568e50 100644 --- a/src/components/CellPanel/CellPanel.tsx +++ b/src/components/CellPanel/CellPanel.tsx @@ -1,22 +1,25 @@ -import { DataFrame, stringify } from 'hightable' -import { ReactNode, useEffect, useState } from 'react' +import type { DataFrame, ResolvedValue } from 'hightable' +import { stringify } from 'hightable' +import { ReactNode, useCallback, useEffect, useState } from 'react' import { useConfig } from '../../hooks/useConfig.js' import { cn } from '../../lib/utils.js' import ContentWrapper from '../ContentWrapper/ContentWrapper.js' import Json from '../Json/Json.js' +import jsonStyles from '../Json/Json.module.css' import SlideCloseButton from '../SlideCloseButton/SlideCloseButton.js' import styles from '../TextView/TextView.module.css' -import jsonStyles from '../Json/Json.module.css' interface ViewerProps { df: DataFrame row: number col: number setProgress: (progress: number) => void - setError: (error: Error) => void + setError: (error: unknown) => void onClose: () => void } +const UNLOADED_CELL_PLACEHOLDER = '' + /** * Cell viewer displays a single cell from a table. */ @@ -24,39 +27,52 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose const [content, setContent] = useState() const { customClass } = useConfig() + const fillContent = useCallback((cell: ResolvedValue | undefined) => { + let content: ReactNode + if (cell === undefined) { + content = + + {UNLOADED_CELL_PLACEHOLDER} + + } else { + const { value } = cell + if (value instanceof Object && !(value instanceof Date)) { + content = + + + + } else { + content = + + {stringify(value)} + + } + } + setContent(content) + setError(undefined) + }, [customClass?.textView, customClass?.jsonView, setError]) + // Load cell data useEffect(() => { async function loadCellData() { try { setProgress(0.5) - const asyncRows = df.rows({ start: row, end: row + 1 }) - if (asyncRows.length > 1 || !(0 in asyncRows)) { - throw new Error(`Expected 1 row, got ${asyncRows.length}`) - } - const asyncRow = asyncRows[0] - // Await cell data + const columnName = df.header[col] if (columnName === undefined) { throw new Error(`Column name missing at index col=${col}`) } - const asyncCell = asyncRow.cells[columnName] - if (asyncCell === undefined) { - throw new Error(`Cell missing at column ${columnName}`) + let cell = df.getCell({ row, column: columnName }) + if (cell === undefined) { + fillContent(undefined) + return } - const value: unknown = await asyncCell - if (value instanceof Object && !(value instanceof Date)) { - setContent( - - - - ) - } else { - setContent( - - {stringify(value)} - - ) + await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) + cell = df.getCell({ row, column: columnName }) + if (cell === undefined) { + throw new Error(`Cell at row=${row}, column=${columnName} is undefined`) } + fillContent(cell) } catch (error) { setError(error as Error) } finally { @@ -65,7 +81,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose } void loadCellData() - }, [df, col, row, setProgress, setError, customClass]) + }, [df, col, row, setProgress, setError, fillContent]) const headers = <> diff --git a/src/components/File/File.tsx b/src/components/File/File.tsx index d8c10f03..6078e03e 100644 --- a/src/components/File/File.tsx +++ b/src/components/File/File.tsx @@ -1,5 +1,6 @@ -import { useState } from 'react' +import { useCallback, useState } from 'react' import type { FileSource } from '../../lib/sources/types.js' +import { toError } from '../../lib/utils.js' import Breadcrumb from '../Breadcrumb/Breadcrumb.js' import Layout from '../Layout/Layout.js' import Viewer from '../Viewer/Viewer.js' @@ -13,10 +14,14 @@ interface FileProps { */ export default function File({ source }: FileProps) { const [progress, setProgress] = useState() - const [error, setError] = useState() + const [error, setError] = useState() + + const setErrorWrapper = useCallback((error: unknown) => { + setError(toError(error)) + }, [setError]) return - + } diff --git a/src/components/ImageView/ImageView.tsx b/src/components/ImageView/ImageView.tsx index 0e81295a..2f49303e 100644 --- a/src/components/ImageView/ImageView.tsx +++ b/src/components/ImageView/ImageView.tsx @@ -7,7 +7,7 @@ import styles from './ImageView.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } interface Content { @@ -45,7 +45,7 @@ export default function ImageView({ source, setError }: ViewerProps) { setError(undefined) } catch (error) { setContent(undefined) - setError(error as Error) + setError(error) } finally { setIsLoading(false) } diff --git a/src/components/JsonView/JsonView.tsx b/src/components/JsonView/JsonView.tsx index eed17d18..bff8752c 100644 --- a/src/components/JsonView/JsonView.tsx +++ b/src/components/JsonView/JsonView.tsx @@ -8,7 +8,7 @@ import styles from '../Json/Json.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } const largeFileSize = 8_000_000 // 8 mb @@ -48,7 +48,7 @@ export default function JsonView({ source, setError }: ViewerProps) { setJson(JSON.parse(text)) } catch (error) { // TODO: show plain text in error case - setError(error as Error) + setError(error) } finally { setIsLoading(false) } diff --git a/src/components/MarkdownView/MarkdownView.tsx b/src/components/MarkdownView/MarkdownView.tsx index 83b388c9..b7dc5c14 100644 --- a/src/components/MarkdownView/MarkdownView.tsx +++ b/src/components/MarkdownView/MarkdownView.tsx @@ -8,7 +8,7 @@ import styles from './MarkdownView.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } /** @@ -37,7 +37,7 @@ export default function MarkdownView({ source, setError }: ViewerProps) { setError(undefined) setContent({ text, fileSize }) } catch (error) { - setError(error as Error) + setError(error) setContent(undefined) } finally { setIsLoading(false) diff --git a/src/components/ParquetView/ParquetView.tsx b/src/components/ParquetView/ParquetView.tsx index f7b76579..2dc8b0cd 100644 --- a/src/components/ParquetView/ParquetView.tsx +++ b/src/components/ParquetView/ParquetView.tsx @@ -1,4 +1,4 @@ -import HighTable, { DataFrame, rowCache } from 'hightable' +import HighTable, { DataFrame } from 'hightable' import 'hightable/src/HighTable.css' import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet' import React, { useCallback, useEffect, useState } from 'react' @@ -15,7 +15,7 @@ import styles from './ParquetView.module.css' interface ViewerProps { source: FileSource setProgress: (progress: number | undefined) => void - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } interface Content extends ContentSize { @@ -41,12 +41,11 @@ export default function ParquetView({ source, setProgress, setError }: ViewerPro const from = { url: resolveUrl, byteLength: asyncBuffer.byteLength, requestInit } setProgress(0.66) const metadata = await parquetMetadataAsync(asyncBuffer) - let dataframe = parquetDataFrame(from, metadata) - dataframe = rowCache(dataframe) + const dataframe = parquetDataFrame(from, metadata) const fileSize = asyncBuffer.byteLength setContent({ dataframe, fileSize }) } catch (error) { - setError(error as Error) + setError(error) } finally { setIsLoading(false) setProgress(1) @@ -83,9 +82,14 @@ export default function ParquetView({ source, setProgress, setError }: ViewerPro if (cell?.col === col && cell.row === row) { return undefined } + const columnName = content?.dataframe.header[col] + if (columnName === undefined || !content?.dataframe.getCell({ row, column: columnName })) { + // don't open the cell panel until it has loaded + return undefined + } return { row, col } }) - }, []) + }, [content]) const onDoubleClickCell = useCallback((_event: React.MouseEvent, col: number, row: number) => { toggleCell(col, row) }, [toggleCell]) diff --git a/src/components/TextView/TextView.tsx b/src/components/TextView/TextView.tsx index ace7dd98..c1e5abb1 100644 --- a/src/components/TextView/TextView.tsx +++ b/src/components/TextView/TextView.tsx @@ -7,7 +7,7 @@ import styles from './TextView.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } /** @@ -36,7 +36,7 @@ export default function TextView({ source, setError }: ViewerProps) { setError(undefined) setContent({ text, fileSize }) } catch (error) { - setError(error as Error) + setError(error) setContent(undefined) } finally { setIsLoading(false) diff --git a/src/components/Viewer/Viewer.tsx b/src/components/Viewer/Viewer.tsx index f2d1c62e..47388fe2 100644 --- a/src/components/Viewer/Viewer.tsx +++ b/src/components/Viewer/Viewer.tsx @@ -9,7 +9,7 @@ import TextView from '../TextView/TextView.js' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void setProgress: (progress: number | undefined) => void } diff --git a/src/lib/index.ts b/src/lib/index.ts index 046303f9..f34152a7 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -3,4 +3,4 @@ export * from './sources/index.js' export { parquetDataFrame } from './tableProvider.js' export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js' export { parquetQueryWorker } from './workers/parquetWorkerClient.js' -export type { AsyncBufferFrom, Cells } from './workers/types.js' +export type { AsyncBufferFrom } from './workers/types.js' diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 596c8a7a..6e39f9cc 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -1,37 +1,21 @@ -import { DataFrame, OrderBy, ResolvableRow, resolvableRow } from 'hightable' +import { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame, createEventTarget, sortableDataFrame } from 'hightable' +import type { ColumnData } from 'hyparquet' import { FileMetaData, parquetSchema } from 'hyparquet' -import { parquetColumnRanksWorker, parquetQueryWorker } from './workers/parquetWorkerClient.js' +import { parquetQueryWorker } from './workers/parquetWorkerClient.js' import type { AsyncBufferFrom } from './workers/types.d.ts' -/* - * sortIndex[0] gives the index of the first row in the sorted table - */ -export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'descending', ranks: number[] }[]): number[] { - if (!(0 in orderByRanks)) { - throw new Error('orderByRanks should have at least one element') - } - const numRows = orderByRanks[0].ranks.length - return Array - .from({ length: numRows }, (_, i) => i) - .sort((a, b) => { - for (const { direction, ranks } of orderByRanks) { - const rankA = ranks[a] - const rankB = ranks[b] - if (rankA === undefined || rankB === undefined) { - throw new Error('Invalid ranks') - } - const value = direction === 'ascending' ? 1 : -1 - if (rankA < rankB) return -value - if (rankA > rankB) return value - } - return 0 - }) +type GroupStatus = { + kind: 'unfetched' +} | { + kind: 'fetching' + promise: Promise +} | { + kind: 'fetched' } - interface VirtualRowGroup { groupStart: number groupEnd: number - fetching: boolean + state: Map } /** @@ -40,9 +24,9 @@ interface VirtualRowGroup { export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame { const { children } = parquetSchema(metadata) const header = children.map(child => child.element.name) - const sortCache = new Map>() - const columnRanksCache = new Map>() - const data = new Array(Number(metadata.num_rows)) + const eventTarget = createEventTarget() + + const cellCache = new Map[]>(header.map(name => [name, []])) // virtual row groups are up to 1000 rows within row group boundaries const groups: VirtualRowGroup[] = [] @@ -52,155 +36,118 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): for (let j = 0; j < rg.num_rows; j += 1000) { const groupSize = Math.min(1000, Number(rg.num_rows) - j) const groupEnd = groupStart + groupSize - groups.push({ groupStart, groupEnd, fetching: false }) + groups.push({ + groupStart, + groupEnd, + state: new Map(header.map(name => [name, { kind: 'unfetched' }])), + }) groupStart = groupEnd } } - function fetchVirtualRowGroup(virtualGroupIndex: number) { - const group = groups[virtualGroupIndex] - if (group && !group.fetching) { - group.fetching = true - const { groupStart, groupEnd } = group - // Initialize with resolvable promises - for (let i = groupStart; i < groupEnd; i++) { - data[i] = resolvableRow(header) - data[i]?.index.resolve(i) - } - parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd }) - .then(groupData => { - for (let rowIndex = groupStart; rowIndex < groupEnd; rowIndex++) { - const dataRow = data[rowIndex] - if (dataRow === undefined) { - throw new Error(`Missing data row for index ${rowIndex}`) - } - const row = groupData[rowIndex - groupStart] - if (row === undefined) { - throw new Error(`Missing row in groupData for index ${rowIndex}`) - } - for (const [key, value] of Object.entries(row)) { - const cell = dataRow.cells[key] - if (cell === undefined) { - throw new Error(`Missing column in dataRow for column ${key}`) - } - cell.resolve(value) - } - } - }) - .catch((error: unknown) => { - const reason = `Error fetching rows ${groupStart}-${groupEnd}: ${error}` - // reject the index of the first row (it's enough to trigger the error bar) - data[groupStart]?.index.reject(reason) - }) - } - } + async function fetchVirtualRowGroup({ group, columns }: { + group: VirtualRowGroup, columns: string[] + }): Promise { + const { groupStart, groupEnd, state } = group + const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched') + const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise } => status.kind === 'fetching').map(status => status.promise) - function getColumnRanks(column: string): Promise { - let columnRanks = columnRanksCache.get(column) - if (!columnRanks) { - columnRanks = parquetColumnRanksWorker({ from, metadata, column }) - columnRanksCache.set(column, columnRanks) + // TODO(SL): pass AbortSignal to the worker? + if (columnsToFetch.length > 0) { + const commonPromise = parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk }) + columnsToFetch.forEach(column => { + state.set(column, { kind: 'fetching', promise: commonPromise }) + }) + promises.push(commonPromise) } - return columnRanks + await Promise.all(promises) + + columnsToFetch.forEach(column => { + state.set(column, { kind: 'fetched' }) + }) + } - function getSortIndex(orderBy: OrderBy): Promise { - const orderByKey = JSON.stringify(orderBy) - let sortIndex = sortCache.get(orderByKey) - if (!sortIndex) { - const orderByRanksPromise = Promise.all( - orderBy.map(({ column, direction }) => getColumnRanks(column).then(ranks => ({ direction, ranks }))) - ) - sortIndex = orderByRanksPromise.then(orderByRanks => computeSortIndex(orderByRanks)) - sortCache.set(orderByKey, sortIndex) + function onChunk(chunk: ColumnData): void { + const { columnName, columnData, rowStart } = chunk + const cachedColumn = cellCache.get(columnName) + if (!cachedColumn) { + throw new Error(`Column "${columnName}" not found in header`) + } + let row = rowStart + for (const value of columnData) { + cachedColumn[row] ??= { value } + row++ } - return sortIndex + eventTarget.dispatchEvent(new CustomEvent('resolve')) } - return { + const numRows = Number(metadata.num_rows) + + const unsortableDataFrame: UnsortableDataFrame = { header, - numRows: Number(metadata.num_rows), - rows({ start, end, orderBy }) { - if (orderBy?.length) { - const numRows = end - start - const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header)) - - getSortIndex(orderBy).then(indices => { - // Compute row groups to fetch - for (const index of indices.slice(start, end)) { - const groupIndex = groups.findIndex(({ groupEnd }) => index < groupEnd) - fetchVirtualRowGroup(groupIndex) - } - - // Re-assemble data in sorted order into wrapped - for (let i = start; i < end; i++) { - const index = indices[i] - if (index === undefined) { - throw new Error(`index ${i} not found in indices`) - } - const row = data[index] - if (row === undefined) { - throw new Error('Row not fetched') - } - const { cells } = row - const wrappedRow = wrapped[i - start] - if (wrappedRow === undefined) { - throw new Error(`Wrapped row missing at index ${i - start}`) - } - wrappedRow.index.resolve(index) - for (const key of header) { - const cell = cells[key] - if (cell) { - // TODO(SL): should we remove this check? It makes sense only if header change - // but if so, I guess we will have more issues - cell - .then((value: unknown) => { - const wrappedCell = wrappedRow.cells[key] - if (wrappedCell === undefined) { - throw new Error(`Wrapped cell not found for column ${key}`) - } - wrappedCell.resolve(value) - }) - .catch((error: unknown) => { - console.error('Error resolving sorted row', error) - }) - } - } - } - }).catch((error: unknown) => { - console.error('Error fetching sort index or resolving sorted rows', error) - // Reject at least one promise to trigger the error bar - wrapped[0]?.index.reject(`Error fetching sort index or resolving sorted rows: ${error}`) - }) - - return wrapped - } else { - groups.forEach(({ groupStart, groupEnd }, i) => { - if (groupStart < end && groupEnd > start) { - fetchVirtualRowGroup(i) - } - }) - const wrapped = data.slice(start, end) - if (wrapped.some(row => row === undefined)) { - throw new Error('Row not fetched') - } - return wrapped as ResolvableRow[] - } + numRows, + eventTarget, + getRowNumber({ row }) { + validateRow({ row, data: { numRows } }) + return { value: row } + }, + getCell({ row, column }) { + validateRow({ row, data: { numRows } }) + validateColumn({ column, data: { header } }) + return cellCache.get(column)?.[row] }, - sortable: true, - getColumn({ column, start, end }) { - if (!header.includes(column)) { - return Promise.reject(new Error(`Column "${column}" not found in header`)) + fetch: async ({ rowStart, rowEnd, columns, signal }) => { + validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }) + checkSignal(signal) + + if (!columns || columns.length === 0) { + return } - return parquetQueryWorker({ - from, - metadata, - rowStart: start, - rowEnd: end, - }).then(rows => { - return rows.map(row => row[column]) + const promises: Promise[] = [] + + groups.forEach((group) => { + const { groupStart, groupEnd } = group + if (groupStart < rowEnd && groupEnd > rowStart) { + promises.push( + fetchVirtualRowGroup({ + group, + columns, + }).then(() => { + checkSignal(signal) + }) + ) + } }) + + await Promise.all(promises) }, } + + return sortableDataFrame(unsortableDataFrame) +} + +function validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }: {rowStart: number, rowEnd: number, columns?: string[], data: Pick}): void { + if (rowStart < 0 || rowEnd > numRows || !Number.isInteger(rowStart) || !Number.isInteger(rowEnd) || rowStart > rowEnd) { + throw new Error(`Invalid row range: ${rowStart} - ${rowEnd}, numRows: ${numRows}`) + } + if (columns?.some(column => !header.includes(column))) { + throw new Error(`Invalid columns: ${columns.join(', ')}. Available columns: ${header.join(', ')}`) + } +} +function validateRow({ row, data: { numRows } }: {row: number, data: Pick}): void { + if (row < 0 || row >= numRows || !Number.isInteger(row)) { + throw new Error(`Invalid row index: ${row}, numRows: ${numRows}`) + } +} +function validateColumn({ column, data: { header } }: {column: string, data: Pick}): void { + if (!header.includes(column)) { + throw new Error(`Invalid column: ${column}. Available columns: ${header.join(', ')}`) + } +} +function checkSignal(signal?: AbortSignal): void { + if (signal?.aborted) { + throw new DOMException('The operation was aborted.', 'AbortError') + } } diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 6940bd16..a9552319 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -1,3 +1,4 @@ +import { stringify } from 'hightable' import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet' import { AsyncBufferFrom } from './workers/types.js' @@ -97,3 +98,11 @@ export const contentTypes: Record = { } export const imageTypes = ['.png', '.jpg', '.jpeg', '.gif', '.svg', '.tiff', '.webp'] + +export function toError(error: unknown): Error | undefined{ + if (error === undefined || error instanceof Error) { + return error + } else { + return new Error(stringify(error)) + } +} diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 50036aaf..3a84164b 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,69 +1,30 @@ -import { ColumnData, parquetQuery } from 'hyparquet' +import type { ColumnData } from 'hyparquet' +import { parquetRead } from 'hyparquet' import { compressors } from 'hyparquet-compressors' -import { parquetReadColumn } from 'hyparquet/src/read.js' import { asyncBufferFrom } from '../utils.js' -import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js' +import type { ChunkMessage, ClientMessage, ErrorMessage, ResultMessage } from './types.js' function postChunkMessage ({ chunk, queryId }: ChunkMessage) { self.postMessage({ chunk, queryId }) } -function postResultMessage ({ result, queryId }: ResultMessage) { - self.postMessage({ result, queryId }) +function postResultMessage ({ queryId }: ResultMessage) { + self.postMessage({ queryId }) } function postErrorMessage ({ error, queryId }: ErrorMessage) { self.postMessage({ error, queryId }) } -function postColumnRanksMessage ({ columnRanks, queryId }: ColumnRanksMessage) { - self.postMessage({ columnRanks, queryId }) -} self.onmessage = async ({ data }: { data: ClientMessage }) => { - const { metadata, from, kind, queryId } = data + const { rowStart, rowEnd, columns, metadata, from, queryId } = data const file = await asyncBufferFrom(from) - if (kind === 'columnRanks') { - const { column } = data - // return the column ranks in ascending order - // we can get the descending order replacing the rank with numRows - rank - 1. It's not exactly the rank of - // the descending order, because the rank is the first, not the last, of the ties. But it's enough for the - // purpose of sorting. - - try { - const sortColumn: unknown[] = Array.from(await parquetReadColumn({ file, metadata, columns: [column], compressors })) - const valuesWithIndex = sortColumn.map((value, index) => ({ value, index })) - const sortedValuesWithIndex = valuesWithIndex.sort(({ value: a }, { value: b }) => compare(a, b)) - const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => { - const { lastValue, lastRank, ranks } = accumulator - const { value, index } = currentValue - if (value === lastValue) { - ranks[index] = lastRank - return { ranks, lastValue, lastRank } - } else { - ranks[index] = rank - return { ranks, lastValue: value, lastRank: rank } - } - }, { - ranks: Array(sortColumn.length).fill(-1) as number[], - lastValue: undefined as unknown, - lastRank: 0, - }).ranks - postColumnRanksMessage({ columnRanks, queryId }) - } catch (error) { - postErrorMessage({ error: error as Error, queryId }) - } - } else { - const { rowStart, rowEnd, columns, orderBy, filter, chunks } = data - const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined - try { - const result = await parquetQuery({ metadata, file, rowStart, rowEnd, columns, orderBy, filter, compressors, onChunk }) - postResultMessage({ result, queryId }) - } catch (error) { - postErrorMessage({ error: error as Error, queryId }) - } + try { + await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onPage: onChunk }) + postResultMessage({ queryId }) + } catch (error) { + postErrorMessage({ error: error as Error, queryId }) } -} -function compare(a: T, b: T): number { - if (a < b) return -1 - if (a > b) return 1 - return 1 // TODO: how to handle nulls? + function onChunk(chunk: ColumnData) { + postChunkMessage({ chunk, queryId }) + } } diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index be661c60..c2968512 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -2,29 +2,22 @@ import ParquetWorker from './parquetWorker?worker&inline' /// ^ the worker is bundled with the main thread code (inline) which is easier for users to import /// (no need to copy the worker file to the right place) import type { ColumnData } from 'hyparquet' -import type { Cells, ColumnRanksClientMessage, ColumnRanksWorkerMessage, ColumnRanksWorkerOptions, QueryClientMessage, QueryWorkerMessage, QueryWorkerOptions } from './types.js' +import type { ClientMessage, WorkerMessage, WorkerOptions } from './types.js' let worker: Worker | undefined let nextQueryId = 0 -interface RowsQueryAgent { - kind: 'query' - resolve: (value: Cells[]) => void +interface QueryAgent { + resolve: () => void reject: (error: Error) => void - onChunk?: (chunk: ColumnData) => void + onChunk: (chunk: ColumnData) => void } -interface ColumnRanksQueryAgent { - kind: 'columnRanks' - resolve: (value: number[]) => void - reject: (error: Error) => void -} -type QueryAgent = RowsQueryAgent | ColumnRanksQueryAgent const pending = new Map() function getWorker() { if (!worker) { worker = new ParquetWorker() - worker.onmessage = ({ data }: { data: QueryWorkerMessage | ColumnRanksWorkerMessage }) => { + worker.onmessage = ({ data }: { data: WorkerMessage }) => { const pendingQueryAgent = pending.get(data.queryId) if (!pendingQueryAgent) { console.warn( @@ -33,27 +26,13 @@ function getWorker() { return } - if (pendingQueryAgent.kind === 'query') { - const { onChunk, resolve, reject } = pendingQueryAgent - if ('error' in data) { - reject(data.error) - } else if ('result' in data) { - resolve(data.result) - } else if ('chunk' in data) { - onChunk?.(data.chunk) - } else { - reject(new Error('Unexpected message from worker')) - } - return - } - - const { resolve, reject } = pendingQueryAgent + const { onChunk, resolve, reject } = pendingQueryAgent if ('error' in data) { reject(data.error) - } else if ('columnRanks' in data) { - resolve(data.columnRanks) + } else if ('chunk' in data) { + onChunk(data.chunk) } else { - reject(new Error('Unexpected message from worker')) + resolve() } } } @@ -66,26 +45,13 @@ function getWorker() { * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs * to be serialized to the worker. */ -export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, orderBy, filter, onChunk }: QueryWorkerOptions): Promise { +export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, onChunk }: WorkerOptions): Promise { // TODO(SL) Support passing columns? return new Promise((resolve, reject) => { const queryId = nextQueryId++ - pending.set(queryId, { kind: 'query', resolve, reject, onChunk }) - const worker = getWorker() - - // If caller provided an onChunk callback, worker will send chunks as they are parsed - const chunks = onChunk !== undefined - const message: QueryClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns, orderBy, filter, chunks, kind: 'query' } - worker.postMessage(message) - }) -} - -export function parquetColumnRanksWorker({ metadata, from, column }: ColumnRanksWorkerOptions): Promise { - return new Promise((resolve, reject) => { - const queryId = nextQueryId++ - pending.set(queryId, { kind: 'columnRanks', resolve, reject }) + pending.set(queryId, { resolve, reject, onChunk }) const worker = getWorker() - const message: ColumnRanksClientMessage = { queryId, metadata, from, column, kind: 'columnRanks' } + const message: ClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns } worker.postMessage(message) }) } diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index f7a5a2d4..be883077 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -1,5 +1,4 @@ import type { ColumnData, FileMetaData } from 'hyparquet' -import type { ParquetQueryFilter } from 'hyparquet/src/types.js' // Serializable constructors for AsyncBuffers interface AsyncBufferFromFile { @@ -12,51 +11,25 @@ interface AsyncBufferFromUrl { requestInit?: RequestInit } export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl -// Cells is defined in hightable, but uses any, not unknown -export type Cells = Record ; -export interface CommonWorkerOptions { - metadata: FileMetaData, - from: AsyncBufferFrom -} -interface Message { +export interface ResultMessage { queryId: number } -export interface ErrorMessage extends Message { +export interface ErrorMessage extends ResultMessage { error: Error } +export interface ChunkMessage extends ResultMessage { + chunk: ColumnData +} +export type WorkerMessage = ChunkMessage | ResultMessage | ErrorMessage -/* Query worker */ -export interface QueryWorkerOptions extends CommonWorkerOptions { +export interface WorkerOptions { + metadata: FileMetaData, + from: AsyncBufferFrom rowStart?: number, rowEnd?: number, columns?: string[], - orderBy?: string, - filter?: ParquetQueryFilter, - onChunk?: (chunk: ColumnData) => void -} -export interface QueryClientMessage extends QueryWorkerOptions, Message { - kind: 'query', - chunks?: boolean + onChunk: (chunk: ColumnData) => void + // TODO(SL): support onPage too? } -export interface ChunkMessage extends Message { - chunk: ColumnData -} -export interface ResultMessage extends Message { - result: Cells[] -} -export type QueryWorkerMessage = ChunkMessage | ResultMessage | ErrorMessage - -/* ColumnRanks worker */ -export interface ColumnRanksWorkerOptions extends CommonWorkerOptions { - column: string -} -export interface ColumnRanksClientMessage extends ColumnRanksWorkerOptions, Message { - kind: 'columnRanks' -} -export interface ColumnRanksMessage extends Message { - columnRanks: number[] -} -export type ColumnRanksWorkerMessage = ColumnRanksMessage | ErrorMessage - -export type ClientMessage = QueryClientMessage | ColumnRanksClientMessage +export type ClientMessage = Omit & ResultMessage