Skip to content

Commit d61c1f8

Browse files
committed
move cache code to worker + to types.d.ts
1 parent c536f49 commit d61c1f8

File tree

6 files changed

+129
-128
lines changed

6 files changed

+129
-128
lines changed

packages/components/src/lib/tableProvider.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { DataFrame, ResolvablePromise, resolvablePromise } from 'hightable'
22
import { FileMetaData, parquetSchema } from 'hyparquet'
3-
import { AsyncBufferFrom, parquetQueryWorker, parquetSortIndexWorker } from '../workers/parquetWorkerClient.ts'
3+
import { parquetQueryWorker, parquetSortIndexWorker } from '../workers/parquetWorkerClient.ts'
4+
import type { AsyncBufferFrom } from '../workers/types.d.ts'
45

56
type ResolvableRow = Record<string, ResolvablePromise<unknown>>;
67

packages/components/src/workers/parquetWorker.ts

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { ColumnData, parquetQuery } from 'hyparquet'
1+
import { AsyncBuffer, ColumnData, parquetQuery } from 'hyparquet'
22
import { compressors } from 'hyparquet-compressors'
3-
import {
3+
import { asyncBufferFromUrl } from '../lib/utils.ts'
4+
import type {
5+
AsyncBufferFrom,
46
ChunkMessage,
57
ErrorMessage,
68
IndicesMessage,
79
ParquetReadWorkerOptions,
810
ResultMessage,
9-
asyncBufferFrom,
10-
compare,
11-
} from './parquetWorkerClient.ts'
11+
} from './types.d.ts'
1212

1313
function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
1414
self.postMessage({ chunk, queryId })
@@ -23,6 +23,9 @@ function postIndicesMessage ({ indices, queryId }: IndicesMessage) {
2323
self.postMessage({ indices, queryId })
2424
}
2525

