Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export { appendSearchParams, replaceSearchParams } from './routes.js'
export * from './sources/index.js'
export { parquetDataFrame } from './tableProvider.js'
export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js'
export { parquetQueryWorker } from './workers/parquetWorkerClient.js'
// export parquetQueryWorker for backward-compatibility
export { parquetReadWorker as parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js'
export type { AsyncBufferFrom } from './workers/types.js'
4 changes: 2 additions & 2 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame, createEventTarget, sortableDataFrame } from 'hightable'
import type { ColumnData } from 'hyparquet'
import { FileMetaData, parquetSchema } from 'hyparquet'
import { parquetQueryWorker } from './workers/parquetWorkerClient.js'
import { parquetReadWorker } from './workers/parquetWorkerClient.js'
import type { AsyncBufferFrom } from './workers/types.d.ts'

type GroupStatus = {
Expand Down Expand Up @@ -54,7 +54,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):

// TODO(SL): pass AbortSignal to the worker?
if (columnsToFetch.length > 0) {
const commonPromise = parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk })
const commonPromise = parquetReadWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk })
columnsToFetch.forEach(column => {
state.set(column, { kind: 'fetching', promise: commonPromise })
})
Expand Down
15 changes: 3 additions & 12 deletions src/lib/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { stringify } from 'hightable'
import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet'
import { AsyncBuffer } from 'hyparquet'
import { AsyncBufferFrom } from './workers/types.js'
import { fromToAsyncBuffer } from './workers/utils.js'

/**
* Helper function to join class names
Expand All @@ -13,17 +14,7 @@ export function cn(...names: (string | undefined | false)[]): string {
* Convert AsyncBufferFromUrl to AsyncBuffer.
*/
export function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
if ('url' in from) {
// Cached asyncBuffer for urls only
const key = JSON.stringify(from)
const cached = cache.get(key)
if (cached) return cached
const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
cache.set(key, asyncBuffer)
return asyncBuffer
} else {
return from.file.arrayBuffer()
}
return fromToAsyncBuffer(from, cache)
}
const cache = new Map<string, Promise<AsyncBuffer>>()
// TODO(SL): do we really want a singleton?
Expand Down
49 changes: 28 additions & 21 deletions src/lib/workers/parquetWorker.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,52 @@
import type { ColumnData } from 'hyparquet'
import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer, parquetRead } from 'hyparquet'
import { AsyncBuffer, parquetRead, parquetReadObjects } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import type { AsyncBufferFrom, ChunkMessage, ClientMessage, ErrorMessage, ResultMessage } from './types.js'
import type { ChunkMessage, ClientMessage, CompleteMessage, EmptyResultMessage, ErrorMessage, PageMessage, RowObjectsResultMessage } from './types.js'
import { fromToAsyncBuffer } from './utils.js'

const cache = new Map<string, Promise<AsyncBuffer>>()

export function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
if ('url' in from) {
// Cached asyncBuffer for urls only
const key = JSON.stringify(from)
const cached = cache.get(key)
if (cached) return cached
const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
cache.set(key, asyncBuffer)
return asyncBuffer
} else {
return from.file.arrayBuffer()
}
function postCompleteMessage ({ queryId, rows }: CompleteMessage) {
self.postMessage({ queryId, rows })
}

function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
self.postMessage({ chunk, queryId })
}
function postResultMessage ({ queryId }: ResultMessage) {
self.postMessage({ queryId })
function postPageMessage ({ page, queryId }: PageMessage) {
self.postMessage({ page, queryId })
}
function postErrorMessage ({ error, queryId }: ErrorMessage) {
self.postMessage({ error, queryId })
}
function postRowObjectsResultMessage ({ queryId, rowObjects }: RowObjectsResultMessage) {
self.postMessage({ queryId, rowObjects })
}
function postEmptyResultMessage ({ queryId }: EmptyResultMessage) {
self.postMessage({ queryId })
}

self.onmessage = async ({ data }: { data: ClientMessage }) => {
const { rowStart, rowEnd, columns, metadata, from, queryId } = data
const file = await asyncBufferFrom(from)
const { kind, queryId, from, ...options } = data
const file = await fromToAsyncBuffer(from, cache)
try {
await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onChunk })
postResultMessage({ queryId })
if (kind === 'parquetReadObjects') {
const rowObjects = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
postRowObjectsResultMessage({ queryId, rowObjects })
} else {
await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage })
postEmptyResultMessage({ queryId })
}
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}

