Skip to content

Commit 27c04a1

Browse files
committed
only fetch the values for the column, without transposing to objects
1 parent 2617322 commit 27c04a1

File tree

3 files changed

+69
-5
lines changed

3 files changed

+69
-5
lines changed

src/lib/getParquetColumn.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { ColumnData, ParquetReadOptions, parquetRead } from 'hyparquet'
2+
3+
/**
 * Options accepted by `getParquetColumn`.
 *
 * This is hyparquet's `ParquetReadOptions` without the fields that
 * `getParquetColumn` sets itself (`columns`, `rowStart`, `rowEnd`, `onChunk`,
 * `onComplete`), plus the name of the single column to read.
 */
type GetColumnOptions = { column: string } & Omit<
  ParquetReadOptions,
  'columns' | 'rowStart' | 'rowEnd' | 'onChunk' | 'onComplete'
>
4+
5+
/**
 * Verify that the row ranges received through `onChunk` tile `[0, numRows)`
 * with no gap at the start, in the middle, or at the end.
 *
 * Checking the ranges is cheaper than scanning the values array for
 * `undefined` entries (and `undefined` may be a legitimate cell value).
 *
 * @param ranges - `[rowStart, rowEnd)` pairs, in arrival order
 * @param numRows - total number of rows expected
 * @throws Error if no range was received or if some rows are not covered
 */
function assertRangesCoverAllRows(ranges: [number, number][], numRows: number): void {
  // sort a copy so the caller's array keeps its arrival order
  const sortedRanges = [...ranges].sort((a, b) => a[0] - b[0])

  // every range must start exactly where the previous one ended
  for (let i = 0; i < sortedRanges.length - 1; i++) {
    const range = sortedRanges[i]
    const nextRange = sortedRanges[i + 1]
    if (!range || !nextRange) {
      throw new Error('The ranges should not be undefined')
    }
    if (range[1] !== nextRange[0]) {
      throw new Error(`missing data between rows ${range[1]} and ${nextRange[0]}`)
    }
  }

  // the first range must start at row 0 (this also rejects an empty list)
  const firstRange = sortedRanges[0]
  if (!firstRange) {
    throw new Error('The first range should not be undefined')
  }
  if (firstRange[0] !== 0) {
    throw new Error(`missing data before row ${firstRange[0]}`)
  }

  // the last range must end at the last row
  const lastRange = sortedRanges[sortedRanges.length - 1]
  if (!lastRange) {
    throw new Error('The last range should not be undefined')
  }
  if (lastRange[1] !== numRows) {
    throw new Error(`missing data after row ${lastRange[1]}`)
  }
}

/**
 * Read all the values of a single column of a parquet file, without
 * transposing the data to row objects.
 *
 * Every read option other than `column` (`metadata`, `file`, `compressors`,
 * and any other field allowed by `GetColumnOptions`) is forwarded to
 * `parquetRead`.
 *
 * @param options - read options, plus `column`, the name of the column to fetch
 * @returns the column values, indexed by row number (length `metadata.num_rows`)
 * @throws Error if `metadata.num_rows` is not available, if a chunk for another
 *   column is received, or if the received chunks do not cover all the rows
 */
export async function getParquetColumn({ column, ...readOptions }: GetColumnOptions): Promise<unknown[]> {
  const numRows = Number(readOptions.metadata?.num_rows)
  if (Number.isNaN(numRows)) {
    throw new Error('metadata.num_rows is undefined')
  }
  if (numRows === 0) {
    return []
  }

  // stored in an object so that the assignment done inside the callback is
  // visible to the type checker after the await
  const lastError: {error?: Error} = {}
  const values: unknown[] = Array(numRows).fill(undefined)
  const ranges: [number, number][] = []

  function onChunk({ columnName, columnData, rowStart, rowEnd }: ColumnData) {
    if (columnName !== column) {
      lastError.error = new Error(`unexpected column name ${columnName}`)
      // do not copy data that belongs to another column
      return
    }
    for (let i = rowStart; i < rowEnd; i++) {
      values[i] = columnData[i - rowStart]
    }
    ranges.push([rowStart, rowEnd])
  }

  // this awaits all the promises. When it returns, all the data should have already been sent using onChunk
  await parquetRead({ ...readOptions, columns: [column], onChunk })

  // report an error recorded by onChunk, if any
  if (lastError.error !== undefined) {
    throw lastError.error
  }

  // check for missing data before returning the values
  assertRangesCoverAllRows(ranges, numRows)

  return values
}

src/lib/workers/parquetWorker.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ColumnData, parquetQuery } from 'hyparquet'
22
import { compressors } from 'hyparquet-compressors'
3+
import { getParquetColumn } from '../getParquetColumn.js'
34
import { asyncBufferFrom } from '../utils.js'
45
import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js'
56

@@ -26,10 +27,9 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => {
2627
// the descending order, because the rank is the first, not the last, of the ties. But it's enough for the
2728
// purpose of sorting.
2829

29-
// TODO(SL): ensure only the expected column is fetched
3030
try {
31-
const sortColumn = await parquetQuery({ metadata, file, columns: [column], compressors })
32-
const valuesWithIndex = sortColumn.map((row, index) => ({ value: row[column] as unknown, index }))
31+
const sortColumn = await getParquetColumn({ metadata, file, column, compressors })
32+
const valuesWithIndex = sortColumn.map((value, index) => ({ value, index }))
3333
const sortedValuesWithIndex = Array.from(valuesWithIndex).sort(({ value: a }, { value: b }) => compare<unknown>(a, b))
3434
const numRows = sortedValuesWithIndex.length
3535
const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => {

src/lib/workers/parquetWorkerClient.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ function getWorker() {
3434
}
3535

3636
if (pendingQueryAgent.kind === 'query') {
37-
const { resolve, reject } = pendingQueryAgent
38-
const { onChunk } = pendingQueryAgent
37+
const { onChunk, resolve, reject } = pendingQueryAgent
3938
if ('error' in data) {
4039
reject(data.error)
4140
} else if ('result' in data) {

0 commit comments

Comments
 (0)