Commit bd38102

Optionally send chunks back from parquet worker

1 parent afd26f7 commit bd38102

8 files changed: +22 -20 lines changed

public/HighTable.css

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@
 .table thead th.orderby ::after {
   position: absolute;
   right: 8px;
-  top: 4px;
+  top: 6px;
   padding-left: 2px;
   background-color: #eaeaeb;
   content: "▾";

public/build/app.min.js

Lines changed: 1 addition & 1 deletion (generated file; diff not rendered)

public/build/app.min.js.map

Lines changed: 1 addition & 1 deletion (generated file; diff not rendered)

public/build/worker.min.js

Lines changed: 1 addition & 1 deletion (generated file; diff not rendered)

public/build/worker.min.js.map

Lines changed: 1 addition & 1 deletion (generated file; diff not rendered)

src/tableProvider.ts

Lines changed: 0 additions & 1 deletion

@@ -11,7 +11,6 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
     header: children.map(child => child.element.name),
     numRows: Number(metadata.num_rows),
     rows(rowStart: number, rowEnd: number, orderBy?: string) {
-      console.log('parquetDataFrame.rows', { rowStart, rowEnd, orderBy })
       return parquetQueryWorker({ asyncBuffer: from, rowStart, rowEnd, orderBy })
     },
     sortable: true,
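
Since parquetQueryWorker now accepts an optional onChunk callback (see the worker and client changes below), a data frame that wants progressive updates could forward one from its row fetcher. A minimal sketch, not part of this commit; the streamingRows helper and its onChunk parameter are hypothetical, and this commit leaves the rows(rowStart, rowEnd, orderBy) signature unchanged:

import type { ColumnData } from 'hyparquet'
import type { AsyncBufferFrom } from './workers/parquetWorkerClient.js'
import { parquetQueryWorker } from './workers/parquetWorkerClient.js'

// Hypothetical streaming variant of the rows() method above
function streamingRows(from: AsyncBufferFrom, rowStart: number, rowEnd: number,
  orderBy?: string, onChunk?: (chunk: ColumnData) => void) {
  // parquetQueryWorker derives its `chunks` flag from whether onChunk is defined
  return parquetQueryWorker({ asyncBuffer: from, rowStart, rowEnd, orderBy, onChunk })
}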

src/workers/parquetWorker.ts

Lines changed: 4 additions & 3 deletions

@@ -1,13 +1,14 @@
-import { parquetQuery } from 'hyparquet'
+import { ColumnData, parquetQuery } from 'hyparquet'
 import { compressors } from 'hyparquet-compressors'
 import { asyncBufferFrom } from './parquetWorkerClient.js'
 
 self.onmessage = async ({ data }) => {
-  const { metadata, asyncBuffer, rowStart, rowEnd, orderBy, queryId } = data
+  const { metadata, asyncBuffer, rowStart, rowEnd, orderBy, queryId, chunks } = data
   const file = await asyncBufferFrom(asyncBuffer)
+  const onChunk = chunks ? (chunk: ColumnData) => self.postMessage({ chunk, queryId }) : undefined
   try {
     const result = await parquetQuery({
-      metadata, file, rowStart, rowEnd, orderBy, compressors,
+      metadata, file, rowStart, rowEnd, orderBy, compressors, onChunk
     })
     self.postMessage({ result, queryId })
   } catch (error) {
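
With chunks enabled, the worker now multiplexes three message shapes over the same postMessage channel, each carrying the queryId of the originating request so the client can route replies. A sketch of the reply protocol as a discriminated union; the WorkerResponse name is illustrative, and the error shape is inferred from the catch branch and the client's data.error check:

import type { ColumnData } from 'hyparquet'

// Illustrative union of worker replies -- exactly one of chunk/result/error
// is set on each message, alongside the originating queryId
type WorkerResponse =
  | { queryId: number, chunk: ColumnData }             // partial column data, posted as soon as it is parsed
  | { queryId: number, result: Record<string, any>[] } // final rows for the whole query
  | { queryId: number, error: Error }                  // parquetQuery rejected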

src/workers/parquetWorkerClient.ts

Lines changed: 13 additions & 11 deletions

@@ -1,4 +1,4 @@
-import { asyncBufferFromUrl, cachedAsyncBuffer, AsyncBuffer, FileMetaData } from 'hyparquet'
+import { asyncBufferFromUrl, cachedAsyncBuffer, AsyncBuffer, ParquetReadOptions } from 'hyparquet'
 
 // Serializable constructor for AsyncBuffers
 export interface AsyncBufferFrom {
@@ -7,13 +7,9 @@ export interface AsyncBufferFrom {
 }
 
 // Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
-interface ParquetReadWorkerOptions {
+interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
   asyncBuffer: AsyncBufferFrom
-  metadata?: FileMetaData // parquet metadata, will be parsed if not provided
-  columns?: number[] // columns to read, all columns if undefined
-  rowStart?: number // inclusive
-  rowEnd?: number // exclusive
-  orderBy?: string // column to sort by
+  orderBy?: string
 }
 
 let worker: Worker | undefined
@@ -23,11 +19,11 @@ const pending = new Map<number, { resolve: (value: any) => void, reject: (error:
 /**
  * Presents almost the same interface as parquetRead, but runs in a worker.
  * This is useful for reading large parquet files without blocking the main thread.
- * Instead of taking an AsyncBuffer, it takes a FileContent, because it needs
+ * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
  * to be serialized to the worker.
  */
-export function parquetQueryWorker({
-  metadata, asyncBuffer, rowStart, rowEnd, orderBy }: ParquetReadWorkerOptions
+export function parquetQueryWorker(
+  { metadata, asyncBuffer, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions
 ): Promise<Record<string, any>[]> {
   return new Promise((resolve, reject) => {
     const queryId = nextQueryId++
@@ -42,12 +38,18 @@
           reject(data.error)
         } else if (data.result) {
          resolve(data.result)
+        } else if (data.chunk) {
+          onChunk?.(data.chunk)
        } else {
           reject(new Error('Unexpected message from worker'))
         }
       }
     }
-    worker.postMessage({ queryId, metadata, asyncBuffer, rowStart, rowEnd, orderBy })
+    // If caller provided an onChunk callback, worker will send chunks as they are parsed
+    const chunks = onChunk !== undefined
+    worker.postMessage({
+      queryId, metadata, asyncBuffer, rowStart, rowEnd, orderBy, chunks
+    })
   })
 }
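From the caller's side, streaming is now opt-in: pass an onChunk callback and the chunks flag is set automatically. A minimal usage sketch, assuming a hypothetical parquet URL and byteLength for the AsyncBufferFrom (its exact fields are not shown in this diff):

import type { ColumnData } from 'hyparquet'
import { parquetQueryWorker } from './parquetWorkerClient.js'

// Hypothetical source file -- url and byteLength are assumed AsyncBufferFrom fields
const asyncBuffer = { url: 'https://example.com/data.parquet', byteLength: 1_000_000 }

parquetQueryWorker({
  asyncBuffer,
  rowStart: 0,
  rowEnd: 100,
  orderBy: 'name',
  // Fires once per parsed column chunk, before the promise resolves,
  // enabling progressive rendering of partial results
  onChunk: (chunk: ColumnData) => console.log('chunk received', chunk),
})
  .then(rows => console.log('query complete:', rows.length, 'rows'))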