Skip to content

Commit 2d9c1f2

Browse files
committed
implement multi column sort
1 parent a4d0998 commit 2d9c1f2

File tree

4 files changed

+147
-76
lines changed

4 files changed

+147
-76
lines changed

src/lib/tableProvider.ts

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,41 @@
1-
import { DataFrame, ResolvableRow, resolvableRow } from 'hightable'
1+
import { DataFrame, OrderBy, ResolvableRow, resolvableRow } from 'hightable'
22
import { FileMetaData, parquetSchema } from 'hyparquet'
3-
import { parquetQueryWorker, parquetSortIndexWorker } from './workers/parquetWorkerClient.js'
3+
import { parquetColumnRanksWorker, parquetQueryWorker } from './workers/parquetWorkerClient.js'
44
import type { AsyncBufferFrom } from './workers/types.d.ts'
55

6+
/*
7+
* sortIndex[0] gives the index of the first row in the sorted table
8+
*/
9+
export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'descending', ranks: number[] }[]): number[] {
10+
if (!(0 in orderByRanks)) {
11+
throw new Error('orderByRanks should have at least one element')
12+
}
13+
const numRows = orderByRanks[0].ranks.length
14+
return Array
15+
.from({ length: numRows }, (_, i) => i)
16+
.sort((a, b) => {
17+
for (const { direction, ranks } of orderByRanks) {
18+
const rankA = ranks[a]
19+
const rankB = ranks[b]
20+
if (rankA === undefined || rankB === undefined) {
21+
throw new Error('Invalid ranks')
22+
}
23+
const value = direction === 'ascending' ? 1 : -1
24+
if (rankA < rankB) return -value
25+
if (rankA > rankB) return value
26+
}
27+
return 0
28+
})
29+
}
30+
631
/**
732
* Convert a parquet file into a dataframe.
833
*/
934
export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame {
1035
const { children } = parquetSchema(metadata)
1136
const header = children.map(child => child.element.name)
1237
const sortCache = new Map<string, Promise<number[]>>()
38+
const columnRanksCache = new Map<string, Promise<number[]>>()
1339
const data = new Array<ResolvableRow | undefined>(Number(metadata.num_rows))
1440
const groups = new Array(metadata.row_groups.length).fill(false)
1541
let groupStart = 0
@@ -34,7 +60,8 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
3460
throw new Error(`Missing data row for index ${i}`)
3561
}
3662
dataRow.index.resolve(i)
37-
const row = groupData[i - rowStart]
63+
const j = i - rowStart
64+
const row = groupData[j]
3865
if (row === undefined) {
3966
throw new Error(`Missing row in groupData for index: ${i - rowStart}`)
4067
}
@@ -54,20 +81,33 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
5481
}
5582
}
5683