function onComplete(rows: unknown[][]) {
postCompleteMessage({ queryId, rows })
}
function onChunk(chunk: ColumnData) {
postChunkMessage({ chunk, queryId })
}
function onPage(page: ColumnData) {
postPageMessage({ page, queryId })
}
}
66 changes: 50 additions & 16 deletions src/lib/workers/parquetWorkerClient.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,49 @@
import type { ColumnData } from 'hyparquet'
import type { ClientMessage, WorkerMessage, WorkerOptions } from './types.js'
import type { ClientMessage, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'

let worker: Worker | undefined
let nextQueryId = 0
interface QueryAgent {
resolve: () => void
interface Agent {
onComplete?: (rows: Rows) => void
onChunk?: (chunk: ColumnData) => void
onPage?: (page: ColumnData) => void
reject: (error: Error) => void
onChunk: (chunk: ColumnData) => void
resolveEmpty?: () => void
resolveRowObjects?: (rowObjects: Rows) => void
}

const pending = new Map<number, QueryAgent>()
const pendingAgents = new Map<number, Agent>()

function getWorker() {
if (!worker) {
worker = new Worker(new URL('./parquetWorker.js', import.meta.url), { type: 'module' })
worker.onmessage = ({ data }: { data: WorkerMessage }) => {
const pendingQueryAgent = pending.get(data.queryId)
if (!pendingQueryAgent) {
const pendingAgent = pendingAgents.get(data.queryId)
if (!pendingAgent) {
console.warn(
`Unexpected: no pending promise found for queryId: ${data.queryId.toString()}`
)
return
}

const { onChunk, resolve, reject } = pendingQueryAgent
if ('error' in data) {
reject(data.error)
const { onComplete, onChunk, onPage, reject, resolveEmpty, resolveRowObjects } = pendingAgent
if ('rows' in data) {
onComplete?.(data.rows)
} else if ('chunk' in data) {
onChunk(data.chunk)
onChunk?.(data.chunk)
} else if ('page' in data) {
onPage?.(data.page)
} else {
resolve()
if ('error' in data) {
reject(data.error)
} else if ('rowObjects' in data) {
resolveRowObjects?.(data.rowObjects)
} else {
resolveEmpty?.()
}
/* clean up */
pendingAgents.delete(data.queryId)
// TODO(SL): maybe terminate the worker when no pending agents left
}
}
}
Expand All @@ -40,14 +54,34 @@ function getWorker() {
* Presents almost the same interface as parquetRead, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker.
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
* the default parsers.
*/
export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, onChunk }: WorkerOptions): Promise<void> {
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
const { onComplete, onChunk, onPage, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pending.set(queryId, { resolve, reject, onChunk })
pendingAgents.set(queryId, { resolveEmpty: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns }
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetRead' }
worker.postMessage(message)
})
}

/**
* Presents almost the same interface as parquetReadObjects, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
* the default parsers.
*/
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
const { onChunk, onPage, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { resolveRowObjects: resolve, reject, onChunk, onPage })
const worker = getWorker()
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetReadObjects' }
worker.postMessage(message)
})
}
66 changes: 51 additions & 15 deletions src/lib/workers/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ColumnData, FileMetaData } from 'hyparquet'
import type { ColumnData, ParquetReadOptions } from 'hyparquet'

