
Commit 3a5053e

severo and platypii authored
[Breaking] remove sortability from parquetDataFrame (#310)
* update dependencies
* update code to hightable 0.19.0
* update code related to hightable 0.19.1
* fix types in worker
* parquetDataFrame returns an unsortable dataframe
* upgrade deps

---------

Co-authored-by: Kenny Daniel <[email protected]>
1 parent 929d360 commit 3a5053e
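
Callers that relied on the dataframe being sortable now have to opt in themselves. A minimal migration sketch (hypothetical caller code; the sortableParquetDataFrame helper name and import paths are made up for illustration, but sortableDataFrame is the hightable export this repo itself imported before this commit):

import { sortableDataFrame } from 'hightable'
import type { FileMetaData } from 'hyparquet'
import { parquetDataFrame } from './src/lib/tableProvider.js'
import type { AsyncBufferFrom } from './src/lib/workers/types.js'

// parquetDataFrame now returns an unsortable dataframe;
// wrapping it restores the sorting behavior removed in this commit
function sortableParquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData) {
  return sortableDataFrame(parquetDataFrame(from, metadata))
}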

File tree

8 files changed, +41 -54 lines changed


package.json

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@
     "watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet"
   },
   "dependencies": {
-    "hightable": "0.18.5",
+    "hightable": "0.19.4",
     "hyparquet": "1.18.0",
     "hyparquet-compressors": "1.1.1",
     "icebird": "0.3.0",

src/components/Cell/Cell.tsx

Lines changed: 2 additions & 2 deletions
@@ -42,11 +42,11 @@ export default function CellView({ source, row, col }: CellProps) {
   setProgress(0.75)
   const df = parquetDataFrame(from, metadata)
 
-  const columnName = df.header[col]
+  const columnName = df.columnDescriptors[col]?.name
   if (columnName === undefined) {
     throw new Error(`Column name missing at index col=${col}`)
   }
-  await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] })
+  await df.fetch?.({ rowStart: row, rowEnd: row + 1, columns: [columnName] })
   const cell = df.getCell({ row, column: columnName })
   const text = cell === undefined ? UNLOADED_CELL_PLACEHOLDER : stringify(cell.value)
   setText(text)

src/components/CellPanel/CellPanel.tsx

Lines changed: 3 additions & 3 deletions
@@ -58,7 +58,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose
   try {
     setProgress(0.5)
 
-    const columnName = df.header[col]
+    const columnName = df.columnDescriptors[col]?.name
     if (columnName === undefined) {
       throw new Error(`Column name missing at index col=${col}`)
     }
@@ -67,7 +67,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose
       fillContent(undefined)
      return
    }
-    await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] })
+    await df.fetch?.({ rowStart: row, rowEnd: row + 1, columns: [columnName] })
    cell = df.getCell({ row, column: columnName })
    if (cell === undefined) {
      throw new Error(`Cell at row=${row}, column=${columnName} is undefined`)
@@ -85,7 +85,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose
 
   const headers = <>
     <SlideCloseButton onClick={onClose} />
-    <span>column: {df.header[col]}</span>
+    <span>column: {df.columnDescriptors[col]?.name}</span>
     <span>row: {row + 1}</span>
   </>

src/components/ParquetView/ParquetView.tsx

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ export default function ParquetView({ source, setProgress, setError }: ViewerPro
     if (cell?.col === col && cell.row === row) {
       return undefined
     }
-    const columnName = content?.dataframe.header[col]
+    const columnName = content?.dataframe.columnDescriptors[col]?.name
     if (columnName === undefined || !content?.dataframe.getCell({ row, column: columnName })) {
       // don't open the cell panel until it has loaded
       return undefined

src/lib/tableProvider.ts

Lines changed: 17 additions & 39 deletions
@@ -1,4 +1,4 @@
-import { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame, createEventTarget, sortableDataFrame } from 'hightable'
+import { DataFrame, DataFrameEvents, ResolvedValue, checkSignal, createEventTarget, validateFetchParams, validateGetCellParams, validateGetRowNumberParams } from 'hightable'
 import type { ColumnData } from 'hyparquet'
 import { FileMetaData, ParquetReadOptions, parquetSchema } from 'hyparquet'
 import { parquetReadWorker } from './workers/parquetWorkerClient.js'
@@ -20,13 +20,16 @@ interface VirtualRowGroup {
 
 /**
  * Convert a parquet file into a dataframe.
+ *
+ * It fetches data on demand in chunks of 1000 rows within each row group.
+ * It's not sortable. You can use sortableDataFrame from hightable to make it sortable.
  */
-export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData, options?: Pick<ParquetReadOptions, 'utf8'>): DataFrame {
+export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData, options?: Pick<ParquetReadOptions, 'utf8'>): DataFrame<{parquet: FileMetaData}> {
   const { children } = parquetSchema(metadata)
-  const header = children.map(child => child.element.name)
+  const columnDescriptors = children.map(child => ({ name: child.element.name }))
   const eventTarget = createEventTarget<DataFrameEvents>()
 
-  const cellCache = new Map<string, ResolvedValue<unknown>[]>(header.map(name => [name, []]))
+  const cellCache = new Map<string, ResolvedValue<unknown>[]>(columnDescriptors.map(({ name }) => [name, []]))
 
   // virtual row groups are up to 1000 rows within row group boundaries
   const groups: VirtualRowGroup[] = []
@@ -39,7 +42,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData,
     groups.push({
       groupStart,
       groupEnd,
-      state: new Map(header.map(name => [name, { kind: 'unfetched' }])),
+      state: new Map(columnDescriptors.map(({ name }) => [name, { kind: 'unfetched' }])),
     })
     groupStart = groupEnd
   }
@@ -84,22 +87,21 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData,
 
   const numRows = Number(metadata.num_rows)
 
-  const unsortableDataFrame: UnsortableDataFrame = {
-    header,
+  const unsortableDataFrame: DataFrame<{parquet: FileMetaData}> = {
+    columnDescriptors,
     numRows,
-    metadata,
+    metadata: { parquet: metadata },
     eventTarget,
-    getRowNumber({ row }) {
-      validateRow({ row, data: { numRows } })
+    getRowNumber({ row, orderBy }) {
+      validateGetRowNumberParams({ row, orderBy, data: { numRows, columnDescriptors } })
       return { value: row }
     },
-    getCell({ row, column }) {
-      validateRow({ row, data: { numRows } })
-      validateColumn({ column, data: { header } })
+    getCell({ row, column, orderBy }) {
+      validateGetCellParams({ row, column, orderBy, data: { numRows, columnDescriptors } })
       return cellCache.get(column)?.[row]
     },
     fetch: async ({ rowStart, rowEnd, columns, signal }) => {
-      validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } })
+      validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, columnDescriptors } })
       checkSignal(signal)
 
       if (!columns || columns.length === 0) {
@@ -126,29 +128,5 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData,
     },
   }
 
-  return sortableDataFrame(unsortableDataFrame)
-}
-
-function validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }: {rowStart: number, rowEnd: number, columns?: string[], data: Pick<DataFrame, 'numRows' | 'header'>}): void {
-  if (rowStart < 0 || rowEnd > numRows || !Number.isInteger(rowStart) || !Number.isInteger(rowEnd) || rowStart > rowEnd) {
-    throw new Error(`Invalid row range: ${rowStart} - ${rowEnd}, numRows: ${numRows}`)
-  }
-  if (columns?.some(column => !header.includes(column))) {
-    throw new Error(`Invalid columns: ${columns.join(', ')}. Available columns: ${header.join(', ')}`)
-  }
-}
-function validateRow({ row, data: { numRows } }: {row: number, data: Pick<DataFrame, 'numRows'>}): void {
-  if (row < 0 || row >= numRows || !Number.isInteger(row)) {
-    throw new Error(`Invalid row index: ${row}, numRows: ${numRows}`)
-  }
-}
-function validateColumn({ column, data: { header } }: {column: string, data: Pick<DataFrame, 'header'>}): void {
-  if (!header.includes(column)) {
-    throw new Error(`Invalid column: ${column}. Available columns: ${header.join(', ')}`)
-  }
-}
-function checkSignal(signal?: AbortSignal): void {
-  if (signal?.aborted) {
-    throw new DOMException('The operation was aborted.', 'AbortError')
-  }
+  return unsortableDataFrame
 }
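
Taken together, the dataframe now exposes a plain fetch-then-read contract: fetch is an optional member of hightable 0.19's DataFrame type, and getCell resolves from the cell cache only once the covering virtual row group has loaded. A short sketch of that access pattern, mirroring the component changes above (df stands in for a dataframe returned by parquetDataFrame):

const column = df.columnDescriptors[0]?.name
if (column !== undefined) {
  // fetch is optional on DataFrame, hence the optional chaining
  await df.fetch?.({ rowStart: 0, rowEnd: 1, columns: [column] })
  // undefined until the requested range has been fetched
  const cell = df.getCell({ row: 0, column })
  console.log(cell?.value)
}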

src/lib/workers/parquetWorker.ts

Lines changed: 4 additions & 4 deletions
@@ -1,7 +1,7 @@
 import type { ColumnData } from 'hyparquet'
 import { AsyncBuffer, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet'
 import { compressors } from 'hyparquet-compressors'
-import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage } from './types.js'
+import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage, Rows } from './types.js'
 import { fromToAsyncBuffer } from './utils.js'
 
 const cache = new Map<string, Promise<AsyncBuffer>>()
@@ -33,20 +33,20 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => {
   const file = await fromToAsyncBuffer(from, cache)
   try {
     if (kind === 'parquetReadObjects') {
-      const rows = await parquetReadObjects({ ...options, file, compressors, onChunk, onPage })
+      const rows = (await parquetReadObjects({ ...options, rowFormat: 'object', file, compressors, onChunk, onPage })) as Rows
       postParquetReadObjectsResultMessage({ queryId, rows })
     } else if (kind === 'parquetQuery') {
       const rows = await parquetQuery({ ...options, file, compressors, onChunk, onPage })
       postParquetQueryResultMessage({ queryId, rows })
     } else {
-      await parquetRead({ ...options, file, compressors, onComplete, onChunk, onPage })
+      await parquetRead({ ...options, rowFormat: 'object', file, compressors, onComplete, onChunk, onPage })
       postParquetReadResultMessage({ queryId })
     }
   } catch (error) {
     postErrorMessage({ error: error as Error, queryId })
   }
 
-  function onComplete(rows: unknown[][] | Record<string, unknown>[]) {
+  function onComplete(rows: Rows) {
     postCompleteMessage({ queryId, rows })
   }
   function onChunk(chunk: ColumnData) {

src/lib/workers/parquetWorkerClient.ts

Lines changed: 7 additions & 1 deletion
@@ -7,7 +7,7 @@ import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorker
 let worker: Worker | undefined
 let nextQueryId = 0
 interface Agent {
-  onComplete?: (rows: Rows) => void
+  onComplete?: ((rows: Rows) => void)
   onChunk?: (chunk: ColumnData) => void
   onPage?: (page: ColumnData) => void
   reject: (error: Error) => void
@@ -73,6 +73,8 @@ function getWorker() {
  * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
  * to be serialized to the worker. Also: the worker uses hyparquet-compressors and
  * the default parsers.
+ *
+ * Note that it only supports 'rowFormat: object' (the default).
  */
 export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
   const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
@@ -91,6 +93,8 @@
  * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
  * to be serialized to the worker. Also: the worker uses hyparquet-compressors and
  * the default parsers.
+ *
+ * Note that it only supports 'rowFormat: object' (the default).
  */
 export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
   const { onChunk, onPage, from, ...serializableOptions } = options
@@ -109,6 +113,8 @@
  * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
  * to be serialized to the worker. Also: the worker uses hyparquet-compressors and
  * the default parsers.
+ *
+ * Note that it only supports 'rowFormat: object' (the default).
  */
 export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
   const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
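
With rowFormat narrowed to 'object' at the type level, a worker-client call looks like this (hypothetical usage sketch; rowStart and rowEnd come from hyparquet's ParquetReadOptions, which these option types extend, and `from` is whatever AsyncBufferFrom the app already holds):

import type { AsyncBufferFrom } from './types.js'
import { parquetReadObjectsWorker } from './parquetWorkerClient.js'

// returns Rows, i.e. Record<string, unknown>[];
// passing rowFormat: 'array' no longer type-checks
async function firstRows(from: AsyncBufferFrom) {
  return parquetReadObjectsWorker({ from, rowStart: 0, rowEnd: 10 })
}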

src/lib/workers/types.ts

Lines changed: 6 additions & 3 deletions
@@ -16,7 +16,8 @@ interface AsyncBufferFromUrl {
 }
 export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl
 
-export type Rows = unknown[][] | Record<string, unknown>[]
+// Only rowFormat 'object' is supported in the worker
+export type Rows = Record<string, unknown>[]
 
 /**
  * Options for the worker version of parquetRead
@@ -25,9 +26,11 @@ export type Rows = unknown[][] | Record<string, unknown>[]
  * - 'compressors' are not configurable, the worker uses hyparquet-compressors
  * - 'parsers' are not configurable, the worker uses the default parsers
  */
-export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'compressors' | 'parsers' | 'file' | 'onComplete'> {
-  onComplete?: (rows: Rows) => void // fix for https://github.com/hyparam/hyparquet/issues/28
+export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'compressors' | 'parsers' | 'file' | 'rowFormat' | 'onComplete'> {
   from: AsyncBufferFrom
+  // rowFormat 'array' is not supported in the worker.
+  rowFormat?: 'object'
+  onComplete?: (rows: Rows) => void
 }
 /**
  * Options for the worker version of parquetReadObjects
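
To make the narrowed Rows type concrete: under rowFormat 'object' each row is a record keyed by column name, not a positional array. An illustrative value (the column names are hypothetical):

import type { Rows } from './types.js'

const rows: Rows = [
  { id: 1, name: 'alice' }, // Record<string, unknown>
  { id: 2, name: 'bob' },
]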
