
Commit 1c10ac7

merge the workers back together
1 parent c6f3e2d commit 1c10ac7

8 files changed: +137 -191 lines changed


packages/components/src/lib/tableProvider.ts

Lines changed: 1 addition & 3 deletions

@@ -1,8 +1,6 @@
 import { DataFrame, ResolvablePromise, resolvablePromise } from 'hightable'
 import { FileMetaData, parquetSchema } from 'hyparquet'
-import { parquetQueryWorker } from '../workers/parquetWorkerClient.ts'
-import { parquetSortIndexWorker } from '../workers/sortParquetWorkerClient.ts'
-import type { AsyncBufferFrom } from '../workers/types.ts'
+import { AsyncBufferFrom, parquetQueryWorker, parquetSortIndexWorker } from '../workers/parquetWorkerClient.ts'
 
 type ResolvableRow = Record<string, ResolvablePromise<unknown>>;
 
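
Both worker entry points now come from a single module. As a rough usage sketch (the helper below is hypothetical, not code from this commit), the consolidated import lets a provider fetch a sort permutation and a row range through the same client:

import { FileMetaData } from 'hyparquet'
import { AsyncBufferFrom, parquetQueryWorker, parquetSortIndexWorker } from '../workers/parquetWorkerClient.ts'

// Hypothetical helper, for illustration only.
async function peekSorted(from: AsyncBufferFrom, metadata: FileMetaData, orderBy: string) {
  // indices[i] is the physical row number of the i-th row when sorted by `orderBy`
  const indices = await parquetSortIndexWorker({ metadata, from, orderBy })
  // rows 0..9 in file order, fetched through the same worker client
  const rows = await parquetQueryWorker({ metadata, from, rowStart: 0, rowEnd: 10 })
  return { indices, rows }
}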

packages/components/src/workers/parquetWorker.ts

Lines changed: 49 additions & 21 deletions

@@ -1,12 +1,14 @@
 import { ColumnData, parquetQuery } from 'hyparquet'
 import { compressors } from 'hyparquet-compressors'
-import { asyncBufferFrom } from './parquetWorkerClient.ts'
-import type {
+import {
   ChunkMessage,
   ErrorMessage,
+  IndicesMessage,
   ParquetReadWorkerOptions,
   ResultMessage,
-} from './types.ts'
+  asyncBufferFrom,
+  compare,
+} from './parquetWorkerClient.ts'
 
 function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
   self.postMessage({ chunk, queryId })
@@ -17,6 +19,9 @@ function postResultMessage ({ result, queryId }: ResultMessage) {
 function postErrorMessage ({ error, queryId }: ErrorMessage) {
   self.postMessage({ error, queryId })
 }
+function postIndicesMessage ({ indices, queryId }: IndicesMessage) {
+  self.postMessage({ indices, queryId })
+}
 
 self.onmessage = async ({
   data,
@@ -32,26 +37,49 @@ self.onmessage = async ({
     columns,
     queryId,
     chunks,
+    sortIndex,
   } = data
   const file = await asyncBufferFrom(from)
-  const onChunk = chunks
-    ? (chunk: ColumnData) => {
-      postChunkMessage({ chunk, queryId })
+  if (sortIndex === undefined) {
+    const onChunk = chunks
+      ? (chunk: ColumnData) => {
+        postChunkMessage({ chunk, queryId })
+      }
+      : undefined
+    try {
+      const result = await parquetQuery({
+        metadata,
+        file,
+        rowStart,
+        rowEnd,
+        orderBy,
+        columns,
+        compressors,
+        onChunk,
+      })
+      postResultMessage({ result, queryId })
+    } catch (error) {
+      postErrorMessage({ error: error as Error, queryId })
+    }
+  } else {
+    try {
+      // Special case for sorted index
+      if (orderBy === undefined)
+        throw new Error('sortParquetWorker requires orderBy')
+      if (rowStart !== undefined || rowEnd !== undefined)
+        throw new Error('sortIndex requires all rows')
+      const sortColumn = await parquetQuery({
+        metadata,
+        file,
+        columns: [orderBy],
+        compressors,
+      })
+      const indices = Array.from(sortColumn, (_, index) => index).sort((a, b) =>
+        compare<unknown>(sortColumn[a][orderBy], sortColumn[b][orderBy]),
+      )
+      postIndicesMessage({ indices, queryId })
+    } catch (error) {
+      postErrorMessage({ error: error as Error, queryId })
     }
-    : undefined
-  try {
-    const result = await parquetQuery({
-      metadata,
-      file,
-      rowStart,
-      rowEnd,
-      orderBy,
-      columns,
-      compressors,
-      onChunk,
-    })
-    postResultMessage({ result, queryId })
-  } catch (error) {
-    postErrorMessage({ error: error as Error, queryId })
   }
 }
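
The sortIndex branch never materializes sorted rows: it reads only the orderBy column and returns an argsort, i.e. a permutation of row indices. The compare helper it imports from parquetWorkerClient.ts is not part of the hunks shown here; any comparator with that shape works. A minimal sketch of the idea, using an assumed numeric comparator rather than the project's generic one:

// Assumed comparator for the sketch; the real `compare` is generic and may order nulls differently.
function compare(a: number, b: number): number {
  return a < b ? -1 : a > b ? 1 : 0
}

// Argsort in miniature: sort row indices by the column value they point at.
const sizes = [30, 10, 20]
const order = Array.from(sizes, (_, index) => index)
  .sort((a, b) => compare(sizes[a], sizes[b])) // order === [1, 2, 0]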

packages/components/src/workers/parquetWorkerClient.ts

Lines changed: 83 additions & 19 deletions

@@ -1,52 +1,105 @@
 import ParquetWorker from './parquetWorker?worker&inline'
 /// ^ the worker is bundled with the main thread code (inline) which is easier for users to import
 /// (no need to copy the worker file to the right place)
-import { AsyncBuffer, ColumnData } from 'hyparquet'
+import { AsyncBuffer, ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet'
 import { asyncBufferFromUrl } from '../lib/utils.ts'
-import type {
-  AsyncBufferFrom,
-  ParquetMessage,
-  ParquetReadWorkerOptions,
-  Row,
-} from './types.ts'
-// import { asyncBufferFromUrl, cachedAsyncBuffer, AsyncBuffer } from 'hyparquet'
+
+// Serializable constructor for AsyncBuffers
+export interface AsyncBufferFrom {
+  url: string
+  byteLength: number
+  headers?: Record<string, string>
+}
+// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
+export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
+  from: AsyncBufferFrom
+  orderBy?: string
+  sortIndex?: boolean
+}
+// Row is defined in hightable, but not exported + we change any to unknown
+export type Row = Record<string, unknown>;
+
+interface Message {
+  queryId: number
+}
+export interface ChunkMessage extends Message {
+  chunk: ColumnData
+}
+export interface ResultMessage extends Message {
+  result: Row[]
+}
+export interface IndicesMessage extends Message {
+  indices: number[]
+}
+export interface ErrorMessage extends Message {
+  error: Error
+}
+
+export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage
+export type SortParquetMessage = IndicesMessage | ErrorMessage
+
+export interface ParquetSortIndexOptions {
+  metadata: FileMetaData
+  from: AsyncBufferFrom
+  orderBy: string
+}
 
 
 let worker: Worker | undefined
 let nextQueryId = 0
-interface QueryAgent {
+interface SortQueryAgent {
+  kind: 'sortIndex';
+  resolve: (value: number[]) => void;
+  reject: (error: Error) => void;
+}
+interface RowsQueryAgent {
+  kind: 'query';
   resolve: (value: Row[]) => void;
   reject: (error: Error) => void;
   onChunk?: (chunk: ColumnData) => void;
 }
+type QueryAgent = SortQueryAgent | RowsQueryAgent
+
 const pending = new Map<number, QueryAgent>()
 
 function getWorker() {
   if (!worker) {
     worker = new ParquetWorker()
-    worker.onmessage = ({ data }: { data: ParquetMessage }) => {
+    worker.onmessage = ({ data }: { data: ParquetMessage | SortParquetMessage }) => {
       const pendingQueryAgent = pending.get(data.queryId)
       if (!pendingQueryAgent) {
         throw new Error(
           `Unexpected: no pending promise found for queryId: ${data.queryId.toString()}`,
         )
         // TODO(SL): should never happen. But if it does, I'm not sure if throwing an error here helps.
       }
-      const { resolve, reject, onChunk } = pendingQueryAgent
-      if ('error' in data) {
-        reject(data.error)
-      } else if ('result' in data) {
-        resolve(data.result)
-      } else if ('chunk' in data) {
-        onChunk?.(data.chunk)
+      if (pendingQueryAgent.kind === 'query') {
+        const { resolve, reject, onChunk } = pendingQueryAgent
+        if ('error' in data) {
+          reject(data.error)
+        } else if ('result' in data) {
+          resolve(data.result)
+        } else if ('chunk' in data) {
+          onChunk?.(data.chunk)
+        } else {
+          reject(new Error('Unexpected message from worker'))
+        }
       } else {
-        reject(new Error('Unexpected message from worker'))
+        const { resolve, reject } = pendingQueryAgent
+        if ('error' in data) {
+          reject(data.error)
+        } else if ('indices' in data) {
+          resolve(data.indices)
+        } else {
+          reject(new Error('Unexpected message from worker'))
+        }
       }
     }
   }
   return worker
 }
 
+
 /**
  * Presents almost the same interface as parquetRead, but runs in a worker.
  * This is useful for reading large parquet files without blocking the main thread.
@@ -63,7 +116,7 @@ export function parquetQueryWorker({
 }: ParquetReadWorkerOptions): Promise<Row[]> {
   return new Promise((resolve, reject) => {
     const queryId = nextQueryId++
-    pending.set(queryId, { resolve, reject, onChunk })
+    pending.set(queryId, { kind: 'query', resolve, reject, onChunk })
     const worker = getWorker()
 
     // If caller provided an onChunk callback, worker will send chunks as they are parsed
@@ -80,6 +133,17 @@
   })
 }
 
+export function parquetSortIndexWorker({ metadata, from, orderBy }: ParquetSortIndexOptions): Promise<number[]> {
+  return new Promise((resolve, reject) => {
+    const queryId = nextQueryId++
+    pending.set(queryId, { kind: 'sortIndex', resolve, reject })
+    const worker = getWorker()
+    worker.postMessage({
+      queryId, metadata, from, orderBy, sortIndex: true,
+    })
+  })
+}
+
 /**
  * Convert AsyncBufferFrom to AsyncBuffer and cache results.
  */
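
For reference, a usage sketch of the merged client (assumed, not part of this commit): both entry points share the lazily created worker and the pending map, and queryId ties each worker message back to its caller, so calls can be interleaved freely. The URL, byteLength and column name below are placeholders, and metadata is assumed to have been parsed beforehand on the main thread.

import { FileMetaData } from 'hyparquet'
import { AsyncBufferFrom, parquetQueryWorker, parquetSortIndexWorker } from './parquetWorkerClient.ts'

declare const metadata: FileMetaData // assumed: parsed earlier on the main thread
const from: AsyncBufferFrom = { url: 'https://example.org/data.parquet', byteLength: 1_000_000 }

async function demo() {
  // streams ColumnData chunks as they are decoded, then resolves with the rows
  const rows = await parquetQueryWorker({
    metadata,
    from,
    rowStart: 0,
    rowEnd: 100,
    onChunk: (chunk) => { console.log('chunk', chunk) },
  })
  // goes through the same worker instance; resolves with a sorted permutation of row indices
  const sortIndex = await parquetSortIndexWorker({ metadata, from, orderBy: 'name' })
  console.log(rows.length, sortIndex.slice(0, 5))
}

demo().catch(console.error)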

packages/components/src/workers/sortParquetWorker.ts

Lines changed: 0 additions & 53 deletions
This file was deleted.

packages/components/src/workers/sortParquetWorkerClient.ts

Lines changed: 0 additions & 50 deletions
This file was deleted.
