Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ export { appendSearchParams, replaceSearchParams } from './routes.js'
export * from './sources/index.js'
export { parquetDataFrame } from './tableProvider.js'
export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js'
// export parquetQueryWorker for backward-compatibility
export { parquetReadWorker as parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js'
export { parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js'
export type { AsyncBufferFrom } from './workers/types.js'
42 changes: 24 additions & 18 deletions src/lib/workers/parquetWorker.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,46 @@
import type { ColumnData } from 'hyparquet'
import { AsyncBuffer, parquetRead, parquetReadObjects } from 'hyparquet'
import { AsyncBuffer, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import type { ChunkMessage, ClientMessage, CompleteMessage, EmptyResultMessage, ErrorMessage, PageMessage, RowObjectsResultMessage } from './types.js'
import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage } from './types.js'
import { fromToAsyncBuffer } from './utils.js'

const cache = new Map<string, Promise<AsyncBuffer>>()

function postCompleteMessage ({ queryId, rows }: CompleteMessage) {
self.postMessage({ queryId, rows })
function postCompleteMessage ({ queryId, rows }: Omit<CompleteMessage, 'kind'>) {
self.postMessage({ kind: 'onComplete', queryId, rows })
}
function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
self.postMessage({ chunk, queryId })
function postChunkMessage ({ chunk, queryId }: Omit<ChunkMessage, 'kind'>) {
self.postMessage({ kind: 'onChunk', chunk, queryId })
}
function postPageMessage ({ page, queryId }: PageMessage) {
self.postMessage({ page, queryId })
function postPageMessage ({ page, queryId }: Omit<PageMessage, 'kind'>) {
self.postMessage({ kind: 'onPage', page, queryId })
}
function postErrorMessage ({ error, queryId }: ErrorMessage) {
self.postMessage({ error, queryId })
function postErrorMessage ({ error, queryId }: Omit<RejectMessage, 'kind'>) {
self.postMessage({ kind: 'onReject', error, queryId })
}
function postRowObjectsResultMessage ({ queryId, rowObjects }: RowObjectsResultMessage) {
self.postMessage({ queryId, rowObjects })
function postParquetReadResultMessage ({ queryId }: Omit<ParquetReadResolveMessage, 'kind'>) {
self.postMessage({ kind: 'onParquetReadResolve', queryId })
}
function postEmptyResultMessage ({ queryId }: EmptyResultMessage) {
self.postMessage({ queryId })
function postParquetReadObjectsResultMessage ({ queryId, rows }: Omit<ParquetReadObjectsResolveMessage, 'kind'>) {
self.postMessage({ kind: 'onParquetReadObjectsResolve', queryId, rows })
}
function postParquetQueryResultMessage ({ queryId, rows }: Omit<ParquetQueryResolveMessage, 'kind'>) {
self.postMessage({ kind: 'onParquetQueryResolve', queryId, rows })
}

self.onmessage = async ({ data }: { data: ClientMessage }) => {
const { kind, queryId, from, ...options } = data
const { queryId, from, kind, options } = data
const file = await fromToAsyncBuffer(from, cache)
try {
if (kind === 'parquetReadObjects') {
const rowObjects = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
postRowObjectsResultMessage({ queryId, rowObjects })
const rows = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
postParquetReadObjectsResultMessage({ queryId, rows })
} else if (kind === 'parquetQuery') {
const rows = await parquetQuery({ ...options, file, compressors, onComplete, onChunk, onPage })
postParquetQueryResultMessage({ queryId, rows })
} else {
await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage })
postEmptyResultMessage({ queryId })
postParquetReadResultMessage({ queryId })
}
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
Expand Down
86 changes: 59 additions & 27 deletions src/lib/workers/parquetWorkerClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ColumnData } from 'hyparquet'
import type { ClientMessage, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'
import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'

let worker: Worker | undefined
let nextQueryId = 0
Expand All @@ -8,8 +8,9 @@ interface Agent {
onChunk?: (chunk: ColumnData) => void
onPage?: (page: ColumnData) => void
reject: (error: Error) => void
resolveEmpty?: () => void
resolveRowObjects?: (rowObjects: Rows) => void
parquetReadResolve?: () => void
parquetReadObjectsResolve?: (rows: Rows) => void
parquetQueryResolve?: (rows: Rows) => void
}

const pendingAgents = new Map<number, Agent>()
Expand All @@ -26,24 +27,37 @@ function getWorker() {
return
}

const { onComplete, onChunk, onPage, reject, resolveEmpty, resolveRowObjects } = pendingAgent
if ('rows' in data) {
onComplete?.(data.rows)
} else if ('chunk' in data) {
onChunk?.(data.chunk)
} else if ('page' in data) {
onPage?.(data.page)
} else {
if ('error' in data) {
reject(data.error)
} else if ('rowObjects' in data) {
resolveRowObjects?.(data.rowObjects)
} else {
resolveEmpty?.()
}
/* clean up */
pendingAgents.delete(data.queryId)
// TODO(SL): maybe terminate the worker when no pending agents left
const { onComplete, onChunk, onPage, reject, parquetReadResolve, parquetReadObjectsResolve, parquetQueryResolve } = pendingAgent
switch (data.kind) {
case 'onComplete':
onComplete?.(data.rows)
break
case 'onChunk':
onChunk?.(data.chunk)
break
case 'onPage':
onPage?.(data.page)
break
default:
switch (data.kind) {
case 'onReject':
if ('error' in data) { // check, just in case
reject(data.error)
}
break
case 'onParquetReadResolve':
parquetReadResolve?.()
break
case 'onParquetReadObjectsResolve':
parquetReadObjectsResolve?.(data.rows)
break
case 'onParquetQueryResolve':
parquetQueryResolve?.(data.rows)
break
}
/* clean up */
pendingAgents.delete(data.queryId)
// TODO(SL): maybe terminate the worker when no pending agents left
}
}
}
Expand All @@ -58,12 +72,12 @@ function getWorker() {
* the default parsers.
*/
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
const { onComplete, onChunk, onPage, ...serializableOptions } = options
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { resolveEmpty: resolve, reject, onComplete, onChunk, onPage })
pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetRead' }
const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions }
worker.postMessage(message)
})
}
Expand All @@ -76,12 +90,30 @@ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<vo
* the default parsers.
*/
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
const { onChunk, onPage, ...serializableOptions } = options
const { onChunk, onPage, from, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { resolveRowObjects: resolve, reject, onChunk, onPage })
pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetReadObjects' }
const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions }
worker.postMessage(message)
})
}