26+
// Cache for AsyncBuffers
27+
const cache = new Map<string, Promise<AsyncBuffer>>()
28+
2629
self.onmessage = async ({
2730
data,
2831
}: {
@@ -83,3 +86,77 @@ self.onmessage = async ({
8386
}
8487
}
8588
}
89+
90+
/**
 * Three-way comparison of two values using the `<` and `>` operators.
 *
 * @param a left-hand value
 * @param b right-hand value
 * @returns -1 if `a < b`, 1 if `a > b`, and 0 when neither holds
 */
function compare<T>(a: T, b: T): number {
  if (a < b) return -1
  if (a > b) return 1
  // Neither value is strictly smaller: report a tie. Returning a non-zero
  // value here would make compare(a, b) and compare(b, a) both positive,
  // which breaks the contract a sort comparator has to honor.
  return 0 // TODO: how to handle nulls?
}
95+
96+
/**
 * Convert AsyncBufferFrom to AsyncBuffer and cache results.
 *
 * The module-level `cache` is keyed by the JSON serialization of `from` and
 * stores the promise itself, so concurrent callers asking for the same file
 * share one `asyncBufferFromUrl` call. A promise that rejects is removed from
 * the cache so that a later call can retry instead of receiving the stored
 * failure forever.
 *
 * @param from serializable description of the file to open
 * @returns promise of an AsyncBuffer wrapped by `cachedAsyncBuffer`
 */
function asyncBufferFrom(
  from: AsyncBufferFrom,
): Promise<AsyncBuffer> {
  const key = JSON.stringify(from)
  const cached = cache.get(key)
  if (cached) return cached
  const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
  cache.set(key, asyncBuffer)
  // Evict on failure. The caller still sees the rejection through the
  // returned promise; this handler only cleans up the cache entry.
  asyncBuffer.catch(() => {
    if (cache.get(key) === asyncBuffer) cache.delete(key)
  })
  return asyncBuffer
}
109+
110+
type Awaitable<T> = T | Promise<T>;
111+
112+
/**
 * Wrap an AsyncBuffer so that repeated reads of the same byte range reuse the
 * first result instead of calling the underlying `slice` again.
 *
 * The cache stores whatever `slice` returned (a buffer or a promise of one),
 * so overlapping requests for the same range share a single read. Ranges are
 * keyed with `cacheKey`. A read that rejects is removed from the cache so the
 * range can be requested again later.
 *
 * @param asyncBuffer buffer to wrap
 * @returns AsyncBuffer with the same `byteLength` and a memoizing `slice`
 */
function cachedAsyncBuffer(asyncBuffer: AsyncBuffer): AsyncBuffer {
  const cache = new Map<string, Awaitable<ArrayBuffer>>()
  const { byteLength } = asyncBuffer
  return {
    byteLength,
    slice(start: number, end?: number): Awaitable<ArrayBuffer> {
      const key = cacheKey(start, end, byteLength)
      const cached = cache.get(key)
      if (cached) return cached
      // cache miss, read from file
      const pending = asyncBuffer.slice(start, end)
      cache.set(key, pending)
      // Do not keep a failed read around. The caller still observes the
      // rejection through `pending`; this handler only cleans up the entry.
      Promise.resolve(pending).catch(() => {
        if (cache.get(key) === pending) cache.delete(key)
      })
      return pending
    },
  }
}
133+
134+
/**
 * Returns canonical cache key for a byte range 'start,end'.
 * Normalize int-range and suffix-range requests to the same key.
 *
 * A negative `start` is a suffix range counted from the end of the file. When
 * the file size is known it is converted to absolute offsets, and an omitted
 * `end` is replaced by the file size.
 *
 * @param start start byte of range, or a negative offset from the end of the file
 * @param end exclusive end byte of range, or undefined to read to the end of the file
 * @param size size of file in bytes, or undefined if unknown
 * @returns key of the form 'start,end', or 'start,' when the end cannot be resolved
 * @throws {Error} if a suffix range is given an explicit end, or if start > end
 */
function cacheKey(start: number, end?: number, size?: number): string {
  if (start < 0) {
    if (end !== undefined)
      throw new Error(
        `invalid suffix range [${start.toString()}, ${end.toString()}]`,
      )
    if (size === undefined) return `${start.toString()},`
    // A suffix longer than the file addresses the whole file, so clamp the
    // absolute start at 0 rather than emitting a negative offset.
    return `${Math.max(0, size + start).toString()},${size.toString()}`
  }
  if (end === undefined) {
    if (size === undefined) return `${start.toString()},`
    return `${start.toString()},${size.toString()}`
  }
  if (start > end)
    throw new Error(
      `invalid empty range [${start.toString()}, ${end.toString()}]`,
    )
  return `${start.toString()},${end.toString()}`
}
Lines changed: 2 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,8 @@
11
import ParquetWorker from './parquetWorker?worker&inline'
22
/// ^ the worker is bundled with the main thread code (inline) which is easier for users to import
33
/// (no need to copy the worker file to the right place)
4-
import { AsyncBuffer, ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet'
5-
import { asyncBufferFromUrl } from '../lib/utils.ts'
6-
7-
// Serializable constructor for AsyncBuffers
8-
/**
 * Plain-data description of a file to read. In this file it is handed to
 * `asyncBufferFromUrl` to build the actual AsyncBuffer, and its JSON
 * serialization is used as the cache key in `asyncBufferFrom`.
 */
export interface AsyncBufferFrom {
9-
url: string // location of the file
10-
byteLength: number // presumably the total size of the file in bytes — confirm against asyncBufferFromUrl
11-
headers?: Record<string, string> // presumably extra request headers used when fetching — confirm against asyncBufferFromUrl
12-
}
13-
// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
14-
/**
 * Options for a read performed in the worker: every ParquetReadOptions field
 * except `file`, plus a serializable `from` describing the file.
 */
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
15-
from: AsyncBufferFrom // file to read, resolved to an AsyncBuffer by asyncBufferFrom
16-
orderBy?: string // presumably the name of the column to sort by — confirm against the worker
17-
sortIndex?: boolean // NOTE(review): semantics not visible here; looks like it selects the sort-index request — confirm
18-
}
19-
// Row is defined in hightable, but not exported + we change any to unknown
20-
export type Row = Record<string, unknown>;
21-
22-
/** Base shape shared by the worker messages below: each carries the id of the query it belongs to. */
interface Message {
23-
queryId: number
24-
}
25-
/** Message carrying one chunk of column data for a query. */
export interface ChunkMessage extends Message {
26-
chunk: ColumnData
27-
}
28-
/** Message carrying the rows produced for a query. */
export interface ResultMessage extends Message {
29-
result: Row[]
30-
}
31-
/** Message carrying an array of indices for a query (presumably the sort index — confirm against the worker). */
export interface IndicesMessage extends Message {
32-
indices: number[]
33-
}
34-
/** Message carrying the error raised for a query. */
export interface ErrorMessage extends Message {
35-
error: Error
36-
}
37-
38-
// Messages that can be received for a parquet query
export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage
39-
// Messages that can be received for a sort-index request
export type SortParquetMessage = IndicesMessage | ErrorMessage
40-
41-
/** Arguments accepted by `parquetSortIndexWorker`. */
export interface ParquetSortIndexOptions {
41-
metadata: FileMetaData // parquet file metadata (hyparquet FileMetaData)
42-
from: AsyncBufferFrom // serializable description of the file to read
43-
orderBy: string // presumably the name of the column to sort by — confirm against the worker
44-
}
46-
4+
import { ColumnData } from 'hyparquet'
5+
import type { ParquetMessage, ParquetReadWorkerOptions, ParquetSortIndexOptions, Row, SortParquetMessage } from './types.d.ts'
476

487
let worker: Worker | undefined
498
let nextQueryId = 0
@@ -99,7 +58,6 @@ function getWorker() {
9958
return worker
10059
}
10160

102-
10361
/**
10462
* Presents almost the same interface as parquetRead, but runs in a worker.
10563
* This is useful for reading large parquet files without blocking the main thread.
@@ -143,79 +101,3 @@ export function parquetSortIndexWorker({ metadata, from, orderBy }: ParquetSortI
143101
})
144102
})
145103
}
146-
147-
/**
 * Convert AsyncBufferFrom to AsyncBuffer and cache results.
 *
 * The module-level `cache` is keyed by the JSON serialization of `from` and
 * stores the promise itself, so concurrent callers asking for the same file
 * share one `asyncBufferFromUrl` call. A promise that rejects is removed from
 * the cache so that a later call can retry instead of receiving the stored
 * failure forever.
 *
 * @param from serializable description of the file to open
 * @returns promise of an AsyncBuffer wrapped by `cachedAsyncBuffer`
 */
export function asyncBufferFrom(
  from: AsyncBufferFrom,
): Promise<AsyncBuffer> {
  const key = JSON.stringify(from)
  const cached = cache.get(key)
  if (cached) return cached
  const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
  cache.set(key, asyncBuffer)
  // Evict on failure. The caller still sees the rejection through the
  // returned promise; this handler only cleans up the cache entry.
  asyncBuffer.catch(() => {
    if (cache.get(key) === asyncBuffer) cache.delete(key)
  })
  return asyncBuffer
}
160-
const cache = new Map<string, Promise<AsyncBuffer>>()
161-
162-
/**
 * Three-way comparison of two values using the `<` and `>` operators.
 *
 * @param a left-hand value
 * @param b right-hand value
 * @returns -1 if `a < b`, 1 if `a > b`, and 0 when neither holds
 */
export function compare<T>(a: T, b: T): number {
  if (a < b) return -1
  if (a > b) return 1
  // Neither value is strictly smaller: report a tie. Returning a non-zero
  // value here would make compare(a, b) and compare(b, a) both positive,
  // which breaks the contract a sort comparator has to honor.
  return 0 // TODO: how to handle nulls?
}
167-
168-
// TODO(SL): once the types in cachedAsyncBuffer are fixed, import all the following from hyparquet
169-
type Awaitable<T> = T | Promise<T>;
170-
171-
/**
 * Wrap an AsyncBuffer so that repeated reads of the same byte range reuse the
 * first result instead of calling the underlying `slice` again.
 *
 * The cache stores whatever `slice` returned (a buffer or a promise of one),
 * so overlapping requests for the same range share a single read. Ranges are
 * keyed with `cacheKey`. A read that rejects is removed from the cache so the
 * range can be requested again later.
 *
 * @param asyncBuffer buffer to wrap
 * @returns AsyncBuffer with the same `byteLength` and a memoizing `slice`
 */
function cachedAsyncBuffer(asyncBuffer: AsyncBuffer): AsyncBuffer {
  const cache = new Map<string, Awaitable<ArrayBuffer>>()
  const { byteLength } = asyncBuffer
  return {
    byteLength,
    slice(start: number, end?: number): Awaitable<ArrayBuffer> {
      const key = cacheKey(start, end, byteLength)
      const cached = cache.get(key)
      if (cached) return cached
      // cache miss, read from file
      const pending = asyncBuffer.slice(start, end)
      cache.set(key, pending)
      // Do not keep a failed read around. The caller still observes the
      // rejection through `pending`; this handler only cleans up the entry.
      Promise.resolve(pending).catch(() => {
        if (cache.get(key) === pending) cache.delete(key)
      })
      return pending
    },
  }
}
192-
193-
/**
 * Returns canonical cache key for a byte range 'start,end'.
 * Normalize int-range and suffix-range requests to the same key.
 *
 * A negative `start` is a suffix range counted from the end of the file. When
 * the file size is known it is converted to absolute offsets, and an omitted
 * `end` is replaced by the file size.
 *
 * @param start start byte of range, or a negative offset from the end of the file
 * @param end exclusive end byte of range, or undefined to read to the end of the file
 * @param size size of file in bytes, or undefined if unknown
 * @returns key of the form 'start,end', or 'start,' when the end cannot be resolved
 * @throws {Error} if a suffix range is given an explicit end, or if start > end
 */
function cacheKey(start: number, end?: number, size?: number): string {
  if (start < 0) {
    if (end !== undefined)
      throw new Error(
        `invalid suffix range [${start.toString()}, ${end.toString()}]`,
      )
    if (size === undefined) return `${start.toString()},`
    // A suffix longer than the file addresses the whole file, so clamp the
    // absolute start at 0 rather than emitting a negative offset.
    return `${Math.max(0, size + start).toString()},${size.toString()}`
  }
  if (end === undefined) {
    if (size === undefined) return `${start.toString()},`
    return `${start.toString()},${size.toString()}`
  }
  if (start > end)
    throw new Error(
      `invalid empty range [${start.toString()}, ${end.toString()}]`,
    )
  return `${start.toString()},${end.toString()}`
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet'
2+
3+
// Serializable constructor for AsyncBuffers
4+
/**
 * Plain-data description of a file to read. The worker hands it to
 * `asyncBufferFromUrl` to build the actual AsyncBuffer and uses its JSON
 * serialization as a cache key.
 */
export interface AsyncBufferFrom {
5+
url: string // location of the file
6+
byteLength: number // presumably the total size of the file in bytes — confirm against asyncBufferFromUrl
7+
headers?: Record<string, string> // presumably extra request headers used when fetching — confirm against asyncBufferFromUrl
8+
}
9+
// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
10+
/**
 * Options for a read performed in the worker: every ParquetReadOptions field
 * except `file`, plus a serializable `from` describing the file.
 */
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
11+
from: AsyncBufferFrom // file to read, resolved to an AsyncBuffer inside the worker
12+
orderBy?: string // presumably the name of the column to sort by — confirm against the worker
13+
sortIndex?: boolean // NOTE(review): semantics not visible here; looks like it selects the sort-index request — confirm
14+
}
15+
// Row is defined in hightable, but not exported + we change any to unknown
16+
export type Row = Record<string, unknown>;
17+
18+
/** Base shape shared by the worker messages below: each carries the id of the query it belongs to. */
interface Message {
19+
queryId: number
20+
}
21+
/** Message carrying one chunk of column data for a query. */
export interface ChunkMessage extends Message {
22+
chunk: ColumnData
23+
}
24+
/** Message carrying the rows produced for a query. */
export interface ResultMessage extends Message {
25+
result: Row[]
26+
}
27+
/** Message carrying an array of indices for a query (presumably the sort index — confirm against the worker). */
export interface IndicesMessage extends Message {
28+
indices: number[]
29+
}
30+
/** Message carrying the error raised for a query. */
export interface ErrorMessage extends Message {
31+
error: Error
32+
}
33+
34+
// Messages that can be received for a parquet query
export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage
35+
// Messages that can be received for a sort-index request
export type SortParquetMessage = IndicesMessage | ErrorMessage
36+
37+
/** Arguments accepted by `parquetSortIndexWorker`. */
export interface ParquetSortIndexOptions {
38+
metadata: FileMetaData // parquet file metadata (hyparquet FileMetaData)
39+
from: AsyncBufferFrom // serializable description of the file to read
40+
orderBy: string // presumably the name of the column to sort by — confirm against the worker
41+
}

public/build/app.min.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

public/build/app.min.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)