diff --git a/src/lib/index.ts b/src/lib/index.ts index f34152a7..65c75ffd 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -2,5 +2,6 @@ export { appendSearchParams, replaceSearchParams } from './routes.js' export * from './sources/index.js' export { parquetDataFrame } from './tableProvider.js' export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js' -export { parquetQueryWorker } from './workers/parquetWorkerClient.js' +// export parquetQueryWorker for backward-compatibility +export { parquetReadWorker as parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js' export type { AsyncBufferFrom } from './workers/types.js' diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 6e39f9cc..a1c30260 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -1,7 +1,7 @@ import { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame, createEventTarget, sortableDataFrame } from 'hightable' import type { ColumnData } from 'hyparquet' import { FileMetaData, parquetSchema } from 'hyparquet' -import { parquetQueryWorker } from './workers/parquetWorkerClient.js' +import { parquetReadWorker } from './workers/parquetWorkerClient.js' import type { AsyncBufferFrom } from './workers/types.d.ts' type GroupStatus = { @@ -54,7 +54,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): // TODO(SL): pass AbortSignal to the worker? 
if (columnsToFetch.length > 0) { - const commonPromise = parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk }) + const commonPromise = parquetReadWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk }) columnsToFetch.forEach(column => { state.set(column, { kind: 'fetching', promise: commonPromise }) }) diff --git a/src/lib/utils.ts b/src/lib/utils.ts index a9552319..811df3c6 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -1,6 +1,7 @@ import { stringify } from 'hightable' -import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet' +import { AsyncBuffer } from 'hyparquet' import { AsyncBufferFrom } from './workers/types.js' +import { fromToAsyncBuffer } from './workers/utils.js' /** * Helper function to join class names @@ -13,17 +14,7 @@ export function cn(...names: (string | undefined | false)[]): string { * Convert AsyncBufferFromUrl to AsyncBuffer. */ export function asyncBufferFrom(from: AsyncBufferFrom): Promise { - if ('url' in from) { - // Cached asyncBuffer for urls only - const key = JSON.stringify(from) - const cached = cache.get(key) - if (cached) return cached - const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer) - cache.set(key, asyncBuffer) - return asyncBuffer - } else { - return from.file.arrayBuffer() - } + return fromToAsyncBuffer(from, cache) } const cache = new Map>() // TODO(SL): do we really want a singleton? 
diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index d31712ec..4ae0b917 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,45 +1,52 @@ import type { ColumnData } from 'hyparquet' -import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer, parquetRead } from 'hyparquet' +import { AsyncBuffer, parquetRead, parquetReadObjects } from 'hyparquet' import { compressors } from 'hyparquet-compressors' -import type { AsyncBufferFrom, ChunkMessage, ClientMessage, ErrorMessage, ResultMessage } from './types.js' +import type { ChunkMessage, ClientMessage, CompleteMessage, EmptyResultMessage, ErrorMessage, PageMessage, RowObjectsResultMessage } from './types.js' +import { fromToAsyncBuffer } from './utils.js' const cache = new Map>() -export function asyncBufferFrom(from: AsyncBufferFrom): Promise { - if ('url' in from) { - // Cached asyncBuffer for urls only - const key = JSON.stringify(from) - const cached = cache.get(key) - if (cached) return cached - const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer) - cache.set(key, asyncBuffer) - return asyncBuffer - } else { - return from.file.arrayBuffer() - } +function postCompleteMessage ({ queryId, rows }: CompleteMessage) { + self.postMessage({ queryId, rows }) } - function postChunkMessage ({ chunk, queryId }: ChunkMessage) { self.postMessage({ chunk, queryId }) } -function postResultMessage ({ queryId }: ResultMessage) { - self.postMessage({ queryId }) +function postPageMessage ({ page, queryId }: PageMessage) { + self.postMessage({ page, queryId }) } function postErrorMessage ({ error, queryId }: ErrorMessage) { self.postMessage({ error, queryId }) } +function postRowObjectsResultMessage ({ queryId, rowObjects }: RowObjectsResultMessage) { + self.postMessage({ queryId, rowObjects }) +} +function postEmptyResultMessage ({ queryId }: EmptyResultMessage) { + self.postMessage({ queryId }) +} self.onmessage = async ({ data }: { data: 
ClientMessage }) => { - const { rowStart, rowEnd, columns, metadata, from, queryId } = data - const file = await asyncBufferFrom(from) + const { kind, queryId, from, ...options } = data + const file = await fromToAsyncBuffer(from, cache) try { - await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onChunk }) - postResultMessage({ queryId }) + if (kind === 'parquetReadObjects') { + const rowObjects = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage }) + postRowObjectsResultMessage({ queryId, rowObjects }) + } else { + await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage }) + postEmptyResultMessage({ queryId }) + } } catch (error) { postErrorMessage({ error: error as Error, queryId }) } + function onComplete(rows: unknown[][]) { + postCompleteMessage({ queryId, rows }) + } function onChunk(chunk: ColumnData) { postChunkMessage({ chunk, queryId }) } + function onPage(page: ColumnData) { + postPageMessage({ page, queryId }) + } } diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index 835a8a2c..bcd1b8fb 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -1,35 +1,49 @@ import type { ColumnData } from 'hyparquet' -import type { ClientMessage, WorkerMessage, WorkerOptions } from './types.js' +import type { ClientMessage, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js' let worker: Worker | undefined let nextQueryId = 0 -interface QueryAgent { - resolve: () => void +interface Agent { + onComplete?: (rows: Rows) => void + onChunk?: (chunk: ColumnData) => void + onPage?: (page: ColumnData) => void reject: (error: Error) => void - onChunk: (chunk: ColumnData) => void + resolveEmpty?: () => void + resolveRowObjects?: (rowObjects: Rows) => void } -const pending = new Map() +const pendingAgents = new Map() function getWorker() { if (!worker) { worker = new Worker(new 
URL('./parquetWorker.js', import.meta.url), { type: 'module' }) worker.onmessage = ({ data }: { data: WorkerMessage }) => { - const pendingQueryAgent = pending.get(data.queryId) - if (!pendingQueryAgent) { + const pendingAgent = pendingAgents.get(data.queryId) + if (!pendingAgent) { console.warn( `Unexpected: no pending promise found for queryId: ${data.queryId.toString()}` ) return } - const { onChunk, resolve, reject } = pendingQueryAgent - if ('error' in data) { - reject(data.error) + const { onComplete, onChunk, onPage, reject, resolveEmpty, resolveRowObjects } = pendingAgent + if ('rows' in data) { + onComplete?.(data.rows) } else if ('chunk' in data) { - onChunk(data.chunk) + onChunk?.(data.chunk) + } else if ('page' in data) { + onPage?.(data.page) } else { - resolve() + if ('error' in data) { + reject(data.error) + } else if ('rowObjects' in data) { + resolveRowObjects?.(data.rowObjects) + } else { + resolveEmpty?.() + } + /* clean up */ + pendingAgents.delete(data.queryId) + // TODO(SL): maybe terminate the worker when no pending agents left } } } @@ -40,14 +54,34 @@ function getWorker() { * Presents almost the same interface as parquetRead, but runs in a worker. * This is useful for reading large parquet files without blocking the main thread. * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs - * to be serialized to the worker. + * to be serialized to the worker. Also: the worker uses hyparquet-compressors and + * the default parsers. 
*/ -export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, onChunk }: WorkerOptions): Promise { +export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise { + const { onComplete, onChunk, onPage, ...serializableOptions } = options return new Promise((resolve, reject) => { const queryId = nextQueryId++ - pending.set(queryId, { resolve, reject, onChunk }) + pendingAgents.set(queryId, { resolveEmpty: resolve, reject, onComplete, onChunk, onPage }) const worker = getWorker() - const message: ClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns } + const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetRead' } + worker.postMessage(message) + }) +} + +/** + * Presents almost the same interface as parquetReadObjects, but runs in a worker. + * This is useful for reading large parquet files without blocking the main thread. + * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs + * to be serialized to the worker. Also: the worker uses hyparquet-compressors and + * the default parsers. 
+ */ +export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise { + const { onChunk, onPage, ...serializableOptions } = options + return new Promise((resolve, reject) => { + const queryId = nextQueryId++ + pendingAgents.set(queryId, { resolveRowObjects: resolve, reject, onChunk, onPage }) + const worker = getWorker() + const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetReadObjects' } worker.postMessage(message) }) } diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index 98dcec2c..d0ca0f52 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -1,4 +1,4 @@ -import type { ColumnData, FileMetaData } from 'hyparquet' +import type { ColumnData, ParquetReadOptions } from 'hyparquet' // Serializable constructors for AsyncBuffers interface AsyncBufferFromFile { @@ -12,23 +12,59 @@ interface AsyncBufferFromUrl { } export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl -export interface ResultMessage { +export type Rows = unknown[][] | Record[] + +/** + * Options for the worker version of parquetRead + * The same options as parquetRead, but: + * - 'file' must be replaced with 'from': "AsyncBufferFrom" + * - 'compressors' are not configurable, the worker uses hyparquet-compressors + * - 'parsers' are not configurable, the worker uses the default parsers + */ +export interface ParquetReadWorkerOptions extends Omit { + onComplete?: (rows: Rows) => void // fix for https://github.com/hyparam/hyparquet/issues/28 + from: AsyncBufferFrom +} +/** + * Options for the worker version of parquetReadObjects + * The same options as parquetReadObjects, but: + * - 'file' must be replaced with 'from': "AsyncBufferFrom" + * - 'compressors' are not configurable, the worker uses hyparquet-compressors + * - 'parsers' are not configurable, the worker uses the default parsers + */ +export type ParquetReadObjectsWorkerOptions = Omit + +/** + * Messages sent by the client function to the 
worker + */ +export interface QueryId { queryId: number } -export interface ErrorMessage extends ResultMessage { - error: Error +export type SerializableOptions = Omit +export interface ClientMessage extends SerializableOptions, QueryId { + kind: 'parquetReadObjects' | 'parquetRead' +} + +/** + * Messages sent by the worker to the client + */ +// export interface ResultMessage { +// queryId: number +// } +export interface CompleteMessage extends QueryId { + rows: Rows } -export interface ChunkMessage extends ResultMessage { +export interface ChunkMessage extends QueryId { chunk: ColumnData } -export type WorkerMessage = ChunkMessage | ResultMessage | ErrorMessage - -export interface WorkerOptions { - metadata: FileMetaData, - from: AsyncBufferFrom - rowStart?: number, - rowEnd?: number, - columns?: string[], - onChunk: (chunk: ColumnData) => void +export interface PageMessage extends QueryId { + page: ColumnData +} +export interface ErrorMessage extends QueryId { + error: Error +} +export interface RowObjectsResultMessage extends QueryId { + rowObjects: Rows } -export type ClientMessage = Omit & ResultMessage +export type EmptyResultMessage = QueryId +export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | ErrorMessage | RowObjectsResultMessage | EmptyResultMessage diff --git a/src/lib/workers/utils.ts b/src/lib/workers/utils.ts new file mode 100644 index 00000000..520773d6 --- /dev/null +++ b/src/lib/workers/utils.ts @@ -0,0 +1,20 @@ +import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet' +import { AsyncBufferFrom } from './types.js' + +export function fromToAsyncBuffer(from: AsyncBufferFrom, cache?: Map>): Promise { + if ('url' in from) { + // Cached asyncBuffer for urls only + const key = JSON.stringify(from) + if (cache) { + const cached = cache.get(key) + if (cached) return cached + } + const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer) + if (cache) { + cache.set(key, asyncBuffer) + } + return 
asyncBuffer + } else { + return from.file.arrayBuffer() + } +}