Skip to content

Commit dd2fbcd

Browse files
committed
feat(vx): add SQL query command with DataFusion support
Adds a new 'vx query' command that executes SQL queries against Vortex files using DataFusion. The table is available as 'data' in queries. Outputs structured JSON with schema and row data for tooling integration. Signed-off-by: Baris Palaska <[email protected]>
1 parent bcc85a6 commit dd2fbcd

File tree

5 files changed

+348
-0
lines changed

5 files changed

+348
-0
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-tui/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ version = { workspace = true }
1515

1616
[dependencies]
1717
anyhow = { workspace = true }
18+
arrow-array = { workspace = true }
19+
arrow-schema = { workspace = true }
1820
clap = { workspace = true, features = ["derive"] }
1921
crossterm = { workspace = true }
22+
datafusion = { workspace = true }
2023
env_logger = { version = "0.11" }
2124
flatbuffers = { workspace = true }
2225
futures = { workspace = true, features = ["executor"] }
@@ -26,10 +29,13 @@ indicatif = { workspace = true, features = ["futures"] }
2629
itertools = { workspace = true }
2730
parquet = { workspace = true, features = ["arrow", "async"] }
2831
ratatui = { workspace = true }
32+
serde = { workspace = true, features = ["derive"] }
33+
serde_json = { workspace = true }
2934
taffy = { workspace = true }
3035
termtree = { workspace = true }
3136
tokio = { workspace = true, features = ["rt-multi-thread"] }
3237
vortex = { workspace = true, features = ["tokio"] }
38+
vortex-datafusion = { workspace = true }
3339

