Skip to content

Commit 2ddd598

Browse files
authored
add parquetReadWorker (#295)
1 parent 9789f2d commit 2ddd598

File tree

4 files changed

+130
-58
lines changed

4 files changed

+130
-58
lines changed

src/lib/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@ export { appendSearchParams, replaceSearchParams } from './routes.js'
22
export * from './sources/index.js'
33
export { parquetDataFrame } from './tableProvider.js'
44
export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js'
5-
// export parquetQueryWorker for backward-compatibility
6-
export { parquetReadWorker as parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js'
5+
export { parquetQueryWorker, parquetReadObjectsWorker, parquetReadWorker } from './workers/parquetWorkerClient.js'
76
export type { AsyncBufferFrom } from './workers/types.js'

src/lib/workers/parquetWorker.ts

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,46 @@
11
import type { ColumnData } from 'hyparquet'
2-
import { AsyncBuffer, parquetRead, parquetReadObjects } from 'hyparquet'
2+
import { AsyncBuffer, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet'
33
import { compressors } from 'hyparquet-compressors'
4-
import type { ChunkMessage, ClientMessage, CompleteMessage, EmptyResultMessage, ErrorMessage, PageMessage, RowObjectsResultMessage } from './types.js'
4+
import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage } from './types.js'
55
import { fromToAsyncBuffer } from './utils.js'
66

77
const cache = new Map<string, Promise<AsyncBuffer>>()
88

9-
function postCompleteMessage ({ queryId, rows }: CompleteMessage) {
10-
self.postMessage({ queryId, rows })
9+
function postCompleteMessage ({ queryId, rows }: Omit<CompleteMessage, 'kind'>) {
10+
self.postMessage({ kind: 'onComplete', queryId, rows })
1111
}
12-
function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
13-
self.postMessage({ chunk, queryId })
12+
function postChunkMessage ({ chunk, queryId }: Omit<ChunkMessage, 'kind'>) {
13+
self.postMessage({ kind: 'onChunk', chunk, queryId })
1414
}
15-
function postPageMessage ({ page, queryId }: PageMessage) {
16-
self.postMessage({ page, queryId })
15+
function postPageMessage ({ page, queryId }: Omit<PageMessage, 'kind'>) {
16+
self.postMessage({ kind: 'onPage', page, queryId })
1717
}
18-
function postErrorMessage ({ error, queryId }: ErrorMessage) {
19-
self.postMessage({ error, queryId })
18+
function postErrorMessage ({ error, queryId }: Omit<RejectMessage, 'kind'>) {
19+
self.postMessage({ kind: 'onReject', error, queryId })
2020
}
21-
function postRowObjectsResultMessage ({ queryId, rowObjects }: RowObjectsResultMessage) {
22-
self.postMessage({ queryId, rowObjects })
21+
function postParquetReadResultMessage ({ queryId }: Omit<ParquetReadResolveMessage, 'kind'>) {
22+
self.postMessage({ kind: 'onParquetReadResolve', queryId })
2323
}
24-
function postEmptyResultMessage ({ queryId }: EmptyResultMessage) {
25-
self.postMessage({ queryId })
24+
function postParquetReadObjectsResultMessage ({ queryId, rows }: Omit<ParquetReadObjectsResolveMessage, 'kind'>) {
25+
self.postMessage({ kind: 'onParquetReadObjectsResolve', queryId, rows })
26+
}
27+
function postParquetQueryResultMessage ({ queryId, rows }: Omit<ParquetQueryResolveMessage, 'kind'>) {
28+
self.postMessage({ kind: 'onParquetQueryResolve', queryId, rows })
2629
}
2730

2831
self.onmessage = async ({ data }: { data: ClientMessage }) => {
29-
const { kind, queryId, from, ...options } = data
32+
const { queryId, from, kind, options } = data
3033
const file = await fromToAsyncBuffer(from, cache)
3134
try {
3235
if (kind === 'parquetReadObjects') {
33-
const rowObjects = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
34-
postRowObjectsResultMessage({ queryId, rowObjects })
36+
const rows = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
37+
postParquetReadObjectsResultMessage({ queryId, rows })
38+
} else if (kind === 'parquetQuery') {
39+
const rows = await parquetQuery({ ...options, file, compressors, onComplete, onChunk, onPage })
40+
postParquetQueryResultMessage({ queryId, rows })
3541
} else {
3642
await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage })
37-
postEmptyResultMessage({ queryId })
43+
postParquetReadResultMessage({ queryId })
3844
}
3945
} catch (error) {
4046
postErrorMessage({ error: error as Error, queryId })
Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { ColumnData } from 'hyparquet'
2-
import type { ClientMessage, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'
2+
import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js'
33

44
let worker: Worker | undefined
55
let nextQueryId = 0
@@ -8,8 +8,9 @@ interface Agent {
88
onChunk?: (chunk: ColumnData) => void
99
onPage?: (page: ColumnData) => void
1010
reject: (error: Error) => void
11-
resolveEmpty?: () => void
12-
resolveRowObjects?: (rowObjects: Rows) => void
11+
parquetReadResolve?: () => void
12+
parquetReadObjectsResolve?: (rows: Rows) => void
13+
parquetQueryResolve?: (rows: Rows) => void
1314
}
1415

1516
const pendingAgents = new Map<number, Agent>()
@@ -26,24 +27,37 @@ function getWorker() {
2627
return
2728
}
2829

29-
const { onComplete, onChunk, onPage, reject, resolveEmpty, resolveRowObjects } = pendingAgent
30-
if ('rows' in data) {
31-
onComplete?.(data.rows)
32-
} else if ('chunk' in data) {
33-
onChunk?.(data.chunk)
34-
} else if ('page' in data) {
35-
onPage?.(data.page)
36-
} else {
37-
if ('error' in data) {
38-
reject(data.error)
39-
} else if ('rowObjects' in data) {
40-
resolveRowObjects?.(data.rowObjects)
41-
} else {
42-
resolveEmpty?.()
43-
}
44-
/* clean up */
45-
pendingAgents.delete(data.queryId)
46-
// TODO(SL): maybe terminate the worker when no pending agents left
30+
const { onComplete, onChunk, onPage, reject, parquetReadResolve, parquetReadObjectsResolve, parquetQueryResolve } = pendingAgent
31+
switch (data.kind) {
32+
case 'onComplete':
33+
onComplete?.(data.rows)
34+
break
35+
case 'onChunk':
36+
onChunk?.(data.chunk)
37+
break
38+
case 'onPage':
39+
onPage?.(data.page)
40+
break
41+
default:
42+
switch (data.kind) {
43+
case 'onReject':
44+
if ('error' in data) { // check, just in case
45+
reject(data.error)
46+
}
47+
break
48+
case 'onParquetReadResolve':
49+
parquetReadResolve?.()
50+
break
51+
case 'onParquetReadObjectsResolve':
52+
parquetReadObjectsResolve?.(data.rows)
53+
break
54+
case 'onParquetQueryResolve':
55+
parquetQueryResolve?.(data.rows)
56+
break
57+
}
58+
/* clean up */
59+
pendingAgents.delete(data.queryId)
60+
// TODO(SL): maybe terminate the worker when no pending agents left
4761
}
4862
}
4963
}
@@ -58,12 +72,12 @@ function getWorker() {
5872
* the default parsers.
5973
*/
6074
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
61-
const { onComplete, onChunk, onPage, ...serializableOptions } = options
75+
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
6276
return new Promise((resolve, reject) => {
6377
const queryId = nextQueryId++
64-
pendingAgents.set(queryId, { resolveEmpty: resolve, reject, onComplete, onChunk, onPage })
78+
pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage })
6579
const worker = getWorker()
66-
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetRead' }
80+
const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions }
6781
worker.postMessage(message)
6882
})
6983
}
@@ -76,12 +90,30 @@ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<vo
7690
* the default parsers.
7791
*/
7892
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
79-
const { onChunk, onPage, ...serializableOptions } = options
93+
const { onChunk, onPage, from, ...serializableOptions } = options
8094
return new Promise((resolve, reject) => {
8195
const queryId = nextQueryId++
82-
pendingAgents.set(queryId, { resolveRowObjects: resolve, reject, onChunk, onPage })
96+
pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage })
8397
const worker = getWorker()
84-
const message: ClientMessage = { queryId, ...serializableOptions, kind: 'parquetReadObjects' }
98+
const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions }
99+
worker.postMessage(message)
100+
})
101+
}
102+
103+
/**
104+
* Presents almost the same interface as parquetQuery, but runs in a worker.
105+
* This is useful for reading large parquet files without blocking the main thread.
106+
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
107+
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
108+
* the default parsers.
109+
*/
110+
export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
111+
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
112+
return new Promise((resolve, reject) => {
113+
const queryId = nextQueryId++
114+
pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage })
115+
const worker = getWorker()
116+
const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions }
85117
worker.postMessage(message)
86118
})
87119
}

