Skip to content

Commit c9c966a

Browse files
authored
Have vortex-datafusion depend only on vortex crate (#3438)
Also removes the in-memory DataFusion execution plans. --------- Signed-off-by: Nicholas Gates <[email protected]>
1 parent b2ded11 commit c9c966a

File tree

46 files changed

+405
-1591
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

46 files changed

+405
-1591
lines changed

Cargo.lock

Lines changed: 4 additions & 73 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/src/bin/random_access.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use indicatif::ProgressBar;
1111
use itertools::Itertools;
1212
use tokio::runtime::Runtime;
1313
use vortex::buffer::{Buffer, buffer};
14-
use vortex::error::VortexUnwrap;
1514

1615
#[derive(Parser, Debug)]
1716
#[command(version, about, long_about = None)]
@@ -75,11 +74,7 @@ fn random_access(
7574
&runtime,
7675
iterations,
7776
|| indices.clone(),
78-
|indices| async {
79-
take_vortex_tokio(&taxi_vortex, indices)
80-
.await
81-
.vortex_unwrap()
82-
},
77+
|indices| async { take_vortex_tokio(&taxi_vortex, indices).await },
8378
),
8479
});
8580
progress.inc(1);
@@ -93,7 +88,7 @@ fn random_access(
9388
&runtime,
9489
iterations,
9590
|| indices.clone(),
96-
|indices| async { take_parquet(&taxi_parquet, indices).await.vortex_unwrap() },
91+
|indices| async { take_parquet(&taxi_parquet, indices).await },
9792
),
9893
});
9994
progress.inc(1);