// Serializable constructors for AsyncBuffers
interface AsyncBufferFromFile {
Expand All @@ -12,23 +12,59 @@ interface AsyncBufferFromUrl {
}
export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl

export interface ResultMessage {
export type Rows = unknown[][] | Record<string, unknown>[]

/**
* Options for the worker version of parquetRead
* The same options as parquetRead, but:
* - 'file' must be replaced with 'from': "AsyncBufferFrom"
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
* - 'parsers' are not configurable, the worker uses the default parsers
*/
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'compressors' | 'parsers' | 'file' | 'onComplete'> {
onComplete?: (rows: Rows) => void // fix for https://github.com/hyparam/hyparquet/issues/28
from: AsyncBufferFrom
}
/**
* Options for the worker version of parquetReadObjects
* The same options as parquetReadObjects, but:
* - 'file' must be replaced with 'from': "AsyncBufferFrom"
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
* - 'parsers' are not configurable, the worker uses the default parsers
*/
export type ParquetReadObjectsWorkerOptions = Omit<ParquetReadWorkerOptions, 'onComplete'>

/**
* Messages sent by the client function to the worker
*/
export interface QueryId {
queryId: number
}
export interface ErrorMessage extends ResultMessage {
error: Error
export type SerializableOptions = Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'>
export interface ClientMessage extends SerializableOptions, QueryId {
kind: 'parquetReadObjects' | 'parquetRead'
}

/**
* Messages sent by the worker to the client
*/
// export interface ResultMessage {
// queryId: number
// }
export interface CompleteMessage extends QueryId {
rows: Rows
}
export interface ChunkMessage extends ResultMessage {
export interface ChunkMessage extends QueryId {
chunk: ColumnData
}
export type WorkerMessage = ChunkMessage | ResultMessage | ErrorMessage

export interface WorkerOptions {
metadata: FileMetaData,
from: AsyncBufferFrom
rowStart?: number,
rowEnd?: number,
columns?: string[],
onChunk: (chunk: ColumnData) => void
export interface PageMessage extends QueryId {
page: ColumnData
}
export interface ErrorMessage extends QueryId {
error: Error
}
export interface RowObjectsResultMessage extends QueryId {
rowObjects: Rows
}
export type ClientMessage = Omit<WorkerOptions, 'onChunk'> & ResultMessage
export type EmptyResultMessage = QueryId
export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | ErrorMessage | RowObjectsResultMessage | EmptyResultMessage
20 changes: 20 additions & 0 deletions src/lib/workers/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet'
import { AsyncBufferFrom } from './types.js'

/**
 * Convert a serializable AsyncBufferFrom descriptor into an AsyncBuffer.
 *
 * URL sources are wrapped with cachedAsyncBuffer and, when a cache map is
 * supplied, memoized by the JSON-serialized descriptor so repeated requests
 * for the same URL share a single buffer. File sources are read directly
 * and are never cached.
 *
 * @param from  serializable source descriptor (url-based or file-based)
 * @param cache optional memoization map for url-backed buffers
 * @returns promise resolving to an AsyncBuffer usable by hyparquet
 */
export function fromToAsyncBuffer(from: AsyncBufferFrom, cache?: Map<string, Promise<AsyncBuffer>>): Promise<AsyncBuffer> {
  if ('url' in from) {
    // Cached asyncBuffer for urls only
    const key = JSON.stringify(from)
    const cached = cache?.get(key)
    if (cached) return cached
    const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
    if (cache) {
      cache.set(key, asyncBuffer)
      // Evict a failed fetch so later calls can retry instead of rejecting
      // forever with the memoized error. Guard against deleting a newer
      // entry that replaced ours in the meantime.
      asyncBuffer.catch(() => {
        if (cache.get(key) === asyncBuffer) cache.delete(key)
      })
    }
    return asyncBuffer
  } else {
    return from.file.arrayBuffer()
  }
}