diff --git a/src/lib/index.ts b/src/lib/index.ts index 65c75ffd..b53c1f9e 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -2,6 +2,5 @@ export { appendSearchParams, replaceSearchParams } from './routes.js' export * from './sources/index.js' export { parquetDataFrame } from './tableProvider.js' export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js' -// export parquetQueryWorker for backward-compatibility -export { parquetReadWorker as parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js' +export { parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js' export type { AsyncBufferFrom } from './workers/types.js' diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 4ae0b917..06fb2667 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,40 +1,46 @@ import type { ColumnData } from 'hyparquet' -import { AsyncBuffer, parquetRead, parquetReadObjects } from 'hyparquet' +import { AsyncBuffer, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet' import { compressors } from 'hyparquet-compressors' -import type { ChunkMessage, ClientMessage, CompleteMessage, EmptyResultMessage, ErrorMessage, PageMessage, RowObjectsResultMessage } from './types.js' +import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage } from './types.js' import { fromToAsyncBuffer } from './utils.js' const cache = new Map>() -function postCompleteMessage ({ queryId, rows }: CompleteMessage) { - self.postMessage({ queryId, rows }) +function postCompleteMessage ({ queryId, rows }: Omit) { + self.postMessage({ kind: 'onComplete', queryId, rows }) } -function postChunkMessage ({ chunk, queryId }: ChunkMessage) { - 
self.postMessage({ chunk, queryId }) +function postChunkMessage ({ chunk, queryId }: Omit) { + self.postMessage({ kind: 'onChunk', chunk, queryId }) } -function postPageMessage ({ page, queryId }: PageMessage) { - self.postMessage({ page, queryId }) +function postPageMessage ({ page, queryId }: Omit) { + self.postMessage({ kind: 'onPage', page, queryId }) } -function postErrorMessage ({ error, queryId }: ErrorMessage) { - self.postMessage({ error, queryId }) +function postErrorMessage ({ error, queryId }: Omit) { + self.postMessage({ kind: 'onReject', error, queryId }) } -function postRowObjectsResultMessage ({ queryId, rowObjects }: RowObjectsResultMessage) { - self.postMessage({ queryId, rowObjects }) +function postParquetReadResultMessage ({ queryId }: Omit) { + self.postMessage({ kind: 'onParquetReadResolve', queryId }) } -function postEmptyResultMessage ({ queryId }: EmptyResultMessage) { - self.postMessage({ queryId }) +function postParquetReadObjectsResultMessage ({ queryId, rows }: Omit) { + self.postMessage({ kind: 'onParquetReadObjectsResolve', queryId, rows }) +} +function postParquetQueryResultMessage ({ queryId, rows }: Omit) { + self.postMessage({ kind: 'onParquetQueryResolve', queryId, rows }) } self.onmessage = async ({ data }: { data: ClientMessage }) => { - const { kind, queryId, from, ...options } = data + const { queryId, from, kind, options } = data const file = await fromToAsyncBuffer(from, cache) try { if (kind === 'parquetReadObjects') { - const rowObjects = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage }) - postRowObjectsResultMessage({ queryId, rowObjects }) + const rows = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage }) + postParquetReadObjectsResultMessage({ queryId, rows }) + } else if (kind === 'parquetQuery') { + const rows = await parquetQuery({ ...options, file, compressors, onComplete, onChunk, onPage }) + postParquetQueryResultMessage({ queryId, rows }) } else { await 
parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage }) - postEmptyResultMessage({ queryId }) + postParquetReadResultMessage({ queryId }) } } catch (error) { postErrorMessage({ error: error as Error, queryId }) diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index bcd1b8fb..de928e3a 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -1,5 +1,5 @@ import type { ColumnData } from 'hyparquet' -import type { ClientMessage, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js' +import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js' let worker: Worker | undefined let nextQueryId = 0 @@ -8,8 +8,9 @@ interface Agent { onChunk?: (chunk: ColumnData) => void onPage?: (page: ColumnData) => void reject: (error: Error) => void - resolveEmpty?: () => void - resolveRowObjects?: (rowObjects: Rows) => void + parquetReadResolve?: () => void + parquetReadObjectsResolve?: (rows: Rows) => void + parquetQueryResolve?: (rows: Rows) => void } const pendingAgents = new Map() @@ -26,24 +27,37 @@ function getWorker() { return } - const { onComplete, onChunk, onPage, reject, resolveEmpty, resolveRowObjects } = pendingAgent - if ('rows' in data) { - onComplete?.(data.rows) - } else if ('chunk' in data) { - onChunk?.(data.chunk) - } else if ('page' in data) { - onPage?.(data.page) - } else { - if ('error' in data) { - reject(data.error) - } else if ('rowObjects' in data) { - resolveRowObjects?.(data.rowObjects) - } else { - resolveEmpty?.() - } - /* clean up */ - pendingAgents.delete(data.queryId) - // TODO(SL): maybe terminate the worker when no pending agents left + const { onComplete, onChunk, onPage, reject, parquetReadResolve, parquetReadObjectsResolve, parquetQueryResolve } = pendingAgent + switch (data.kind) { + case 'onComplete': + 
onComplete?.(data.rows) + break + case 'onChunk': + onChunk?.(data.chunk) + break + case 'onPage': + onPage?.(data.page) + break + default: + switch (data.kind) { + case 'onReject': + if ('error' in data) { // check, just in case + reject(data.error) + } + break + case 'onParquetReadResolve': + parquetReadResolve?.() + break + case 'onParquetReadObjectsResolve': + parquetReadObjectsResolve?.(data.rows) + break + case 'onParquetQueryResolve': + parquetQueryResolve?.(data.rows) + break + } + /* clean up */ + pendingAgents.delete(data.queryId) + // TODO(SL): maybe terminate the worker when no pending agents left } } } @@ -58,12 +72,12 @@ function getWorker() { * the default parsers. */ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise { - const { onComplete, onChunk, onPage, ...serializableOptions } = options + const { onComplete, onChunk, onPage, from, ...serializableOptions } = options return new Promise((resolve, reject) => { const queryId = nextQueryId++ - pendingAgents.set(queryId, { resolveEmpty: resolve, reject, onComplete, onChunk, onPage }) + pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage }) const worker = getWorker() - const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetRead' } + const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions } worker.postMessage(message) }) } @@ -76,12 +90,30 @@ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise { - const { onChunk, onPage, ...serializableOptions } = options + const { onChunk, onPage, from, ...serializableOptions } = options return new Promise((resolve, reject) => { const queryId = nextQueryId++ - pendingAgents.set(queryId, { resolveRowObjects: resolve, reject, onChunk, onPage }) + pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage }) const worker = getWorker() - const message: ClientMessage = { queryId, 
...serializableOptions, kind: 'parquetReadObjects' } + const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions } + worker.postMessage(message) + }) +} + +/** + * Presents almost the same interface as parquetQuery, but runs in a worker. + * This is useful for reading large parquet files without blocking the main thread. + * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs + * to be serialized to the worker. Also: the worker uses hyparquet-compressors and + * the default parsers. + */ +export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise { + const { onComplete, onChunk, onPage, from, ...serializableOptions } = options + return new Promise((resolve, reject) => { + const queryId = nextQueryId++ + pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage }) + const worker = getWorker() + const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions } worker.postMessage(message) }) } diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index d0ca0f52..d2ce699f 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -1,4 +1,8 @@ import type { ColumnData, ParquetReadOptions } from 'hyparquet' +import { parquetQuery } from 'hyparquet' + +// https://github.com/hyparam/hyparquet/pull/105 +type ParquetQueryFilter = Exclude[0]['filter'], undefined> // Serializable constructors for AsyncBuffers interface AsyncBufferFromFile { @@ -33,6 +37,17 @@ export interface ParquetReadWorkerOptions extends Omit +/** + * Options for the worker version of parquetQuery + * The same options as parquetQuery, but: + * - 'file' must be replaced with 'from': "AsyncBufferFrom" + * - 'compressors' are not configurable, the worker uses hyparquet-compressors + * - 'parsers' are not configurable, the worker uses the default parsers + */ +export interface ParquetQueryWorkerOptions extends
ParquetReadWorkerOptions { + filter?: ParquetQueryFilter, + orderBy?: string +} /** * Messages sent by the client function to the worker @@ -40,31 +55,51 @@ export type ParquetReadObjectsWorkerOptions = Omit -export interface ClientMessage extends SerializableOptions, QueryId { - kind: 'parquetReadObjects' | 'parquetRead' +export interface From { + from: AsyncBufferFrom } +export interface ParquetReadClientMessage extends QueryId, From { + kind: 'parquetRead' + options: Omit +} +export interface ParquetReadObjectsClientMessage extends QueryId, From { + kind: 'parquetReadObjects' + options: Omit +} +export interface ParquetQueryClientMessage extends QueryId, From { + kind: 'parquetQuery' + options: Omit +} +export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage /** * Messages sent by the worker to the client */ -// export interface ResultMessage { -// queryId: number -// } export interface CompleteMessage extends QueryId { + kind: 'onComplete' rows: Rows } export interface ChunkMessage extends QueryId { + kind: 'onChunk' chunk: ColumnData } export interface PageMessage extends QueryId { + kind: 'onPage' page: ColumnData } -export interface ErrorMessage extends QueryId { +export interface RejectMessage extends QueryId { + kind: 'onReject' error: Error } -export interface RowObjectsResultMessage extends QueryId { - rowObjects: Rows +export interface ParquetReadResolveMessage extends QueryId { + kind: 'onParquetReadResolve' +} +export interface ParquetReadObjectsResolveMessage extends QueryId { + kind: 'onParquetReadObjectsResolve' + rows: Rows +} +export interface ParquetQueryResolveMessage extends QueryId { + kind: 'onParquetQueryResolve' + rows: Rows } -export type EmptyResultMessage = QueryId -export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | ErrorMessage | RowObjectsResultMessage | EmptyResultMessage +export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | 
RejectMessage | ParquetReadResolveMessage | ParquetReadObjectsResolveMessage | ParquetQueryResolveMessage