bench-vortex/src/clickbench.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,7 @@ pub async fn register_vortex_files(
222222
let config = if let Some(schema) = schema {
223223
config.with_schema(schema.into())
224224
} else {
225-
config
226-
.infer_schema(&session.state())
227-
.await
228-
.vortex_expect("cannot infer schema")
225+
config.infer_schema(&session.state()).await?
229226
};
230227

231228
let listing_table = Arc::new(ListingTable::try_new(config)?);

bench-vortex/src/compress/vortex.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use vortex::error::VortexResult;
1111
use vortex::file::{VortexLayoutStrategy, VortexOpenOptions, VortexWriteOptions};
1212

1313
#[inline(never)]
14-
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> VortexResult<u64> {
14+
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> anyhow::Result<u64> {
1515
Ok(VortexWriteOptions::default()
1616
.with_strategy(VortexLayoutStrategy::with_executor(Arc::new(
1717
Handle::current(),
@@ -22,8 +22,8 @@ pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> Vort
2222
}
2323

2424
#[inline(never)]
25-
pub async fn vortex_decompress_read(buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
26-
VortexOpenOptions::in_memory()
25+
pub async fn vortex_decompress_read(buf: Bytes) -> anyhow::Result<Vec<ArrayRef>> {
26+
Ok(VortexOpenOptions::in_memory()
2727
.open(buf)
2828
.await?
2929
.scan()?
@@ -33,5 +33,5 @@ pub async fn vortex_decompress_read(buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
3333
.await?
3434
.into_iter()
3535
.map(|a| a.into_arrow_preferred())
36-
.collect::<VortexResult<Vec<_>>>()
36+
.collect::<VortexResult<Vec<_>>>()?)
3737
}

bench-vortex/src/conversions.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
88
use vortex::TryIntoArray;
99
use vortex::dtype::DType;
1010
use vortex::dtype::arrow::FromArrowType;
11-
use vortex::error::{VortexError, VortexResult};
11+
use vortex::error::VortexError;
1212
use vortex::iter::{ArrayIteratorAdapter, ArrayIteratorExt};
1313
use vortex::stream::ArrayStream;
1414

15-
pub fn parquet_to_vortex(parquet_path: PathBuf) -> VortexResult<impl ArrayStream> {
15+
pub fn parquet_to_vortex(parquet_path: PathBuf) -> anyhow::Result<impl ArrayStream> {
1616
let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(parquet_path)?)?.build()?;
1717

1818
let array_iter = ArrayIteratorAdapter::new(
@@ -31,7 +31,7 @@ pub async fn csv_to_parquet_file(
3131
options: CsvReadOptions<'_>,
3232
csv_path: &str,
3333
parquet_path: &str,
34-
) -> VortexResult<()> {
34+
) -> anyhow::Result<()> {
3535
let df = session.read_csv(csv_path, options).await?;
3636

3737
df.write_parquet(parquet_path, DataFrameWriteOptions::default(), None)

bench-vortex/src/datasets/file.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use object_store::path::Path as ObjectStorePath;
1313
use tokio::fs::OpenOptions;
1414
use tracing::info;
1515
use url::Url;
16-
use vortex::error::VortexExpect;
1716
use vortex::file::VortexWriteOptions;
1817
use vortex_datafusion::persistent::VortexFormat;
1918

@@ -73,10 +72,7 @@ pub async fn register_parquet_files(
7372
let config = if let Some(schema) = schema {
7473
config.with_schema(schema.into())
7574
} else {
76-
config
77-
.infer_schema(&session.state())
78-
.await
79-
.vortex_expect("cannot infer schema")
75+
config.infer_schema(&session.state()).await?
8076
};
8177

8278
let listing_table = Arc::new(ListingTable::try_new(config)?);

bench-vortex/src/engines/df/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use object_store::aws::AmazonS3Builder;
1818
use object_store::gcp::GoogleCloudStorageBuilder;
1919
use object_store::local::LocalFileSystem;
2020
use url::Url;
21-
use vortex::error::VortexResult;
2221
use vortex_datafusion::persistent::VortexFormatFactory;
2322

2423
pub static GIT_COMMIT_ID: LazyLock<String> = LazyLock::new(|| {
@@ -50,7 +49,7 @@ pub fn get_session_context(disable_datafusion_cache: bool) -> SessionContext {
5049
.build_arc()
5150
.expect("could not build runtime environment");
5251

53-
let factory = VortexFormatFactory::default_config();
52+
let factory = VortexFormatFactory::default();
5453

5554
let mut session_state_builder = SessionStateBuilder::new()
5655
.with_config(SessionConfig::default())
@@ -109,7 +108,7 @@ pub fn make_object_store(
109108
pub async fn execute_query(
110109
ctx: &SessionContext,
111110
query: &str,
112-
) -> VortexResult<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>)> {
111+
) -> anyhow::Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>)> {
113112
let plan = ctx.sql(query).await?;
114113
let (state, plan) = plan.into_parts();
115114
let physical_plan = state.create_physical_plan(&plan).await?;

bench-vortex/src/lib.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ pub enum Format {
102102
Arrow,
103103
#[clap(name = "parquet")]
104104
Parquet,
105-
#[clap(name = "in-memory-vortex")]
106-
#[serde(rename = "in-memory-vortex")]
107-
InMemoryVortex,
108105
#[clap(name = "vortex")]
109106
#[serde(rename = "vortex")]
110107
OnDiskVortex,
@@ -125,7 +122,6 @@ impl Format {
125122
Format::Csv => "csv",
126123
Format::Arrow => "arrow",
127124
Format::Parquet => "parquet",
128-
Format::InMemoryVortex => "vortex-in-memory",
129125
Format::OnDiskVortex => "vortex-file-compressed",
130126
Format::OnDiskDuckDB => "duckdb",
131127
}
@@ -136,7 +132,6 @@ impl Format {
136132
Format::Csv => "csv",
137133
Format::Arrow => "arrow",
138134
Format::Parquet => "parquet",
139-
Format::InMemoryVortex => unreachable!("no extension"),
140135
Format::OnDiskVortex => "vortex",
141136
Format::OnDiskDuckDB => "duckdb",
142137
}

0 commit comments

Comments
 (0)