57-
function getSortIndex(orderBy: string) {
58-
let sortIndex = sortCache.get(orderBy)
84+
function getColumnRanks(column: string): Promise<number[]> {
85+
let columnRanks = columnRanksCache.get(column)
86+
if (!columnRanks) {
87+
columnRanks = parquetColumnRanksWorker({ from, metadata, column })
88+
columnRanksCache.set(column, columnRanks)
89+
}
90+
return columnRanks
91+
}
92+
93+
function getSortIndex(orderBy: OrderBy): Promise<number[]> {
94+
const orderByKey = JSON.stringify(orderBy)
95+
let sortIndex = sortCache.get(orderByKey)
5996
if (!sortIndex) {
60-
sortIndex = parquetSortIndexWorker({ from, metadata, orderBy })
61-
sortCache.set(orderBy, sortIndex)
97+
const orderByRanksPromise = Promise.all(
98+
orderBy.map(({ column, direction }) => getColumnRanks(column).then(ranks => ({ direction, ranks })))
99+
)
100+
sortIndex = orderByRanksPromise.then(orderByRanks => computeSortIndex(orderByRanks))
101+
sortCache.set(orderByKey, sortIndex)
62102
}
63103
return sortIndex
64104
}
65105

66106
return {
67107
header,
68108
numRows: Number(metadata.num_rows),
69-
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: string}) {
70-
if (orderBy) {
109+
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) {
110+
if (orderBy?.length) {
71111
const numRows = end - start
72112
const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))
73113

src/lib/workers/parquetWorker.ts

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { ColumnData, parquetQuery } from 'hyparquet'
22
import { compressors } from 'hyparquet-compressors'
33
import { asyncBufferFrom } from '../utils.js'
4-
import type { ChunkMessage, ErrorMessage, IndicesMessage, ParquetReadWorkerOptions, ResultMessage } from './types.js'
4+
import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js'
55

66
function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
77
self.postMessage({ chunk, queryId })
@@ -12,35 +12,51 @@ function postResultMessage ({ result, queryId }: ResultMessage) {
1212
function postErrorMessage ({ error, queryId }: ErrorMessage) {
1313
self.postMessage({ error, queryId })
1414
}
15-
function postIndicesMessage ({ indices, queryId }: IndicesMessage) {
16-
self.postMessage({ indices, queryId })
15+
/** Send the computed column ranks back to the main thread. */
function postColumnRanksMessage (message: ColumnRanksMessage) {
  const { columnRanks, queryId } = message
  self.postMessage({ columnRanks, queryId })
}
1818

19-
self.onmessage = async ({ data }: {
20-
data: ParquetReadWorkerOptions & { queryId: number; chunks: boolean };
21-
}) => {
22-
const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks, sortIndex } = data
19+
self.onmessage = async ({ data }: { data: ClientMessage }) => {
20+
const { metadata, from, kind, queryId } = data
2321
const file = await asyncBufferFrom(from)
24-
if (sortIndex === undefined) {
25-
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
22+
if (kind === 'columnRanks') {
23+
const { column } = data
24+
// return the column ranks in ascending order
25+
// we can get the descending order replacing the rank with numRows - rank - 1. It's not exactly the rank of
26+
// the descending order, because the rank is the first, not the last, of the ties. But it's enough for the
27+
// purpose of sorting.
28+
29+
// TODO(SL): ensure only the expected column is fetched
2630
try {
27-
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk })
28-
postResultMessage({ result, queryId })
31+
const sortColumn = await parquetQuery({ metadata, file, columns: [column], compressors })
32+
const valuesWithIndex = sortColumn.map((row, index) => ({ value: row[column] as unknown, index }))
33+
const sortedValuesWithIndex = Array.from(valuesWithIndex).sort(({ value: a }, { value: b }) => compare<unknown>(a, b))
34+
const numRows = sortedValuesWithIndex.length
35+
const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => {
36+
const { lastValue, lastRank, ranks } = accumulator
37+
const { value, index } = currentValue
38+
if (value === lastValue) {
39+
ranks[index] = lastRank
40+
return { ranks, lastValue, lastRank }
41+
} else {
42+
ranks[index] = rank
43+
return { ranks, lastValue: value, lastRank: rank }
44+
}
45+
}, {
46+
ranks: Array(numRows).fill(-1) as number[],
47+
lastValue: undefined as unknown,
48+
lastRank: 0,
49+
}).ranks
50+
postColumnRanksMessage({ columnRanks: columnRanks, queryId })
2951
} catch (error) {
3052
postErrorMessage({ error: error as Error, queryId })
3153
}
3254
} else {
55+
const { rowStart, rowEnd, chunks } = data
56+
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
3357
try {
34-
// Special case for sorted index
35-
if (orderBy === undefined)
36-
throw new Error('sortParquetWorker requires orderBy')
37-
if (rowStart !== undefined || rowEnd !== undefined)
38-
throw new Error('sortIndex requires all rows')
39-
const sortColumn = await parquetQuery({ metadata, file, columns: [orderBy], compressors })
40-
const indices = Array.from(sortColumn, (_, index) => index).sort((a, b) =>
41-
compare<unknown>(sortColumn[a]?.[orderBy], sortColumn[b]?.[orderBy])
42-
)
43-
postIndicesMessage({ indices, queryId })
58+
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, compressors, onChunk })
59+
postResultMessage({ result, queryId })
4460
} catch (error) {
4561
postErrorMessage({ error: error as Error, queryId })
4662
}

src/lib/workers/parquetWorkerClient.ts

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,40 @@ import ParquetWorker from './parquetWorker?worker&inline'
22
/// ^ the worker is bundled with the main thread code (inline) which is easier for users to import
33
/// (no need to copy the worker file to the right place)
44
import { ColumnData } from 'hyparquet'
5-
import type { ParquetMessage, ParquetReadWorkerOptions, ParquetSortIndexOptions, Row, SortParquetMessage } from './types.js'
5+
import type { Cells, ColumnRanksClientMessage, ColumnRanksWorkerMessage, ColumnRanksWorkerOptions, QueryClientMessage, QueryWorkerMessage, QueryWorkerOptions } from './types.js'
66

77
let worker: Worker | undefined
88
let nextQueryId = 0
9-
interface SortQueryAgent {
10-
kind: 'sortIndex';
11-
resolve: (value: number[]) => void;
12-
reject: (error: Error) => void;
13-
}
149
interface RowsQueryAgent {
1510
kind: 'query';
16-
resolve: (value: Row[]) => void;
11+
resolve: (value: Cells[]) => void;
1712
reject: (error: Error) => void;
1813
onChunk?: (chunk: ColumnData) => void;
1914
}
20-
type QueryAgent = SortQueryAgent | RowsQueryAgent
15+
interface ColumnRanksQueryAgent {
16+
kind: 'columnRanks';
17+
resolve: (value: number[]) => void;
18+
reject: (error: Error) => void;
19+
}
20+
type QueryAgent = RowsQueryAgent | ColumnRanksQueryAgent
2121

2222
const pending = new Map<number, QueryAgent>()
2323

2424
function getWorker() {
2525
if (!worker) {
2626
worker = new ParquetWorker()
27-
worker.onmessage = ({ data }: { data: ParquetMessage | SortParquetMessage }) => {
27+
worker.onmessage = ({ data }: { data: QueryWorkerMessage | ColumnRanksWorkerMessage }) => {
2828
const pendingQueryAgent = pending.get(data.queryId)
2929
if (!pendingQueryAgent) {
3030
console.warn(
3131
`Unexpected: no pending promise found for queryId: ${data.queryId.toString()}`
3232
)
3333
return
3434
}
35+
3536
if (pendingQueryAgent.kind === 'query') {
36-
const { resolve, reject, onChunk } = pendingQueryAgent
37+
const { resolve, reject } = pendingQueryAgent
38+
const { onChunk } = pendingQueryAgent
3739
if ('error' in data) {
3840
reject(data.error)
3941
} else if ('result' in data) {
@@ -43,15 +45,16 @@ function getWorker() {
4345
} else {
4446
reject(new Error('Unexpected message from worker'))
4547
}
48+
return
49+
}
50+
51+
const { resolve, reject } = pendingQueryAgent
52+
if ('error' in data) {
53+
reject(data.error)
54+
} else if ('columnRanks' in data) {
55+
resolve(data.columnRanks)
4656
} else {
47-
const { resolve, reject } = pendingQueryAgent
48-
if ('error' in data) {
49-
reject(data.error)
50-
} else if ('indices' in data) {
51-
resolve(data.indices)
52-
} else {
53-
reject(new Error('Unexpected message from worker'))
54-
}
57+
reject(new Error('Unexpected message from worker'))
5558
}
5659
}
5760
}
@@ -64,25 +67,26 @@ function getWorker() {
6467
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
6568
* to be serialized to the worker.
6669
*/
67-
export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions): Promise<Row[]> {
70+
export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, onChunk }: QueryWorkerOptions): Promise<Cells[]> {
71+
// TODO(SL) Support passing columns?
6872
return new Promise((resolve, reject) => {
6973
const queryId = nextQueryId++
7074
pending.set(queryId, { kind: 'query', resolve, reject, onChunk })
7175
const worker = getWorker()
7276

7377
// If caller provided an onChunk callback, worker will send chunks as they are parsed
7478
const chunks = onChunk !== undefined
75-
worker.postMessage({ queryId, metadata, from, rowStart, rowEnd, orderBy, chunks })
79+
const message: QueryClientMessage = { queryId, metadata, from, rowStart, rowEnd, chunks, kind: 'query' }
80+
worker.postMessage(message)
7681
})
7782
}
7883

79-
export function parquetSortIndexWorker({ metadata, from, orderBy }: ParquetSortIndexOptions): Promise<number[]> {
84+
export function parquetColumnRanksWorker({ metadata, from, column }: ColumnRanksWorkerOptions): Promise<number[]> {
8085
return new Promise((resolve, reject) => {
8186
const queryId = nextQueryId++
82-
pending.set(queryId, { kind: 'sortIndex', resolve, reject })
87+
pending.set(queryId, { kind: 'columnRanks', resolve, reject })
8388
const worker = getWorker()
84-
worker.postMessage({
85-
queryId, metadata, from, orderBy, sortIndex: true,
86-
})
89+
const message: ColumnRanksClientMessage = { queryId, metadata, from, column, kind: 'columnRanks' }
90+
worker.postMessage(message)
8791
})
8892
}

src/lib/workers/types.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet'
1+
import { ColumnData, FileMetaData } from 'hyparquet'
22

33
// Serializable constructors for AsyncBuffers
44
interface AsyncBufferFromFile {
@@ -11,37 +11,48 @@ interface AsyncBufferFromUrl {
1111
requestInit?: RequestInit
1212
}
1313
export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl
14+
// Cells is defined in hightable, but uses any, not unknown
15+
export type Cells = Record<string, unknown> ;
1416

15-
// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
16-
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
17+
export interface CommonWorkerOptions {
18+
metadata: FileMetaData,
1719
from: AsyncBufferFrom
18-
orderBy?: string
19-
sortIndex?: boolean
2020
}
21-
// Row is defined in hightable, but not exported + we change any to unknown
22-
export type Row = Record<string, unknown> ;
23-
2421
interface Message {
2522
queryId: number
2623
}
24+
export interface ErrorMessage extends Message {
25+
error: Error
26+
}
27+
28+
/* Query worker */
29+
export interface QueryWorkerOptions extends CommonWorkerOptions {
30+
rowStart?: number,
31+
rowEnd?: number,
32+
onChunk?: (chunk: ColumnData) => void
33+
}
34+
export interface QueryClientMessage extends QueryWorkerOptions, Message {
35+
kind: 'query',
36+
chunks?: boolean
37+
}
2738
export interface ChunkMessage extends Message {
2839
chunk: ColumnData
2940
}
3041
export interface ResultMessage extends Message {
31-
result: Row[]
42+
result: Cells[]
3243
}
33-
export interface IndicesMessage extends Message {
34-
indices: number[]
44+
export type QueryWorkerMessage = ChunkMessage | ResultMessage | ErrorMessage
45+
46+
/* ColumnRanks worker */
47+
export interface ColumnRanksWorkerOptions extends CommonWorkerOptions {
48+
column: string
3549
}
36-
export interface ErrorMessage extends Message {
37-
error: Error
50+
export interface ColumnRanksClientMessage extends ColumnRanksWorkerOptions, Message {
51+
kind: 'columnRanks'
3852
}
39-
40-
export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage
41-
export type SortParquetMessage = IndicesMessage | ErrorMessage
42-
43-
export interface ParquetSortIndexOptions {
44-
metadata: FileMetaData
45-
from: AsyncBufferFrom
46-
orderBy: string
53+
export interface ColumnRanksMessage extends Message {
54+
columnRanks: number[]
4755
}
56+
export type ColumnRanksWorkerMessage = ColumnRanksMessage | ErrorMessage
57+
58+
export type ClientMessage = QueryClientMessage | ColumnRanksClientMessage

0 commit comments

Comments
 (0)