Skip to content

Commit 0d5309d

Browse files
committed
Fix parquetQueryWorker for parallel requests.
Adds a queryId to the message protocol to match requests to results.
1 parent 87d1327 commit 0d5309d

File tree

8 files changed

+28
-22
lines changed

8 files changed

+28
-22
lines changed

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
},
2424
"dependencies": {
2525
"highlight.js": "11.10.0",
26-
"hightable": "0.5.3",
26+
"hightable": "0.5.4",
2727
"hyparquet": "1.5.0",
2828
"hyparquet-compressors": "0.1.4"
2929
},
@@ -34,7 +34,7 @@
3434
"@rollup/plugin-terser": "0.4.4",
3535
"@rollup/plugin-typescript": "12.1.1",
3636
"@testing-library/react": "16.0.1",
37-
"@types/node": "22.7.5",
37+
"@types/node": "22.7.6",
3838
"@types/react": "18.3.11",
3939
"@types/react-dom": "18.3.1",
4040
"@vitejs/plugin-react": "4.3.2",
@@ -48,7 +48,7 @@
4848
"rollup-plugin-postcss": "4.0.2",
4949
"tslib": "2.8.0",
5050
"typescript": "5.6.3",
51-
"typescript-eslint": "8.9.0",
51+
"typescript-eslint": "8.10.0",
5252
"vitest": "2.1.3"
5353
}
5454
}

public/build/app.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/app.min.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/worker.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/worker.min.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/tableProvider.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
1111
header: children.map(child => child.element.name),
1212
numRows: Number(metadata.num_rows),
1313
rows(rowStart: number, rowEnd: number, orderBy?: string) {
14+
console.log('parquetDataFrame.rows', { rowStart, rowEnd, orderBy })
1415
return parquetQueryWorker({ asyncBuffer: from, rowStart, rowEnd, orderBy })
1516
},
1617
sortable: true,

src/workers/parquetWorker.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ import { compressors } from 'hyparquet-compressors'
33
import { asyncBufferFrom } from './parquetWorkerClient.js'
44

55
self.onmessage = async ({ data }) => {
6-
const { metadata, asyncBuffer, rowStart, rowEnd, orderBy } = data
6+
const { metadata, asyncBuffer, rowStart, rowEnd, orderBy, queryId } = data
77
const file = await asyncBufferFrom(asyncBuffer)
88
try {
99
const result = await parquetQuery({
1010
metadata, file, rowStart, rowEnd, orderBy, compressors,
1111
})
12-
self.postMessage({ result })
12+
self.postMessage({ result, queryId })
1313
} catch (error) {
14-
self.postMessage({ error })
14+
self.postMessage({ error, queryId })
1515
}
1616
}

src/workers/parquetWorkerClient.ts

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ interface ParquetReadWorkerOptions {
1717
}
1818

1919
let worker: Worker | undefined
20+
let nextQueryId = 0
21+
const pending = new Map<number, { resolve: (value: any) => void, reject: (error: any) => void }>()
2022

2123
/**
2224
* Presents almost the same interface as parquetRead, but runs in a worker.
@@ -28,21 +30,24 @@ export function parquetQueryWorker({
2830
metadata, asyncBuffer, rowStart, rowEnd, orderBy }: ParquetReadWorkerOptions
2931
): Promise<Record<string, any>[]> {
3032
return new Promise((resolve, reject) => {
33+
const queryId = nextQueryId++
34+
pending.set(queryId, { resolve, reject })
3135
// Create a worker
3236
if (!worker) {
3337
worker = new Worker(new URL('worker.min.js', import.meta.url))
34-
}
35-
worker.onmessage = ({ data }) => {
36-
// Convert postmessage data to callbacks
37-
if (data.error) {
38-
reject(data.error)
39-
} else if (data.result) {
40-
resolve(data.result)
41-
} else {
42-
reject(new Error('Unexpected message from worker'))
38+
worker.onmessage = ({ data }) => {
39+
const { resolve, reject } = pending.get(data.queryId)!
40+
// Convert postmessage data to callbacks
41+
if (data.error) {
42+
reject(data.error)
43+
} else if (data.result) {
44+
resolve(data.result)
45+
} else {
46+
reject(new Error('Unexpected message from worker'))
47+
}
4348
}
4449
}
45-
worker.postMessage({ metadata, asyncBuffer, rowStart, rowEnd, orderBy })
50+
worker.postMessage({ queryId, metadata, asyncBuffer, rowStart, rowEnd, orderBy })
4651
})
4752
}
4853

@@ -53,8 +58,8 @@ export async function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffe
5358
const key = JSON.stringify(from)
5459
const cached = cache.get(key)
5560
if (cached) return cached
56-
const asyncBuffer = asyncBufferFromUrl(from.url, from.byteLength)
57-
cache.set(key, asyncBuffer.then(cachedAsyncBuffer))
61+
const asyncBuffer = asyncBufferFromUrl(from.url, from.byteLength).then(cachedAsyncBuffer)
62+
cache.set(key, asyncBuffer)
5863
return asyncBuffer
5964
}
6065
const cache = new Map<string, Promise<AsyncBuffer>>()

0 commit comments

Comments
 (0)