3440
[lints]
3541
workspace = true
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Shared DataFusion query execution utilities for both CLI and TUI.
5+
6+
use std::sync::Arc;
7+
8+
use arrow_array::Array as ArrowArray;
9+
use arrow_array::RecordBatch;
10+
use datafusion::datasource::listing::ListingOptions;
11+
use datafusion::datasource::listing::ListingTable;
12+
use datafusion::datasource::listing::ListingTableConfig;
13+
use datafusion::datasource::listing::ListingTableUrl;
14+
use datafusion::prelude::SessionContext;
15+
use vortex_datafusion::VortexFormat;
16+
17+
use crate::SESSION;
18+
19+
/// Execute a SQL query against a Vortex file.
20+
///
21+
/// The file is registered as a table named "data".
22+
/// Returns the result as a vector of RecordBatches.
23+
pub async fn execute_vortex_query(file_path: &str, sql: &str) -> Result<Vec<RecordBatch>, String> {
24+
let ctx = create_vortex_context(file_path).await?;
25+
26+
let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?;
27+
28+
df.collect()
29+
.await
30+
.map_err(|e| format!("Query execution error: {e}"))
31+
}
32+
33+
/// Create a DataFusion SessionContext with a Vortex file registered as "data".
34+
pub async fn create_vortex_context(file_path: &str) -> Result<SessionContext, String> {
35+
let ctx = SessionContext::new();
36+
let format = Arc::new(VortexFormat::new(SESSION.clone()));
37+
38+
let table_url =
39+
ListingTableUrl::parse(file_path).map_err(|e| format!("Failed to parse file path: {e}"))?;
40+
41+
let config = ListingTableConfig::new(table_url)
42+
.with_listing_options(
43+
ListingOptions::new(format).with_session_config_options(ctx.state().config()),
44+
)
45+
.infer_schema(&ctx.state())
46+
.await
47+
.map_err(|e| format!("Failed to infer schema: {e}"))?;
48+
49+
let listing_table = Arc::new(
50+
ListingTable::try_new(config).map_err(|e| format!("Failed to create table: {e}"))?,
51+
);
52+
53+
ctx.register_table("data", listing_table)
54+
.map_err(|e| format!("Failed to register table: {e}"))?;
55+
56+
Ok(ctx)
57+
}
58+
59+
/// Convert an Arrow array value at a given index to a JSON value.
60+
#[allow(clippy::unwrap_used)]
61+
pub fn arrow_value_to_json(array: &dyn ArrowArray, idx: usize) -> serde_json::Value {
62+
use arrow_array::*;
63+
use arrow_schema::DataType;
64+
65+
if array.is_null(idx) {
66+
return serde_json::Value::Null;
67+
}
68+
69+
match array.data_type() {
70+
DataType::Null => serde_json::Value::Null,
71+
DataType::Boolean => {
72+
let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
73+
serde_json::Value::Bool(arr.value(idx))
74+
}
75+
DataType::Int8 => {
76+
let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
77+
serde_json::json!(arr.value(idx))
78+
}
79+
DataType::Int16 => {
80+
let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
81+
serde_json::json!(arr.value(idx))
82+
}
83+
DataType::Int32 => {
84+
let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
85+
serde_json::json!(arr.value(idx))
86+
}
87+
DataType::Int64 => {
88+
let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
89+
serde_json::json!(arr.value(idx))
90+
}
91+
DataType::UInt8 => {
92+
let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
93+
serde_json::json!(arr.value(idx))
94+
}
95+
DataType::UInt16 => {
96+
let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
97+
serde_json::json!(arr.value(idx))
98+
}
99+
DataType::UInt32 => {
100+
let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
101+
serde_json::json!(arr.value(idx))
102+
}
103+
DataType::UInt64 => {
104+
let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
105+
serde_json::json!(arr.value(idx))
106+
}
107+
DataType::Float16 => {
108+
let arr = array.as_any().downcast_ref::<Float16Array>().unwrap();
109+
serde_json::json!(arr.value(idx).to_f32())
110+
}
111+
DataType::Float32 => {
112+
let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
113+
serde_json::json!(arr.value(idx))
114+
}
115+
DataType::Float64 => {
116+
let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
117+
serde_json::json!(arr.value(idx))
118+
}
119+
DataType::Utf8 => {
120+
let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
121+
serde_json::Value::String(arr.value(idx).to_string())
122+
}
123+
DataType::LargeUtf8 => {
124+
let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
125+
serde_json::Value::String(arr.value(idx).to_string())
126+
}
127+
DataType::Utf8View => {
128+
let arr = array.as_any().downcast_ref::<StringViewArray>().unwrap();
129+
serde_json::Value::String(arr.value(idx).to_string())
130+
}
131+
DataType::Binary => {
132+
let arr = array.as_any().downcast_ref::<BinaryArray>().unwrap();
133+
let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
134+
serde_json::Value::String(hex)
135+
}
136+
DataType::LargeBinary => {
137+
let arr = array.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
138+
let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
139+
serde_json::Value::String(hex)
140+
}
141+
DataType::BinaryView => {
142+
let arr = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();
143+
let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
144+
serde_json::Value::String(hex)
145+
}
146+
DataType::Date32 => {
147+
let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
148+
serde_json::json!(arr.value(idx))
149+
}
150+
DataType::Date64 => {
151+
let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
152+
serde_json::json!(arr.value(idx))
153+
}
154+
DataType::Timestamp(_, _) => {
155+
if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
156+
serde_json::json!(arr.value(idx))
157+
} else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
158+
serde_json::json!(arr.value(idx))
159+
} else if let Some(arr) = array.as_any().downcast_ref::<TimestampSecondArray>() {
160+
serde_json::json!(arr.value(idx))
161+
} else if let Some(arr) = array.as_any().downcast_ref::<TimestampNanosecondArray>() {
162+
serde_json::json!(arr.value(idx))
163+
} else {
164+
serde_json::Value::String("<timestamp>".to_string())
165+
}
166+
}
167+
DataType::Decimal128(_, _) => {
168+
let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
169+
serde_json::Value::String(arr.value_as_string(idx))
170+
}
171+
DataType::Decimal256(_, _) => {
172+
let arr = array.as_any().downcast_ref::<Decimal256Array>().unwrap();
173+
serde_json::Value::String(arr.value_as_string(idx))
174+
}
175+
DataType::List(_) => {
176+
let arr = array.as_any().downcast_ref::<ListArray>().unwrap();
177+
let value_arr = arr.value(idx);
178+
let elements: Vec<serde_json::Value> = (0..value_arr.len())
179+
.map(|i| arrow_value_to_json(value_arr.as_ref(), i))
180+
.collect();
181+
serde_json::Value::Array(elements)
182+
}
183+
DataType::LargeList(_) => {
184+
let arr = array.as_any().downcast_ref::<LargeListArray>().unwrap();
185+
let value_arr = arr.value(idx);
186+
let elements: Vec<serde_json::Value> = (0..value_arr.len())
187+
.map(|i| arrow_value_to_json(value_arr.as_ref(), i))
188+
.collect();
189+
serde_json::Value::Array(elements)
190+
}
191+
DataType::Struct(_) => {
192+
let arr = array.as_any().downcast_ref::<StructArray>().unwrap();
193+
let mut obj = serde_json::Map::new();
194+
for (i, field) in arr.fields().iter().enumerate() {
195+
let col = arr.column(i);
196+
obj.insert(field.name().clone(), arrow_value_to_json(col.as_ref(), idx));
197+
}
198+
serde_json::Value::Object(obj)
199+
}
200+
_ => {
201+
// Fallback for unsupported types
202+
serde_json::Value::String(format!("<{}>", array.data_type()))
203+
}
204+
}
205+
}
206+
207+
/// Format a JSON value for display in the TUI.
208+
///
209+
/// - Null becomes "NULL"
210+
/// - Strings are displayed without quotes
211+
/// - Other values use their JSON string representation
212+
pub fn json_value_to_display(value: serde_json::Value) -> String {
213+
match value {
214+
serde_json::Value::Null => "NULL".to_string(),
215+
serde_json::Value::String(s) => s,
216+
other => other.to_string(),
217+
}
218+
}

vortex-tui/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
#![allow(clippy::expect_used)]
55
mod browse;
66
mod convert;
7+
mod datafusion_helper;
78
mod inspect;
9+
mod query;
810
mod segment_tree;
911
mod segments;
1012
mod tree;
@@ -42,6 +44,8 @@ enum Commands {
4244
Browse { file: PathBuf },
4345
/// Inspect Vortex file footer and metadata
4446
Inspect(InspectArgs),
47+
/// Execute a SQL query against a Vortex file using DataFusion
48+
Query(query::QueryArgs),
4549
/// Display segment information for a Vortex file
4650
Segments(SegmentsArgs),
4751
}
@@ -56,6 +60,7 @@ impl Commands {
5660
Commands::Browse { file } => file,
5761
Commands::Convert(flags) => &flags.file,
5862
Commands::Inspect(args) => &args.file,
63+
Commands::Query(args) => &args.file,
5964
Commands::Segments(args) => &args.file,
6065
}
6166
}
@@ -88,6 +93,7 @@ async fn main() -> anyhow::Result<()> {
8893
Commands::Convert(flags) => convert::exec_convert(flags).await?,
8994
Commands::Browse { file } => exec_tui(file).await?,
9095
Commands::Inspect(args) => inspect::exec_inspect(args).await?,
96+
Commands::Query(args) => query::exec_query(args).await?,
9197
Commands::Segments(args) => segments::exec_segments(args).await?,
9298
};
9399

0 commit comments

Comments
 (0)