Skip to content

Commit 9a5ac88

Browse files
committed
copy files from hightable and remove dependency
(it was including React in the bundle)
1 parent 4d40980 commit 9a5ac88

File tree

6 files changed

+447
-2
lines changed

6 files changed

+447
-2
lines changed

packages/utils/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
"test": "vitest run"
3737
},
3838
"dependencies": {
39-
"hightable": "0.6.3",
4039
"hyparquet": "1.5.0",
4140
"hyparquet-compressors": "0.1.4"
4241
},

packages/utils/src/dataframe.ts

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
type WrappedPromise<T> = Promise<T> & {
2+
resolved?: T
3+
rejected?: Error
4+
}
5+
6+
/**
7+
* A row where each cell is a promise.
8+
* The promise must be wrapped with `wrapPromise` so that HighTable can render
9+
* the state synchronously.
10+
*/
11+
export type AsyncRow = Record<string, WrappedPromise<any>>
12+
13+
/**
14+
* A row where each cell is a resolved value.
15+
*/
16+
export type Row = Record<string, any>
17+
18+
/**
19+
* Streamable row data
20+
*/
21+
export interface DataFrame {
22+
header: string[]
23+
numRows: number
24+
// Rows are 0-indexed, excludes the header, end is exclusive
25+
rows(start: number, end: number, orderBy?: string): AsyncRow[] | Promise<Row[]>
26+
sortable?: boolean
27+
}
28+
29+
export function resolvableRow(header: string[]): { [key: string]: ResolvablePromise<any> } {
30+
return Object.fromEntries(header.map(key => [key, resolvablePromise<any>()]))
31+
}
32+
33+
/**
34+
* Helper method to wrap future rows into AsyncRows.
35+
* Helpful when you want to define a DataFrame with simple async fetching of rows.
36+
* This function turns future data into a "grid" of wrapped promises.
37+
*/
38+
export function asyncRows(rows: AsyncRow[] | Promise<Row[]>, numRows: number, header: string[]): AsyncRow[] {
39+
if (Array.isArray(rows)) return rows
40+
// Make grid of resolvable promises
41+
const wrapped = new Array(numRows).fill(null).map(_ => resolvableRow(header))
42+
rows.then(rows => {
43+
if (rows.length !== numRows) {
44+
console.warn(`Expected ${numRows} rows, got ${rows.length}`)
45+
}
46+
for (let i = 0; i < rows.length; i++) {
47+
const row = rows[i]
48+
for (const key of header) {
49+
wrapped[i][key].resolve(row[key])
50+
}
51+
}
52+
}).catch(error => {
53+
// Reject all promises on error
54+
for (let i = 0; i < numRows; i++) {
55+
for (const key of header) {
56+
wrapped[i][key].reject(error)
57+
}
58+
}
59+
})
60+
return wrapped
61+
}
62+
63+
/**
64+
* Wrap a promise to save the resolved value and error.
65+
* Note: you can't await on a WrappedPromise, you must use then.
66+
*/
67+
export function wrapPromise<T>(promise: Promise<T> | T): WrappedPromise<T> {
68+
if (!(promise instanceof Promise)) {
69+
promise = Promise.resolve(promise)
70+
}
71+
const wrapped: WrappedPromise<T> = promise.then(resolved => {
72+
wrapped.resolved = resolved
73+
return resolved
74+
}).catch(rejected => {
75+
wrapped.rejected = rejected
76+
throw rejected
77+
})
78+
return wrapped
79+
}
80+
81+
export type ResolvablePromise<T> = Promise<T> & {
82+
resolve: (value: T) => void
83+
reject: (error: Error) => void
84+
}
85+
86+
/**
87+
* Create a promise that can be resolved or rejected later.
88+
*/
89+
export function resolvablePromise<T>(): ResolvablePromise<T> {
90+
let resolve: (value: T) => void
91+
let reject: (error: Error) => void
92+
const promise = wrapPromise(new Promise<T>((res, rej) => {
93+
resolve = res
94+
reject = rej
95+
})) as ResolvablePromise<T>
96+
promise.resolve = resolve!
97+
promise.reject = reject!
98+
return promise
99+
}
100+
101+
/**
102+
* Wraps a DataFrame to make it sortable.
103+
* Requires fetching all rows to sort.
104+
*/
105+
export function sortableDataFrame(data: DataFrame): DataFrame {
106+
if (data.sortable) return data // already sortable
107+
// Fetch all rows and add __index__ column
108+
let all: Promise<Row[]>
109+
return {
110+
...data,
111+
rows(start: number, end: number, orderBy?: string): AsyncRow[] | Promise<Row[]> {
112+
if (orderBy) {
113+
if (!data.header.includes(orderBy)) {
114+
throw new Error(`Invalid orderBy field: ${orderBy}`)
115+
}
116+
if (!all) {
117+
// Fetch all rows and add __index__ column
118+
all = awaitRows(data.rows(0, data.numRows))
119+
.then(rows => rows.map((row, i) => ({ __index__: i, ...row })))
120+
}
121+
const sorted = all.then(all => {
122+
return all.sort((a, b) => {
123+
if (a[orderBy] < b[orderBy]) return -1
124+
if (a[orderBy] > b[orderBy]) return 1
125+
return 0
126+
}).slice(start, end)
127+
})
128+
return sorted
129+
} else {
130+
return data.rows(start, end)
131+
}
132+
},
133+
sortable: true,
134+
}
135+
}
136+
137+
/**
138+
* Await all promises in an AsyncRow and return resolved row.
139+
*/
140+
export function awaitRow(row: AsyncRow): Promise<Row> {
141+
return Promise.all(Object.values(row))
142+
.then(values => Object.fromEntries(Object.keys(row).map((key, i) => [key, values[i]])))
143+
}
144+
145+
/**
146+
* Await all promises in list of AsyncRows and return resolved rows.
147+
*/
148+
export function awaitRows(rows: AsyncRow[] | Promise<Row[]>): Promise<Row[]> {
149+
if (rows instanceof Promise) return rows
150+
return Promise.all(rows.map(awaitRow))
151+
}
152+
153+
export function arrayDataFrame(data: Row[]): DataFrame {
154+
if (!data.length) return { header: [], numRows: 0, rows: () => Promise.resolve([]) }
155+
return {
156+
header: Object.keys(data[0]),
157+
numRows: data.length,
158+
rows(start: number, end: number): Promise<Row[]> {
159+
return Promise.resolve(data.slice(start, end))
160+
},
161+
}
162+
}