/**
* Presents almost the same interface as parquetQuery, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
* the default parsers.
*/
export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions }
worker.postMessage(message)
})
}
57 changes: 46 additions & 11 deletions src/lib/workers/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import type { ColumnData, ParquetReadOptions } from 'hyparquet'
import { parquetQuery } from 'hyparquet'

// https://github.com/hyparam/hyparquet/pull/105
type ParquetQueryFilter = Exclude<Parameters<typeof parquetQuery>[0]['filter'], undefined>

// Serializable constructors for AsyncBuffers
interface AsyncBufferFromFile {
Expand Down Expand Up @@ -33,38 +37,69 @@ export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'comp
* - 'parsers' are not configurable, the worker uses the default parsers
*/
export type ParquetReadObjectsWorkerOptions = Omit<ParquetReadWorkerOptions, 'onComplete'>
/**
* Options for the worker version of parquetQuery
* The same options as parquetQuery, but:
* - 'file' must be replaced with 'from': "AsyncBufferFrom"
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
* - 'parsers' are not configurable, the worker uses the default parsers
*/
export interface ParquetQueryWorkerOptions extends ParquetReadWorkerOptions {
filter?: ParquetQueryFilter,
orderBy?: string
}

/**
* Messages sent by the client function to the worker
*/
export interface QueryId {
queryId: number
}
export type SerializableOptions = Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'>
export interface ClientMessage extends SerializableOptions, QueryId {
kind: 'parquetReadObjects' | 'parquetRead'
export interface From {
from: AsyncBufferFrom
}
export interface ParquetReadClientMessage extends QueryId, From {
kind: 'parquetRead'
options: Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage' | 'from'>
}
export interface ParquetReadObjectsClientMessage extends QueryId, From {
kind: 'parquetReadObjects'
options: Omit<ParquetReadObjectsWorkerOptions, 'onChunk' | 'onPage'| 'from'>
}
export interface ParquetQueryClientMessage extends QueryId, From {
kind: 'parquetQuery'
options: Omit<ParquetQueryWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'| 'from'>
}
export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage

/**
* Messages sent by the worker to the client
*/
// export interface ResultMessage {
// queryId: number
// }
export interface CompleteMessage extends QueryId {
kind: 'onComplete'
rows: Rows
}
export interface ChunkMessage extends QueryId {
kind: 'onChunk'
chunk: ColumnData
}
export interface PageMessage extends QueryId {
kind: 'onPage'
page: ColumnData
}
export interface ErrorMessage extends QueryId {
export interface RejectMessage extends QueryId {
kind: 'onReject'
error: Error
}
export interface RowObjectsResultMessage extends QueryId {
rowObjects: Rows
export interface ParquetReadResolveMessage extends QueryId {
kind: 'onParquetReadResolve'
}
export interface ParquetReadObjectsResolveMessage extends QueryId {
kind: 'onParquetReadObjectsResolve'
rows: Rows
}
export interface ParquetQueryResolveMessage extends QueryId {
kind: 'onParquetQueryResolve'
rows: Rows
}
export type EmptyResultMessage = QueryId
export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | ErrorMessage | RowObjectsResultMessage | EmptyResultMessage
export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | RejectMessage | ParquetReadResolveMessage | ParquetReadObjectsResolveMessage | ParquetQueryResolveMessage