11import { ColumnData , parquetQuery } from 'hyparquet'
22import { compressors } from 'hyparquet-compressors'
3+ import { getParquetColumn } from '../getParquetColumn.js'
34import { asyncBufferFrom } from '../utils.js'
4- import type { ChunkMessage , ErrorMessage , IndicesMessage , ParquetReadWorkerOptions , ResultMessage } from './types.js'
5+ import type { ChunkMessage , ClientMessage , ColumnRanksMessage , ErrorMessage , ResultMessage } from './types.js'
56
67function postChunkMessage ( { chunk, queryId } : ChunkMessage ) {
78 self . postMessage ( { chunk, queryId } )
@@ -12,35 +13,50 @@ function postResultMessage ({ result, queryId }: ResultMessage) {
/**
 * Forwards an error for a given query through `self.postMessage`.
 *
 * @param message - The error and the id of the query it belongs to.
 */
function postErrorMessage(message: ErrorMessage) {
  const { error, queryId } = message
  self.postMessage({ error, queryId })
}
15- function postIndicesMessage ( { indices , queryId } : IndicesMessage ) {
16- self . postMessage ( { indices , queryId } )
/**
 * Forwards the computed column ranks for a given query through `self.postMessage`.
 *
 * @param message - The column ranks and the id of the query they belong to.
 */
function postColumnRanksMessage(message: ColumnRanksMessage) {
  const { columnRanks, queryId } = message
  self.postMessage({ columnRanks, queryId })
}
1819
19- self . onmessage = async ( { data } : {
20- data : ParquetReadWorkerOptions & { queryId : number ; chunks : boolean } ;
21- } ) => {
22- const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks, sortIndex } = data
20+ self . onmessage = async ( { data } : { data : ClientMessage } ) => {
21+ const { metadata, from, kind, queryId } = data
2322 const file = await asyncBufferFrom ( from )
24- if ( sortIndex === undefined ) {
25- const onChunk = chunks ? ( chunk : ColumnData ) => { postChunkMessage ( { chunk, queryId } ) } : undefined
23+ if ( kind === 'columnRanks' ) {
24+ const { column } = data
25+ // Return the rank of each row in the ascending order of the column.
26+ // Ranks for the descending order can be derived by replacing each rank with numRows - rank - 1. That is not exactly
27+ // the descending rank, because tied values share the rank of the first, not the last, element of the tie. But it is
28+ // sufficient for the purpose of sorting.
29+
2630 try {
27- const result = await parquetQuery ( { metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk } )
28- postResultMessage ( { result, queryId } )
31+ const sortColumn = await getParquetColumn ( { metadata, file, column, compressors } )
32+ const valuesWithIndex = sortColumn . map ( ( value , index ) => ( { value, index } ) )
33+ const sortedValuesWithIndex = Array . from ( valuesWithIndex ) . sort ( ( { value : a } , { value : b } ) => compare < unknown > ( a , b ) )
34+ const numRows = sortedValuesWithIndex . length
35+ const columnRanks = sortedValuesWithIndex . reduce ( ( accumulator , currentValue , rank ) => {
36+ const { lastValue, lastRank, ranks } = accumulator
37+ const { value, index } = currentValue
38+ if ( value === lastValue ) {
39+ ranks [ index ] = lastRank
40+ return { ranks, lastValue, lastRank }
41+ } else {
42+ ranks [ index ] = rank
43+ return { ranks, lastValue : value , lastRank : rank }
44+ }
45+ } , {
46+ ranks : Array ( numRows ) . fill ( - 1 ) as number [ ] ,
47+ lastValue : undefined as unknown ,
48+ lastRank : 0 ,
49+ } ) . ranks
50+ postColumnRanksMessage ( { columnRanks : columnRanks , queryId } )
2951 } catch ( error ) {
3052 postErrorMessage ( { error : error as Error , queryId } )
3153 }
3254 } else {
55+ const { rowStart, rowEnd, chunks } = data
56+ const onChunk = chunks ? ( chunk : ColumnData ) => { postChunkMessage ( { chunk, queryId } ) } : undefined
3357 try {
34- // Special case for sorted index
35- if ( orderBy === undefined )
36- throw new Error ( 'sortParquetWorker requires orderBy' )
37- if ( rowStart !== undefined || rowEnd !== undefined )
38- throw new Error ( 'sortIndex requires all rows' )
39- const sortColumn = await parquetQuery ( { metadata, file, columns : [ orderBy ] , compressors } )
40- const indices = Array . from ( sortColumn , ( _ , index ) => index ) . sort ( ( a , b ) =>
41- compare < unknown > ( sortColumn [ a ] ?. [ orderBy ] , sortColumn [ b ] ?. [ orderBy ] )
42- )
43- postIndicesMessage ( { indices, queryId } )
58+ const result = await parquetQuery ( { metadata, file, rowStart, rowEnd, compressors, onChunk } )
59+ postResultMessage ( { result, queryId } )
4460 } catch ( error ) {
4561 postErrorMessage ( { error : error as Error , queryId } )
4662 }
0 commit comments