packages/utils/src/rowCache.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { AsyncRow, DataFrame, asyncRows } from './dataframe.js'
2+
3+
/**
4+
* Wrap a dataframe with cached rows.
5+
*
6+
* @param df async dataframe to wrap
7+
*/
8+
export function rowCache(df: DataFrame): DataFrame {
9+
// Row cache is stored as a sorted array of RowGroups, per sort order
10+
const caches: {[key: string]: AsyncRow[]} = {}
11+
12+
let hits = 0
13+
let misses = 0
14+
15+
return {
16+
...df,
17+
rows(start: number, end: number, orderBy?: string): AsyncRow[] {
18+
// Cache per sort order
19+
const cache = caches[orderBy || ''] ||= new Array(df.numRows)
20+
const n = hits + misses
21+
if (n && !(n % 10)) {
22+
console.log(`Cache hits: ${hits} / ${hits + misses} (${(100 * hits / (hits + misses)).toFixed(1)}%)`)
23+
}
24+
25+
// Fetch missing rows in contiguous blocks
26+
let blockStart: number | undefined
27+
let hasCacheMiss = false
28+
for (let i = start; i <= end; i++) {
29+
if (i < end && !cache[i]) {
30+
if (blockStart === undefined) {
31+
blockStart = i
32+
}
33+
} else if (blockStart !== undefined) {
34+
const blockEnd = i
35+
const numRows = blockEnd - blockStart
36+
const futureRows = asyncRows(df.rows(blockStart, blockEnd, orderBy), numRows, df.header)
37+
for (let j = 0; j < blockEnd - blockStart; j++) {
38+
cache[blockStart + j] = futureRows[j]
39+
}
40+
blockStart = undefined
41+
hasCacheMiss = true
42+
}
43+
}
44+
if (hasCacheMiss) misses++
45+
else hits++
46+
47+
return cache.slice(start, end)
48+
},
49+
}
50+
}

