Skip to content

Commit 6e292f3

Browse files
authored
Better SQL tool (#362)
1 parent 9662537 commit 6e292f3

File tree

3 files changed: +81 additions, −45 deletions

bin/chat.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ let outputMode = 'text' // default output mode
77
const instructions =
88
'You are a machine learning web application named "Hyperparam" running on a CLI terminal.'
99
+ '\nYou assist users with analyzing and exploring datasets, particularly in parquet format.'
10-
+ ' The website and api are available at hyperparam.app.'
10+
+ ' The website is available at hyperparam.app.'
1111
+ ' The Hyperparam CLI tool can list and explore local parquet files.'
1212
+ '\nYou are on a terminal and can only output: text, emojis, terminal colors, and terminal formatting.'
13-
+ ' Don\'t add additional markdown or html formatting unless requested.'
13+
+ ' Limited markdown formatting is available: inline code blocks.'
1414
+ (process.stdout.isTTY ? ` The terminal width is ${process.stdout.columns} characters.` : '')
1515

1616
const colors = {

bin/tools/parquetDataSource.js

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
import { parquetSchema } from 'hyparquet'
2-
import { parquetPlan } from 'hyparquet/src/plan.js'
3-
import { asyncGroupToRows, readRowGroup } from 'hyparquet/src/rowgroup.js'
1+
import { parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
42
import { whereToParquetFilter } from './parquetFilter.js'
53

64
/**
7-
* @import { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet'
8-
* @import { AsyncDataSource } from 'squirreling'
5+
* @import { AsyncBuffer, Compressors, FileMetaData, ParquetQueryFilter } from 'hyparquet'
6+
* @import { AsyncDataSource, AsyncRow, SqlPrimitive } from 'squirreling'
97
* @import { AsyncCells } from 'squirreling/src/types.js'
108
*/
119

@@ -20,39 +18,75 @@ import { whereToParquetFilter } from './parquetFilter.js'
2018
export function parquetDataSource(file, metadata, compressors) {
2119
return {
2220
async *scan(hints) {
23-
const options = {
24-
file,
25-
metadata,
26-
compressors,
27-
columns: hints?.columns,
28-
// convert WHERE clause to parquet pushdown filter
29-
filter: whereToParquetFilter(hints?.where),
30-
filterStrict: false,
31-
}
21+
metadata ??= await parquetMetadataAsync(file)
3222

33-
// TODO: check that columns exist in parquet file
34-
let { columns } = options
35-
if (!columns?.length) {
36-
const schema = parquetSchema(metadata)
37-
columns = schema.children.map(col => col.element.name)
38-
}
23+
// Convert WHERE AST to hyparquet filter format
24+
const whereFilter = hints?.where && whereToParquetFilter(hints.where)
25+
/** @type {ParquetQueryFilter | undefined} */
26+
const filter = hints?.where ? whereFilter : undefined
27+
const filterApplied = !filter || whereFilter
3928

40-
const plan = parquetPlan(options)
41-
for (const subplan of plan.groups) {
42-
// Read row group
43-
const rg = readRowGroup(options, plan, subplan)
44-
// Transpose to materialized rows
45-
const rows = await asyncGroupToRows(rg, 0, rg.groupRows, undefined, 'object')
46-
// Convert to AsyncRow generator
47-
for (const row of rows) {
48-
/** @type {AsyncCells} */
49-
const cells = {}
50-
for (const [key, value] of Object.entries(row)) {
51-
cells[key] = () => Promise.resolve(value)
29+
// Emit rows by row group
30+
let groupStart = 0
31+
let remainingLimit = hints?.limit ?? Infinity
32+
for (const rowGroup of metadata.row_groups) {
33+
const rowCount = Number(rowGroup.num_rows)
34+
35+
// Skip row groups by offset if where is fully applied
36+
let safeOffset = 0
37+
let safeLimit = rowCount
38+
if (filterApplied) {
39+
if (hints?.offset !== undefined && groupStart < hints.offset) {
40+
safeOffset = Math.min(rowCount, hints.offset - groupStart)
5241
}
53-
yield { columns, cells }
42+
safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
43+
if (safeLimit <= 0 && safeOffset < rowCount) break
44+
}
45+
for (let i = 0; i < safeOffset; i++) {
46+
// yield empty rows
47+
yield asyncRow({})
48+
}
49+
if (safeOffset === rowCount) {
50+
groupStart += rowCount
51+
continue
52+
}
53+
54+
// Read objects from this row group
55+
const data = await parquetReadObjects({
56+
file,
57+
metadata,
58+
rowStart: groupStart + safeOffset,
59+
rowEnd: groupStart + safeOffset + safeLimit,
60+
columns: hints?.columns,
61+
filter,
62+
filterStrict: false,
63+
compressors,
64+
useOffsetIndex: true,
65+
})
66+
67+
// Yield each row
68+
for (const row of data) {
69+
yield asyncRow(row)
5470
}
71+
72+
remainingLimit -= data.length
73+
groupStart += rowCount
5574
}
5675
},
5776
}
5877
}
78+
79+
/**
80+
* Creates an async row accessor that wraps a plain JavaScript object
81+
*
82+
* @param {Record<string, SqlPrimitive>} obj - the plain object
83+
* @returns {AsyncRow} a row accessor interface
84+
*/
85+
function asyncRow(obj) {
86+
/** @type {AsyncCells} */
87+
const cells = {}
88+
for (const [key, value] of Object.entries(obj)) {
89+
cells[key] = () => Promise.resolve(value)
90+
}
91+
return { columns: Object.keys(obj), cells }
92+
}

bin/tools/parquetSql.js

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { asyncBufferFromFile, parquetMetadataAsync } from 'hyparquet'
1+
import { asyncBufferFromFile, asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
22
import { compressors } from 'hyparquet-compressors'
33
import { collect, executeSql } from 'squirreling'
44
import { parquetDataSource } from './parquetDataSource.js'
@@ -27,9 +27,9 @@ export const parquetSql = {
2727
parameters: {
2828
type: 'object',
2929
properties: {
30-
filename: {
30+
file: {
3131
type: 'string',
32-
description: 'The name of the parquet file to query.',
32+
description: 'The parquet file to query either local file path or url.',
3333
},
3434
query: {
3535
type: 'string',
@@ -40,16 +40,16 @@ export const parquetSql = {
4040
description: 'Whether to truncate long string values in the results. If true (default), each string cell is limited to 1000 characters. If false, each string cell is limited to 10,000 characters.',
4141
},
4242
},
43-
required: ['filename', 'query'],
43+
required: ['file', 'query'],
4444
},
4545
},
4646
/**
4747
* @param {Record<string, unknown>} args
4848
* @returns {Promise<string>}
4949
*/
50-
async handleToolCall({ filename, query, truncate = true }) {
51-
if (typeof filename !== 'string') {
52-
throw new Error('Expected filename to be a string')
50+
async handleToolCall({ file, query, truncate = true }) {
51+
if (typeof file !== 'string') {
52+
throw new Error('Expected file to be a string')
5353
}
5454
if (typeof query !== 'string' || query.trim().length === 0) {
5555
throw new Error('Query parameter must be a non-empty string')
@@ -59,9 +59,11 @@ export const parquetSql = {
5959
const startTime = performance.now()
6060

6161
// Load parquet file and create data source
62-
const file = await asyncBufferFromFile(filename)
63-
const metadata = await parquetMetadataAsync(file)
64-
const table = parquetDataSource(file, metadata, compressors)
62+
const asyncBuffer = file.startsWith('http://') || file.startsWith('https://')
63+
? await asyncBufferFromUrl({ url: file })
64+
: await asyncBufferFromFile(file)
65+
const metadata = await parquetMetadataAsync(asyncBuffer)
66+
const table = parquetDataSource(asyncBuffer, metadata, compressors)
6567

6668
// Execute SQL query
6769
const results = await collect(executeSql({ tables: { table }, query }))

0 commit comments

Comments (0)