src/lib/workers/types.ts

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import type { ColumnData, ParquetReadOptions } from 'hyparquet'
2+
import { parquetQuery } from 'hyparquet'
3+
4+
// https://github.com/hyparam/hyparquet/pull/105
5+
type ParquetQueryFilter = Exclude<Parameters<typeof parquetQuery>[0]['filter'], undefined>
26

37
// Serializable constructors for AsyncBuffers
48
interface AsyncBufferFromFile {
@@ -33,38 +37,69 @@ export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'comp
3337
* - 'parsers' are not configurable, the worker uses the default parsers
3438
*/
3539
export type ParquetReadObjectsWorkerOptions = Omit<ParquetReadWorkerOptions, 'onComplete'>
40+
/**
41+
* Options for the worker version of parquetQuery
42+
* The same options as parquetQuery, but:
43+
* - 'file' must be replaced with 'from': "AsyncBufferFrom"
44+
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
45+
* - 'parsers' are not configurable, the worker uses the default parsers
46+
*/
47+
export interface ParquetQueryWorkerOptions extends ParquetReadWorkerOptions {
48+
filter?: ParquetQueryFilter,
49+
orderBy?: string
50+
}
3651

3752
/**
3853
* Messages sent by the client function to the worker
3954
*/
4055
export interface QueryId {
4156
queryId: number
4257
}
43-
export type SerializableOptions = Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'>
44-
export interface ClientMessage extends SerializableOptions, QueryId {
45-
kind: 'parquetReadObjects' | 'parquetRead'
58+
export interface From {
59+
from: AsyncBufferFrom
4660
}
61+
export interface ParquetReadClientMessage extends QueryId, From {
62+
kind: 'parquetRead'
63+
options: Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage' | 'from'>
64+
}
65+
export interface ParquetReadObjectsClientMessage extends QueryId, From {
66+
kind: 'parquetReadObjects'
67+
options: Omit<ParquetReadObjectsWorkerOptions, 'onChunk' | 'onPage'| 'from'>
68+
}
69+
export interface ParquetQueryClientMessage extends QueryId, From {
70+
kind: 'parquetQuery'
71+
options: Omit<ParquetQueryWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'| 'from'>
72+
}
73+
export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage
4774

4875
/**
4976
* Messages sent by the worker to the client
5077
*/
51-
// export interface ResultMessage {
52-
// queryId: number
53-
// }
5478
export interface CompleteMessage extends QueryId {
79+
kind: 'onComplete'
5580
rows: Rows
5681
}
5782
export interface ChunkMessage extends QueryId {
83+
kind: 'onChunk'
5884
chunk: ColumnData
5985
}
6086
export interface PageMessage extends QueryId {
87+
kind: 'onPage'
6188
page: ColumnData
6289
}
63-
export interface ErrorMessage extends QueryId {
90+
export interface RejectMessage extends QueryId {
91+
kind: 'onReject'
6492
error: Error
6593
}
66-
export interface RowObjectsResultMessage extends QueryId {
67-
rowObjects: Rows
94+
export interface ParquetReadResolveMessage extends QueryId {
95+
kind: 'onParquetReadResolve'
96+
}
97+
export interface ParquetReadObjectsResolveMessage extends QueryId {
98+
kind: 'onParquetReadObjectsResolve'
99+
rows: Rows
100+
}
101+
export interface ParquetQueryResolveMessage extends QueryId {
102+
kind: 'onParquetQueryResolve'
103+
rows: Rows
68104
}
69-
export type EmptyResultMessage = QueryId
70-
export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | ErrorMessage | RowObjectsResultMessage | EmptyResultMessage
105+
export type WorkerMessage = CompleteMessage | ChunkMessage | PageMessage | RejectMessage | ParquetReadResolveMessage | ParquetReadObjectsResolveMessage | ParquetQueryResolveMessage

0 commit comments

Comments
 (0)