Skip to content

Commit 098e7a7

Browse files
committed
fix types in worker
1 parent 96f533e commit 098e7a7

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

src/lib/workers/parquetWorker.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { ColumnData } from 'hyparquet'
22
import { AsyncBuffer, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet'
33
import { compressors } from 'hyparquet-compressors'
4-
import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage } from './types.js'
4+
import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage, Rows } from './types.js'
55
import { fromToAsyncBuffer } from './utils.js'
66

77
const cache = new Map<string, Promise<AsyncBuffer>>()
@@ -33,20 +33,20 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => {
3333
const file = await fromToAsyncBuffer(from, cache)
3434
try {
3535
if (kind === 'parquetReadObjects') {
36-
const rows = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
36+
const rows = (await parquetReadObjects({ ...options, rowFormat: 'object', file, compressors, onChunk, onPage })) as Rows
3737
postParquetReadObjectsResultMessage({ queryId, rows })
3838
} else if (kind === 'parquetQuery') {
39-
const rows = await parquetQuery({ ...options, file, compressors, onComplete, onChunk, onPage })
39+
const rows = (await parquetQuery({ ...options, rowFormat: 'object', file, compressors, onComplete, onChunk, onPage })) as Rows
4040
postParquetQueryResultMessage({ queryId, rows })
4141
} else {
42-
await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage })
42+
await parquetRead({ ...options, rowFormat: 'object', file, compressors, onComplete, onChunk, onPage })
4343
postParquetReadResultMessage({ queryId })
4444
}
4545
} catch (error) {
4646
postErrorMessage({ error: error as Error, queryId })
4747
}
4848

49-
function onComplete(rows: unknown[][]) {
49+
function onComplete(rows: Rows) {
5050
postCompleteMessage({ queryId, rows })
5151
}
5252
function onChunk(chunk: ColumnData) {

src/lib/workers/parquetWorkerClient.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorker
77
let worker: Worker | undefined
88
let nextQueryId = 0
99
interface Agent {
10-
onComplete?: (rows: Rows) => void
10+
onComplete?: ((rows: Rows) => void)
1111
onChunk?: (chunk: ColumnData) => void
1212
onPage?: (page: ColumnData) => void
1313
reject: (error: Error) => void
@@ -73,6 +73,8 @@ function getWorker() {
7373
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
7474
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
7575
* the default parsers.
76+
*
77+
* Note that it only supports 'rowFormat: object' (the default).
7678
*/
7779
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
7880
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
@@ -91,6 +93,8 @@ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<vo
9193
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
9294
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
9395
* the default parsers.
96+
*
97+
* Note that it only supports 'rowFormat: object' (the default).
9498
*/
9599
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
96100
const { onChunk, onPage, from, ...serializableOptions } = options
@@ -109,6 +113,8 @@ export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOption
109113
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
110114
* to be serialized to the worker. Also: the worker uses hyparquet-compressors and
111115
* the default parsers.
116+
*
117+
* Note that it only supports 'rowFormat: object' (the default).
112118
*/
113119
export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
114120
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options

src/lib/workers/types.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ interface AsyncBufferFromUrl {
1616
}
1717
export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl
1818

19-
export type Rows = unknown[][] | Record<string, unknown>[]
19+
// Only rowFormat 'object' is supported in the worker
20+
export type Rows = Record<string, unknown>[]
2021

2122
/**
2223
* Options for the worker version of parquetRead
@@ -25,9 +26,11 @@ export type Rows = unknown[][] | Record<string, unknown>[]
2526
* - 'compressors' are not configurable, the worker uses hyparquet-compressors
2627
* - 'parsers' are not configurable, the worker uses the default parsers
2728
*/
28-
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'compressors' | 'parsers' | 'file' | 'onComplete'> {
29-
onComplete?: (rows: Rows) => void // fix for https://github.com/hyparam/hyparquet/issues/28
29+
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'compressors' | 'parsers' | 'file' | 'rowFormat' | 'onComplete'> {
3030
from: AsyncBufferFrom
31+
// rowFormat 'array' is not supported in the worker.
32+
rowFormat?: 'object'
33+
onComplete?: (rows: Rows) => void
3134
}
3235
/**
3336
* Options for the worker version of parquetReadObjects

0 commit comments

Comments
 (0)