1- import { parquetSchema } from 'hyparquet'
2- import { parquetPlan } from 'hyparquet/src/plan.js'
3- import { asyncGroupToRows , readRowGroup } from 'hyparquet/src/rowgroup.js'
1+ import { parquetMetadataAsync , parquetReadObjects } from 'hyparquet'
42import { whereToParquetFilter } from './parquetFilter.js'
53
64/**
7- * @import { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet'
8- * @import { AsyncDataSource } from 'squirreling'
5+ * @import { AsyncBuffer, Compressors, FileMetaData, ParquetQueryFilter } from 'hyparquet'
6+ * @import { AsyncDataSource, AsyncRow, SqlPrimitive } from 'squirreling'
97 * @import { AsyncCells } from 'squirreling/src/types.js'
108 */
119
@@ -20,39 +18,75 @@ import { whereToParquetFilter } from './parquetFilter.js'
export function parquetDataSource(file, metadata, compressors) {
  return {
    /**
     * Streams the file's rows one row group at a time.
     *
     * `hints.columns` and the converted `hints.where` are forwarded to
     * hyparquet. `hints.offset` / `hints.limit` are translated into a row
     * range only when the scan has no WHERE clause (see below).
     */
    async *scan(hints) {
      // Load the footer on first use; the assignment is kept in the closure so
      // later scans of this data source reuse the same metadata.
      metadata ??= await parquetMetadataAsync(file)

      const { columns, where, limit, offset } = hints ?? {}

      // Convert WHERE AST to hyparquet filter format. This may be undefined
      // even when a WHERE clause exists.
      /** @type {ParquetQueryFilter | undefined} */
      const filter = where ? whereToParquetFilter(where) : undefined

      // rowStart/rowEnd address rows by their position in the file. A row's
      // position only matches its position in the query result when no WHERE
      // clause removes rows, so offset/limit are pushed down only in that case.
      // With a WHERE clause, whole row groups are read and `filter` acts as a
      // prefilter.
      // NOTE(review): this relies on the query engine re-applying
      // WHERE/OFFSET/LIMIT to the rows yielded here - confirm against squirreling.
      const canPushDownRange = !where

      // Emit rows by row group
      let groupStart = 0
      let remainingLimit = limit ?? Infinity
      for (const rowGroup of metadata.row_groups) {
        const rowCount = Number(rowGroup.num_rows)

        let skipCount = 0
        let readCount = rowCount
        if (canPushDownRange) {
          if (offset !== undefined && groupStart < offset) {
            skipCount = Math.min(rowCount, offset - groupStart)
          }
          readCount = Math.min(rowCount - skipCount, remainingLimit)
          // Limit exhausted and nothing left to skip: no later group is needed
          if (readCount <= 0 && skipCount < rowCount) break
        }

        // Skipped rows are emitted as empty placeholder rows instead of being
        // read, so the number of rows yielded before the first real row still
        // equals the requested offset.
        for (let i = 0; i < skipCount; i++) {
          yield asyncRow({})
        }

        // Read objects from this row group unless the offset swallowed all of it
        if (skipCount < rowCount) {
          const rowStart = groupStart + skipCount
          const data = await parquetReadObjects({
            file,
            metadata,
            rowStart,
            rowEnd: rowStart + readCount,
            columns,
            filter,
            filterStrict: false,
            compressors,
            useOffsetIndex: true,
          })

          // Yield each row
          for (const row of data) {
            yield asyncRow(row)
          }

          remainingLimit -= data.length
        }

        groupStart += rowCount
      }
    },
  }
}
78+
/**
 * Wraps a plain object as an async row.
 *
 * The row lists the object's own enumerable keys as its columns and exposes
 * one accessor per key; each accessor returns a promise that resolves to the
 * value the key held when the row was created.
 *
 * @param {Record<string, SqlPrimitive>} obj - the plain object
 * @returns {AsyncRow} a row accessor interface
 */
function asyncRow(obj) {
  const columns = Object.keys(obj)
  /** @type {AsyncCells} */
  const cells = {}
  columns.forEach(name => {
    // Capture the value now so later changes to obj do not affect the row
    const value = obj[name]
    cells[name] = () => Promise.resolve(value)
  })
  return { columns, cells }
}
0 commit comments