packages/utils/src/tableProvider.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { DataFrame, ResolvablePromise, resolvablePromise } from 'hightable'
21
import { FileMetaData, parquetSchema } from 'hyparquet'
2+
import { DataFrame, ResolvablePromise, resolvablePromise } from './dataframe.js'
33
import { parquetQueryWorker, parquetSortIndexWorker } from './workers/parquetWorkerClient.js'
44
import type { AsyncBufferFrom } from './workers/types.d.ts'
55

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import { describe, expect, it } from 'vitest'
2+
import {
3+
DataFrame, Row, arrayDataFrame, awaitRows, resolvablePromise, sortableDataFrame, wrapPromise,
4+
} from '../src/dataframe.js'
5+
6+
function wrapObject(obj: Record<string, any>): Row {
7+
return Object.fromEntries(
8+
Object.entries(obj).map(([key, value]) => [key, wrapPromise(value)])
9+
)
10+
}
11+
12+
describe('resolvablePromise', () => {
13+
it('should resolve with a value', async () => {
14+
const promise = resolvablePromise<number>()
15+
promise.resolve(42)
16+
expect(await promise).toBe(42)
17+
})
18+
it('should reject with an error', async () => {
19+
const promise = resolvablePromise<number>()
20+
promise.reject(new Error('Failed'))
21+
await expect(promise).rejects.toThrow('Failed')
22+
})
23+
})
24+
25+
describe('awaitRows', () => {
26+
it('should resolve with a row', async () => {
27+
const row = wrapObject({ id: 1, name: 'Alice', age: 30, __index__: 0 })
28+
const result = await awaitRows([row])
29+
expect(result).toEqual([{ id: 1, name: 'Alice', age: 30, __index__: 0 }])
30+
})
31+
})
32+
33+
describe('sortableDataFrame', () => {
34+
const data = [
35+
{ id: 3, name: 'Charlie', age: 25 },
36+
{ id: 1, name: 'Alice', age: 30 },
37+
{ id: 2, name: 'Bob', age: 20 },
38+
{ id: 4, name: 'Dani', age: 20 },
39+
]
40+
41+
const dataFrame: DataFrame = {
42+
header: ['id', 'name', 'age'],
43+
numRows: data.length,
44+
rows(start: number, end: number): Row[] {
45+
// Return the slice of data between start and end indices
46+
return data.slice(start, end).map(wrapObject)
47+
},
48+
sortable: false,
49+
}
50+
51+
const sortableDf = sortableDataFrame(dataFrame)
52+
53+
it('should set sortable to true', () => {
54+
expect(sortableDf.sortable).toBe(true)
55+
})
56+
57+
it('should preserve header and numRows', () => {
58+
expect(sortableDf.header).toEqual(dataFrame.header)
59+
expect(sortableDf.numRows).toBe(dataFrame.numRows)
60+
})
61+
62+
it('should return unsorted data when orderBy is not provided', async () => {
63+
const rows = await awaitRows(sortableDf.rows(0, 3))
64+
expect(rows).toEqual([
65+
{ id: 3, name: 'Charlie', age: 25 },
66+
{ id: 1, name: 'Alice', age: 30 },
67+
{ id: 2, name: 'Bob', age: 20 },
68+
])
69+
})
70+
71+
it('should return data sorted by column "age"', async () => {
72+
const rows = await awaitRows(sortableDf.rows(0, 4, 'age'))
73+
expect(rows).toEqual([
74+
{ id: 2, name: 'Bob', age: 20, __index__: 2 },
75+
{ id: 4, name: 'Dani', age: 20, __index__: 3 },
76+
{ id: 3, name: 'Charlie', age: 25, __index__: 0 },
77+
{ id: 1, name: 'Alice', age: 30, __index__: 1 },
78+
])
79+
})
80+
81+
it('should slice the sorted data correctly', async () => {
82+
const rows = await awaitRows(sortableDf.rows(1, 3, 'id'))
83+
expect(rows).toEqual([
84+
{ id: 2, name: 'Bob', age: 20, __index__: 2 },
85+
{ id: 3, name: 'Charlie', age: 25, __index__: 0 },
86+
])
87+
})
88+
89+
it('returns self if already sortable', () => {
90+
const sortableDf2 = sortableDataFrame(sortableDf)
91+
expect(sortableDf2).toBe(sortableDf)
92+
})
93+
94+
it('should throw for invalid orderBy field', () => {
95+
expect(() => sortableDf.rows(0, 3, 'invalid'))
96+
.toThrowError('Invalid orderBy field: invalid')
97+
})
98+
})
99+
100+
describe('arrayDataFrame', () => {
101+
const testData = [
102+
{ id: 1, name: 'Alice', age: 30 },
103+
{ id: 2, name: 'Bob', age: 25 },
104+
{ id: 3, name: 'Charlie', age: 35 },
105+
]
106+
107+
it('should create a DataFrame with correct header and numRows', () => {
108+
const df = arrayDataFrame(testData)
109+
expect(df.header).toEqual(['id', 'name', 'age'])
110+
expect(df.numRows).toBe(3)
111+
})
112+
113+
it('should handle empty data array', () => {
114+
const df = arrayDataFrame([])
115+
expect(df.header).toEqual([])
116+
expect(df.numRows).toBe(0)
117+
expect(df.rows(0, 1)).resolves.toEqual([])
118+
})
119+
120+
it('should return correct rows for given range', async () => {
121+
const df = arrayDataFrame(testData)
122+
const rows = await df.rows(0, 2)
123+
expect(rows).toEqual([
124+
{ id: 1, name: 'Alice', age: 30 },
125+
{ id: 2, name: 'Bob', age: 25 },
126+
])
127+
})
128+
129+
it('should handle start index equal to end index', async () => {
130+
const df = arrayDataFrame(testData)
131+
const rows = await df.rows(1, 1)
132+
expect(rows).toEqual([])
133+
})
134+
135+
it('should return all rows when end index exceeds array length', async () => {
136+
const df = arrayDataFrame(testData)
137+
const rows = await df.rows(0, 10)
138+
expect(rows).toEqual(testData)
139+
})
140+
})

0 commit comments

Comments
 (0)