Skip to content

Commit 0fc591a

Browse files
authored
Add a worker version of parquetReadObjects (#293)
* factorize code
* rename parquetQueryWorker to parquetReadWorker, add parquetReadObjectsWorker
1 parent 66896a3 commit 0fc591a

File tree

7 files changed

+156
-67
lines changed

7 files changed

+156
-67
lines changed

src/lib/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ export { appendSearchParams, replaceSearchParams } from './routes.js'
22
export * from './sources/index.js'
33
export { parquetDataFrame } from './tableProvider.js'
44
export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js'
5-
export { parquetQueryWorker } from './workers/parquetWorkerClient.js'
5+
// export parquetQueryWorker for backward-compatibility
6+
export { parquetReadWorker as parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js'
67
export type { AsyncBufferFrom } from './workers/types.js'

src/lib/tableProvider.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame, createEventTarget, sortableDataFrame } from 'hightable'
22
import type { ColumnData } from 'hyparquet'
33
import { FileMetaData, parquetSchema } from 'hyparquet'
4-
import { parquetQueryWorker } from './workers/parquetWorkerClient.js'
4+
import { parquetReadWorker } from './workers/parquetWorkerClient.js'
55
import type { AsyncBufferFrom } from './workers/types.d.ts'
66

77
type GroupStatus = {
@@ -54,7 +54,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
5454

5555
// TODO(SL): pass AbortSignal to the worker?
5656
if (columnsToFetch.length > 0) {
57-
const commonPromise = parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk })
57+
const commonPromise = parquetReadWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk })
5858
columnsToFetch.forEach(column => {
5959
state.set(column, { kind: 'fetching', promise: commonPromise })
6060
})

src/lib/utils.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { stringify } from 'hightable'
2-
import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet'
2+
import { AsyncBuffer } from 'hyparquet'
33
import { AsyncBufferFrom } from './workers/types.js'
4+
import { fromToAsyncBuffer } from './workers/utils.js'
45

56
/**
67
* Helper function to join class names
@@ -13,17 +14,7 @@ export function cn(...names: (string | undefined | false)[]): string {
1314
* Convert an AsyncBufferFrom (URL or file source) to an AsyncBuffer.
1415
*/
1516
export function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
16-
if ('url' in from) {
17-
// Cached asyncBuffer for urls only
18-
const key = JSON.stringify(from)
19-
const cached = cache.get(key)
20-
if (cached) return cached
21-
const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
22-
cache.set(key, asyncBuffer)
23-
return asyncBuffer
24-
} else {
25-
return from.file.arrayBuffer()
26-
}
17+
return fromToAsyncBuffer(from, cache)
2718
}
2819
const cache = new Map<string, Promise<AsyncBuffer>>()
2920
// TODO(SL): do we really want a singleton?

src/lib/workers/parquetWorker.ts

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,52 @@
11
import type { ColumnData } from 'hyparquet'
2-
import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer, parquetRead } from 'hyparquet'
2+
import { AsyncBuffer, parquetRead, parquetReadObjects } from 'hyparquet'
33
import { compressors } from 'hyparquet-compressors'
4-
import type { AsyncBufferFrom, ChunkMessage, ClientMessage, ErrorMessage, ResultMessage } from './types.js'
4+
import type { ChunkMessage, ClientMessage, CompleteMessage, EmptyResultMessage, ErrorMessage, PageMessage, RowObjectsResultMessage } from './types.js'
5+
import { fromToAsyncBuffer } from './utils.js'
56

67
const cache = new Map<string, Promise<AsyncBuffer>>()
78

8-
export function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
9-
if ('url' in from) {
10-
// Cached asyncBuffer for urls only
11-
const key = JSON.stringify(from)
12-
const cached = cache.get(key)
13-
if (cached) return cached
14-
const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
15-
cache.set(key, asyncBuffer)
16-
return asyncBuffer
17-
} else {
18-
return from.file.arrayBuffer()
19-
}
9+
function postCompleteMessage ({ queryId, rows }: CompleteMessage) {
10+
self.postMessage({ queryId, rows })
2011
}
21-
2212
function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
2313
self.postMessage({ chunk, queryId })
2414
}
25-
function postResultMessage ({ queryId }: ResultMessage) {
26-
self.postMessage({ queryId })
15+
function postPageMessage ({ page, queryId }: PageMessage) {
16+
self.postMessage({ page, queryId })
2717
}
2818
function postErrorMessage ({ error, queryId }: ErrorMessage) {
2919
self.postMessage({ error, queryId })
3020
}
21+
function postRowObjectsResultMessage ({ queryId, rowObjects }: RowObjectsResultMessage) {
22+
self.postMessage({ queryId, rowObjects })
23+
}
24+
function postEmptyResultMessage ({ queryId }: EmptyResultMessage) {
25+
self.postMessage({ queryId })
26+
}
3127

3228
self.onmessage = async ({ data }: { data: ClientMessage }) => {
33-
const { rowStart, rowEnd, columns, metadata, from, queryId } = data
34-
const file = await asyncBufferFrom(from)
29+
const { kind, queryId, from, ...options } = data
30+
const file = await fromToAsyncBuffer(from, cache)
3531
try {
36-
await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onChunk })
37-
postResultMessage({ queryId })
32+
if (kind === 'parquetReadObjects') {
33+
const rowObjects = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
34+
postRowObjectsResultMessage({ queryId, rowObjects })
35+
} else {
36+
await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage })
37+
postEmptyResultMessage({ queryId })
38+
}
3839
} catch (error) {
3940
postErrorMessage({ error: error as Error, queryId })
4041
}
4142

