Skip to content

Commit bedb991

Browse files
committed
Parquet aware table provider
1 parent d1cffb6 commit bedb991

File tree

9 files changed

+151
-43
lines changed

9 files changed

+151
-43
lines changed

public/build/app.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/app.min.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/worker.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/worker.min.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cli.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ if (arg === 'chat') {
2424
} else if (stat.isFile()) {
2525
const parent = path.split('/').slice(0, -1).join('/')
2626
const key = path.split('/').pop()
27-
console.log('WTF1', parent, key)
2827
serve(parent, key)
2928
}
3029
}).catch(() => {

src/components/viewers/ParquetView.tsx

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import React, { useCallback, useEffect, useState } from 'react'
33
import { parquetDataFrame } from '../../tableProvider.js'
44
import { Spinner } from '../Layout.js'
55
import ContentHeader from './ContentHeader.js'
6-
import { asyncBufferFromUrl, FileMetaData, parquetMetadataAsync } from 'hyparquet'
6+
import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
77

88
enum LoadingState {
99
NotLoaded,
@@ -28,7 +28,6 @@ interface Content {
2828
export default function ParquetView({ file, setProgress, setError }: ViewerProps) {
2929
const [loading, setLoading] = useState<LoadingState>(LoadingState.NotLoaded)
3030
const [content, setContent] = useState<Content>()
31-
const [metadata, setMetadata] = useState<FileMetaData>()
3231

3332
const isUrl = file.startsWith('http://') || file.startsWith('https://')
3433
const url = isUrl ? file : '/api/store/get?key=' + file
@@ -41,8 +40,7 @@ export default function ParquetView({ file, setProgress, setError }: ViewerProps
4140
const from = { url, byteLength: asyncBuffer.byteLength }
4241
setProgress(0.66)
4342
const metadata = await parquetMetadataAsync(asyncBuffer)
44-
setMetadata(metadata)
45-
let dataframe = await parquetDataFrame(from, metadata)
43+
let dataframe = parquetDataFrame(from, metadata)
4644
dataframe = rowCache(dataframe)
4745
const fileSize = asyncBuffer.byteLength
4846
setContent({ dataframe, fileSize })

src/tableProvider.ts

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,82 @@
1-
import type { DataFrame } from 'hightable'
1+
import { DataFrame, resolvablePromise } from 'hightable'
22
import { FileMetaData, parquetSchema } from 'hyparquet'
3-
import { AsyncBufferFrom, parquetQueryWorker } from './workers/parquetWorkerClient.js'
3+
import { AsyncBufferFrom, parquetQueryWorker, parquetSortIndexWorker } from './workers/parquetWorkerClient.js'
44

55
/**
66
* Convert a parquet file into a dataframe.
77
*/
88
export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame {
99
const { children } = parquetSchema(metadata)
10+
const header = children.map(child => child.element.name)
11+
const sortCache = new Map<string, Promise<number[]>>()
12+
const data = new Array(Number(metadata.num_rows))
13+
const groups = new Array(metadata.row_groups.length).fill(false)
14+
let groupStart = 0
15+
const groupEnds = metadata.row_groups.map(group => groupStart += Number(group.num_rows))
16+
17+
function fetchRowGroup(groupIndex: number) {
18+
if (!groups[groupIndex]) {
19+
const rowStart = groupEnds[groupIndex - 1] || 0
20+
const rowEnd = groupEnds[groupIndex]
21+
// Initialize with resolvable promises
22+
for (let i = rowStart; i < rowEnd; i++) {
23+
data[i] = Object.fromEntries(header.map(key => [key, resolvablePromise()]))
24+
}
25+
parquetQueryWorker({ from, metadata, rowStart, rowEnd }).then(groupData => {
26+
for (let i = rowStart; i < rowEnd; i++) {
27+
for (const [key, value] of Object.entries(groupData[i - rowStart])) {
28+
data[i][key].resolve(value)
29+
}
30+
}
31+
})
32+
groups[groupIndex] = true
33+
}
34+
}
35+
36+
function getSortIndex(orderBy: string) {
37+
let sortIndex = sortCache.get(orderBy)
38+
if (!sortIndex) {
39+
sortIndex = parquetSortIndexWorker({ from, metadata, orderBy })
40+
sortCache.set(orderBy, sortIndex)
41+
}
42+
return sortIndex
43+
}
44+
1045
return {
11-
header: children.map(child => child.element.name),
46+
header,
1247
numRows: Number(metadata.num_rows),
1348
rows(rowStart: number, rowEnd: number, orderBy?: string) {
14-
return parquetQueryWorker({ asyncBuffer: from, rowStart, rowEnd, orderBy })
49+
if (orderBy) {
50+
const numRows = rowEnd - rowStart
51+
const wrapped = new Array(numRows).fill(null)
52+
.map(() => Object.fromEntries(header.map(key => [key, resolvablePromise()])))
53+
54+
let sortIndex = getSortIndex(orderBy)
55+
sortIndex.then(indices => {
56+
// Compute row groups to fetch
57+
for (const index of indices.slice(rowStart, rowEnd)) {
58+
const groupIndex = groupEnds.findIndex(end => index < end)
59+
fetchRowGroup(groupIndex)
60+
}
61+
62+
// Re-assemble data in sorted order into wrapped
63+
for (let i = rowStart; i < rowEnd; i++) {
64+
for (const key of header) {
65+
data[indices[i]][key].then((value: any) => wrapped[i - rowStart][key].resolve(value))
66+
}
67+
}
68+
})
69+
70+
return wrapped
71+
} else {
72+
for (let i = 0; i < groups.length; i++) {
73+
const groupStart = groupEnds[i - 1] || 0
74+
if (rowStart < groupEnds[i] && rowEnd > groupStart) {
75+
fetchRowGroup(i)
76+
}
77+
}
78+
return data.slice(rowStart, rowEnd)
79+
}
1580
},
1681
sortable: true,
1782
}

src/workers/parquetWorker.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
11
import { ColumnData, parquetQuery } from 'hyparquet'
22
import { compressors } from 'hyparquet-compressors'
3-
import { asyncBufferFrom } from './parquetWorkerClient.js'
3+
import { asyncBufferFrom, compare } from './parquetWorkerClient.js'
44

55
self.onmessage = async ({ data }) => {
6-
const { metadata, asyncBuffer, rowStart, rowEnd, orderBy, queryId, chunks } = data
7-
const file = await asyncBufferFrom(asyncBuffer)
6+
const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks, sortIndex } = data
7+
const file = await asyncBufferFrom(from)
88
const onChunk = chunks ? (chunk: ColumnData) => self.postMessage({ chunk, queryId }) : undefined
99
try {
10-
const result = await parquetQuery({
11-
metadata, file, rowStart, rowEnd, orderBy, compressors, onChunk
12-
})
13-
self.postMessage({ result, queryId })
10+
if (sortIndex) {
11+
// Special case for sorted index
12+
if (orderBy === undefined) throw new Error('sortIndex requires orderBy')
13+
if (rowStart !== undefined || rowEnd !== undefined) throw new Error('sortIndex requires all rows')
14+
const sortColumn = await parquetQuery({
15+
metadata, file, columns: [orderBy], compressors
16+
})
17+
const result = Array.from(sortColumn, (_, index) => index)
18+
.sort((a, b) => compare(sortColumn[a][orderBy], sortColumn[b][orderBy]))
19+
self.postMessage({ result, queryId })
20+
} else {
21+
const result = await parquetQuery({
22+
metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk
23+
})
24+
self.postMessage({ result, queryId })
25+
}
1426
} catch (error) {
1527
self.postMessage({ error, queryId })
1628
}

src/workers/parquetWorkerClient.ts

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { asyncBufferFromUrl, cachedAsyncBuffer, AsyncBuffer, ParquetReadOptions } from 'hyparquet'
1+
import { asyncBufferFromUrl, cachedAsyncBuffer, AsyncBuffer, ParquetReadOptions, FileMetaData } from 'hyparquet'
22

33
// Serializable constructor for AsyncBuffers
44
export interface AsyncBufferFrom {
@@ -8,13 +8,38 @@ export interface AsyncBufferFrom {
88

99
// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
1010
interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
11-
asyncBuffer: AsyncBufferFrom
11+
from: AsyncBufferFrom
1212
orderBy?: string
13+
sortIndex?: boolean
1314
}
1415

1516
let worker: Worker | undefined
1617
let nextQueryId = 0
17-
const pending = new Map<number, { resolve: (value: any) => void, reject: (error: any) => void }>()
18+
interface QueryAgent {
19+
resolve: (value: any) => void
20+
reject: (error: any) => void
21+
onChunk?: (chunk: any) => void
22+
}
23+
const pending = new Map<number, QueryAgent>()
24+
25+
function getWorker() {
26+
if (!worker) {
27+
worker = new Worker(new URL('worker.min.js', import.meta.url))
28+
worker.onmessage = ({ data }) => {
29+
const { resolve, reject, onChunk } = pending.get(data.queryId)!
30+
if (data.error) {
31+
reject(data.error)
32+
} else if (data.result) {
33+
resolve(data.result)
34+
} else if (data.chunk) {
35+
onChunk?.(data.chunk)
36+
} else {
37+
reject(new Error('Unexpected message from worker'))
38+
}
39+
}
40+
}
41+
return worker
42+
}
1843

1944
/**
2045
* Presents almost the same interface as parquetRead, but runs in a worker.
@@ -23,32 +48,35 @@ const pending = new Map<number, { resolve: (value: any) => void, reject: (error:
2348
* to be serialized to the worker.
2449
*/
2550
export function parquetQueryWorker(
26-
{ metadata, asyncBuffer, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions
51+
{ metadata, from, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions
2752
): Promise<Record<string, any>[]> {
2853
return new Promise((resolve, reject) => {
2954
const queryId = nextQueryId++
30-
pending.set(queryId, { resolve, reject })
31-
// Create a worker
32-
if (!worker) {
33-
worker = new Worker(new URL('worker.min.js', import.meta.url))
34-
worker.onmessage = ({ data }) => {
35-
const { resolve, reject } = pending.get(data.queryId)!
36-
// Convert postmessage data to callbacks
37-
if (data.error) {
38-
reject(data.error)
39-
} else if (data.result) {
40-
resolve(data.result)
41-
} else if (data.chunk) {
42-
onChunk?.(data.chunk)
43-
} else {
44-
reject(new Error('Unexpected message from worker'))
45-
}
46-
}
47-
}
55+
pending.set(queryId, { resolve, reject, onChunk })
56+
const worker = getWorker()
57+
4858
// If caller provided an onChunk callback, worker will send chunks as they are parsed
4959
const chunks = onChunk !== undefined
5060
worker.postMessage({
51-
queryId, metadata, asyncBuffer, rowStart, rowEnd, orderBy, chunks
61+
queryId, metadata, from, rowStart, rowEnd, orderBy, chunks
62+
})
63+
})
64+
}
65+
66+
interface ParquetSortIndexOptions {
67+
metadata: FileMetaData
68+
from: AsyncBufferFrom
69+
orderBy: string
70+
}
71+
72+
export function parquetSortIndexWorker({ metadata, from, orderBy }: ParquetSortIndexOptions): Promise<number[]> {
73+
return new Promise((resolve, reject) => {
74+
const queryId = nextQueryId++
75+
pending.set(queryId, { resolve, reject })
76+
const worker = getWorker()
77+
78+
worker.postMessage({
79+
queryId, metadata, from, orderBy, sortIndex: true
5280
})
5381
})
5482
}
@@ -65,3 +93,9 @@ export async function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffe
6593
return asyncBuffer
6694
}
6795
const cache = new Map<string, Promise<AsyncBuffer>>()
96+
97+
export function compare(a: any, b: any): number {
98+
if (a < b) return -1
99+
if (a > b) return 1
100+
return 1 // TODO: how to handle nulls?
101+
}

0 commit comments

Comments
 (0)