|
| 1 | +import { parquetSchema } from 'hyparquet' |
1 | 2 | import { parquetPlan } from 'hyparquet/src/plan.js' |
2 | 3 | import { asyncGroupToRows, readRowGroup } from 'hyparquet/src/rowgroup.js' |
3 | 4 | import { whereToParquetFilter } from './parquetFilter.js' |
4 | 5 |
|
5 | 6 | /** |
6 | | - * @import { AsyncBuffer, FileMetaData } from 'hyparquet' |
7 | | - * @import { AsyncDataSource, AsyncRow } from 'squirreling' |
| 7 | + * @import { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet' |
| 8 | + * @import { AsyncDataSource } from 'squirreling' |
| 9 | + * @import { AsyncCells } from 'squirreling/src/types.js' |
8 | 10 | */ |
9 | 11 |
|
10 | 12 | /** |
11 | 13 | * Creates a parquet data source for use with squirreling SQL engine. |
12 | 14 | * |
13 | 15 | * @param {AsyncBuffer} file |
14 | 16 | * @param {FileMetaData} metadata |
15 | | - * @param {import('hyparquet-compressors').Compressors} compressors |
| 17 | + * @param {Compressors} compressors |
16 | 18 | * @returns {AsyncDataSource} |
17 | 19 | */ |
18 | 20 | export function parquetDataSource(file, metadata, compressors) { |
19 | 21 | return { |
20 | | - async *getRows(hints) { |
| 22 | + async *scan(hints) { |
21 | 23 | const options = { |
22 | 24 | file, |
23 | 25 | metadata, |
24 | 26 | compressors, |
25 | 27 | columns: hints?.columns, |
| 28 | + // convert WHERE clause to parquet pushdown filter |
26 | 29 | filter: whereToParquetFilter(hints?.where), |
| 30 | + filterStrict: false, |
27 | 31 | } |
| 32 | + |
| 33 | + // TODO: check that columns exist in parquet file |
| 34 | + let { columns } = options |
| 35 | + if (!columns?.length) { |
| 36 | + const schema = parquetSchema(metadata) |
| 37 | + columns = schema.children.map(col => col.element.name) |
| 38 | + } |
| 39 | + |
28 | 40 | const plan = parquetPlan(options) |
29 | | - let count = 0 |
30 | 41 | for (const subplan of plan.groups) { |
| 42 | + // Read row group |
31 | 43 | const rg = readRowGroup(options, plan, subplan) |
| 44 | + // Transpose to materialized rows |
32 | 45 | const rows = await asyncGroupToRows(rg, 0, rg.groupRows, undefined, 'object') |
33 | | - for (const asyncRow of rows) { |
34 | | - /** @type {AsyncRow} */ |
35 | | - const row = {} |
36 | | - for (const [key, value] of Object.entries(asyncRow)) { |
37 | | - row[key] = () => Promise.resolve(value) |
| 46 | + // Convert to AsyncRow generator |
| 47 | + for (const row of rows) { |
| 48 | + /** @type {AsyncCells} */ |
| 49 | + const cells = {} |
| 50 | + for (const [key, value] of Object.entries(row)) { |
| 51 | + cells[key] = () => Promise.resolve(value) |
38 | 52 | } |
39 | | - yield row |
40 | | - count++ |
41 | | - // Check limit after each row |
42 | | - if (hints?.limit !== undefined && count >= hints.limit) return |
| 53 | + yield { columns, cells } |
43 | 54 | } |
44 | 55 | } |
45 | 56 | }, |
|
0 commit comments