43+
function onComplete(rows: unknown[][]) {
44+
postCompleteMessage({ queryId, rows })
45+
}
4246
function onChunk(chunk: ColumnData) {
4347
postChunkMessage({ chunk, queryId })
4448
}
49+
function onPage(page: ColumnData) {
50+
postPageMessage({ page, queryId })
51+
}
4552
}

src/lib/workers/parquetWorkerClient.ts

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,49 @@
11
import type { ColumnData } from 'hyparquet'
2-
import type { ClientMessage, WorkerMessage, WorkerOptions } from './types.js'
2+
import type { ClientMessage, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'
33

44
let worker: Worker | undefined
55
let nextQueryId = 0
6-
interface QueryAgent {
7-
resolve: () => void
6+
interface Agent {
7+
onComplete?: (rows: Rows) => void
8+
onChunk?: (chunk: ColumnData) => void
9+
onPage?: (page: ColumnData) => void
810
reject: (error: Error) => void
9-
onChunk: (chunk: ColumnData) => void
11+
resolveEmpty?: () => void
12+
resolveRowObjects?: (rowObjects: Rows) => void
1013
}
1114

12-
const pending = new Map<number, QueryAgent>()
15+
const pendingAgents = new Map<number, Agent>()
1316

1417
function getWorker() {
1518
if (!worker) {
1619
worker = new Worker(new URL('./parquetWorker.js', import.meta.url), { type: 'module' })
1720
worker.onmessage = ({ data }: { data: WorkerMessage }) => {
18-
const pendingQueryAgent = pending.get(data.queryId)
19-
if (!pendingQueryAgent) {
21+
const pendingAgent = pendingAgents.get(data.queryId)
22+
if (!pendingAgent) {
2023
console.warn(
2124
`Unexpected: no pending promise found for queryId: ${data.queryId.toString()}`
2225
)
2326
return
2427
}
2528

26-
const { onChunk, resolve, reject } = pendingQueryAgent
27-
if ('error' in data) {
28-
reject(data.error)
29+
const { onComplete, onChunk, onPage, reject, resolveEmpty, resolveRowObjects } = pendingAgent
30+
if ('rows' in data) {
31+
onComplete?.(data.rows)
2932
} else if ('chunk' in data) {
30-
onChunk(data.chunk)
33+
onChunk?.(data.chunk)
34+
} else if ('page' in data) {
35+
onPage?.(data.page)
3136
} else {
32-
resolve()
37+
if ('error' in data) {
38+
reject(data.error)
39+
} else if ('rowObjects' in data) {
40+
resolveRowObjects?.(data.rowObjects)
41+
} else {
42+
resolveEmpty?.()
43+
}
44+
/* clean up */
45+
pendingAgents.delete(data.queryId)
46+
// TODO(SL): maybe terminate the worker when no pending agents left
3347
}
3448
}
3549
}
@@ -40,14 +54,34 @@ function getWorker() {
4054
* Presents almost the same interface as parquetRead, but runs in a worker.
4155
* This is useful for reading large parquet files without blocking the main thread.
4256
* Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
43-
* to be serialized to the worker.
57+
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
58+
* the default parsers.
4459
*/
45-
export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, onChunk }: WorkerOptions): Promise<void> {
60+
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
61+
const { onComplete, onChunk, onPage, ...serializableOptions } = options
4662
return new Promise((resolve, reject) => {
4763
const queryId = nextQueryId++
48-
pending.set(queryId, { resolve, reject, onChunk })
64+
pendingAgents.set(queryId, { resolveEmpty: resolve, reject, onComplete, onChunk, onPage })
4965
const worker = getWorker()
50-
const message: ClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns }
66+
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetRead' }
67+
worker.postMessage(message)
68+
})
69+
}
70+
71+
/**
72+
* Presents almost the same interface as parquetReadObjects, but runs in a worker.
73+
* This is useful for reading large parquet files without blocking the main thread.
74+
* Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
75+
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
76+
* the default parsers.
77+
*/
78+
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
79+
const { onChunk, onPage, ...serializableOptions } = options
80+
return new Promise((resolve, reject) => {
81+
const queryId = nextQueryId++
82+
pendingAgents.set(queryId, { resolveRowObjects: resolve, reject, onChunk, onPage })
83+
const worker = getWorker()
84+
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetReadObjects' }
5185
worker.postMessage(message)
5286
})
5387
}

src/lib/workers/types.ts

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ColumnData, FileMetaData } from 'hyparquet'
1+
import type { ColumnData, ParquetReadOptions } from 'hyparquet'
22

33
// Serializable constructors for AsyncBuffers
44
interface AsyncBufferFromFile {
@@ -12,23 +12,59 @@ interface AsyncBufferFromUrl {
1212
}
1313
export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl
1414

15-
export interface ResultMessage {
15+
export type Rows = unknown[][] | Record<string, unknown>[]
16+
17+
/**
18+
* Options for the worker version of parquetRead
19+
* The same options as parquetRead, but:
20+
* - 'file' must be replaced with 'from': "AsyncBufferFrom"
21+
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
22+
* - 'parsers' are not configurable, the worker uses the default parsers
23+
*/
24+
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'compressors' | 'parsers' | 'file' | 'onComplete'> {
25+
onComplete?: (rows: Rows) => void // fix for https://github.com/hyparam/hyparquet/issues/28
26+
from: AsyncBufferFrom
27+
}
28+
/**
29+
* Options for the worker version of parquetReadObjects
30+
* The same options as parquetReadObjects, but:
31+
* - 'file' must be replaced with 'from': "AsyncBufferFrom"
32+
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
33+
* - 'parsers' are not configurable, the worker uses the default parsers
34+
*/
35+
export type ParquetReadObjectsWorkerOptions = Omit<ParquetReadWorkerOptions, 'onComplete'>
36+
37+
/**
38+
* Messages sent by the client function to the worker
39+
*/
40+
export interface QueryId {
1641
queryId: number
1742
}
18-
export interface ErrorMessage extends ResultMessage {
19-
error: Error
43+
export type SerializableOptions = Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'>
44+
export interface ClientMessage extends SerializableOptions, QueryId {
45+
kind: 'parquetReadObjects' | 'parquetRead'
46+
}
47+
48+
/**
49+
* Messages sent by the worker to the client
50+
*/
51+
// export interface ResultMessage {
52+
// queryId: number
53+
// }
54+
export interface CompleteMessage extends QueryId {
55+
rows: Rows
2056
}
21-
export interface ChunkMessage extends ResultMessage {
57+
export interface ChunkMessage extends QueryId {
2258
chunk: ColumnData
2359
}
24-
export type WorkerMessage = ChunkMessage | ResultMessage | ErrorMessage
25-
26-
export interface WorkerOptions {
27-
metadata: FileMetaData,
28-
from: AsyncBufferFrom
29-
rowStart?: number,
30-
rowEnd?: number,
31-
columns?: string[],
32-
onChunk: (chunk: ColumnData) => void
60+
export interface PageMessage extends QueryId {
61+
page: ColumnData
62+
}
63+
export interface ErrorMessage extends QueryId {
64+
error: Error
65+
}
66+
export interface RowObjectsResultMessage extends QueryId {
67+
rowObjects: Rows
3368
}
34-
export type ClientMessage = Omit<WorkerOptions, 'onChunk'> & ResultMessage
69+
export type EmptyResultMessage = QueryId
70+
export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | ErrorMessage | RowObjectsResultMessage | EmptyResultMessage

src/lib/workers/utils.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet'
2+
import { AsyncBufferFrom } from './types.js'
3+
4+
export function fromToAsyncBuffer(from: AsyncBufferFrom, cache?: Map<string, Promise<AsyncBuffer>>): Promise<AsyncBuffer> {
5+
if ('url' in from) {
6+
// Cached asyncBuffer for urls only
7+
const key = JSON.stringify(from)
8+
if (cache) {
9+
const cached = cache.get(key)
10+
if (cached) return cached
11+
}
12+
const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
13+
if (cache) {
14+
cache.set(key, asyncBuffer)
15+
}
16+
return asyncBuffer
17+
} else {
18+
return from.file.arrayBuffer()
19+
}
20+
}

0 commit comments

Comments
 (0)