Skip to content

Commit eeb0a62

Browse files
authored
Fix datafusion example and also make it a test (#3134)
Honestly not sure if datafusion's listing logic changed in regard to file extensions or if I just made some last-second change and broke a working example before committing it.
1 parent 6826e43 commit eeb0a62

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed

vortex-datafusion/examples/vortex_table.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@ use datafusion::datasource::listing::{
44
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
55
};
66
use datafusion::prelude::SessionContext;
7-
use object_store::ObjectStore;
8-
use object_store::local::LocalFileSystem;
97
use tempfile::tempdir;
108
use tokio::fs::OpenOptions;
11-
use url::Url;
129
use vortex_array::arrays::{ChunkedArray, StructArray, VarBinArray};
1310
use vortex_array::stream::ArrayStreamArrayExt;
1411
use vortex_array::validity::Validity;
@@ -40,7 +37,7 @@ async fn main() -> anyhow::Result<()> {
4037
Validity::NonNullable,
4138
)?;
4239

43-
let filepath = temp_dir.path().join("a.vtx");
40+
let filepath = temp_dir.path().join("a.vortex");
4441

4542
let f = OpenOptions::new()
4643
.write(true)
@@ -54,19 +51,14 @@ async fn main() -> anyhow::Result<()> {
5451
.await?;
5552

5653
let ctx = SessionContext::new();
57-
58-
let object_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
59-
let url: Url = Url::try_from("file://")?;
60-
ctx.register_object_store(&url, object_store);
61-
6254
let format = Arc::new(VortexFormat::default());
6355
let table_url = ListingTableUrl::parse(
6456
filepath
6557
.to_str()
6658
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?,
6759
)?;
6860
let config = ListingTableConfig::new(table_url)
69-
.with_listing_options(ListingOptions::new(format as _))
61+
.with_listing_options(ListingOptions::new(format))
7062
.infer_schema(&ctx.state())
7163
.await?;
7264

vortex-datafusion/src/persistent/mod.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,83 @@ fn register_vortex_format_factory(
2626
file_formats.push(std::sync::Arc::new(factory));
2727
}
2828
}
29+
30+
#[cfg(test)]
31+
mod tests {
32+
use std::sync::Arc;
33+
34+
use datafusion::datasource::listing::{
35+
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
36+
};
37+
use datafusion::prelude::SessionContext;
38+
use tempfile::tempdir;
39+
use tokio::fs::OpenOptions;
40+
use vortex_array::arrays::{ChunkedArray, StructArray, VarBinArray};
41+
use vortex_array::stream::ArrayStreamArrayExt;
42+
use vortex_array::validity::Validity;
43+
use vortex_array::{Array, IntoArray};
44+
use vortex_buffer::buffer;
45+
use vortex_error::vortex_err;
46+
use vortex_file::VortexWriteOptions;
47+
48+
use crate::persistent::VortexFormat;
49+
50+
#[tokio::test]
51+
async fn query_file() -> anyhow::Result<()> {
52+
let temp_dir = tempdir()?;
53+
let strings = ChunkedArray::from_iter([
54+
VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
55+
VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
56+
])
57+
.into_array();
58+
59+
let numbers = ChunkedArray::from_iter([
60+
buffer![1u32, 2, 3, 4].into_array(),
61+
buffer![5u32, 6, 7, 8].into_array(),
62+
])
63+
.into_array();
64+
65+
let st = StructArray::try_new(
66+
["strings".into(), "numbers".into()].into(),
67+
vec![strings, numbers],
68+
8,
69+
Validity::NonNullable,
70+
)?;
71+
72+
let filepath = temp_dir.path().join("data.vortex");
73+
74+
let f = OpenOptions::new()
75+
.write(true)
76+
.create(true)
77+
.truncate(true)
78+
.open(&filepath)
79+
.await?;
80+
81+
VortexWriteOptions::default()
82+
.write(f, st.to_array_stream())
83+
.await?;
84+
85+
let ctx = SessionContext::default();
86+
let format = Arc::new(VortexFormat::default());
87+
let table_url = ListingTableUrl::parse(
88+
temp_dir
89+
.path()
90+
.to_str()
91+
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?,
92+
)?;
93+
assert!(table_url.is_collection());
94+
95+
let config = ListingTableConfig::new(table_url)
96+
.with_listing_options(ListingOptions::new(format))
97+
.infer_schema(&ctx.state())
98+
.await?;
99+
100+
let listing_table = Arc::new(ListingTable::try_new(config)?);
101+
102+
ctx.register_table("vortex_tbl", listing_table as _)?;
103+
let row_count = ctx.table("vortex_tbl").await?.count().await?;
104+
assert_eq!(row_count, 8);
105+
106+
Ok(())
107+
}
108+
}

0 commit comments

Comments
 (0)