Skip to content

Commit d8821f8

Browse files
committed
experiment: eagerly decode dict on read
Signed-off-by: Andrew Duffy <[email protected]>
1 parent ee4b921 commit d8821f8

File tree

4 files changed

+81
-1
lines changed

4 files changed

+81
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ vortex-utils = { workspace = true, features = ["dashmap"] }
4444
anyhow = { workspace = true }
4545
datafusion = { workspace = true }
4646
insta = { workspace = true }
47+
mimalloc = "0.1"
4748
rstest = { workspace = true }
4849
tempfile = { workspace = true }
4950
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "fs"] }
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use datafusion::datasource::provider::DefaultTableFactory;
7+
use datafusion::execution::SessionStateBuilder;
8+
use datafusion::prelude::SessionContext;
9+
use datafusion_common::GetExt;
10+
use datafusion_execution::config::SessionConfig;
11+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
12+
use futures::StreamExt;
13+
use mimalloc::MiMalloc;
14+
use vortex_datafusion::VortexFormatFactory;
15+
16+
// Register mimalloc's `MiMalloc` as the process-wide global allocator for this binary.
#[global_allocator]
17+
static GLOBAL: MiMalloc = MiMalloc;
18+
19+
fn get_session_context() -> SessionContext {
20+
let rt_builder = RuntimeEnvBuilder::new();
21+
22+
let rt = rt_builder
23+
.build_arc()
24+
.expect("could not build runtime environment");
25+
26+
let factory = VortexFormatFactory::new();
27+
28+
let mut session_state_builder = SessionStateBuilder::new()
29+
.with_config(SessionConfig::default())
30+
.with_runtime_env(rt)
31+
.with_default_features();
32+
33+
if let Some(table_factories) = session_state_builder.table_factories() {
34+
table_factories.insert(
35+
GetExt::get_ext(&factory).to_uppercase(), // Has to be uppercase
36+
Arc::new(DefaultTableFactory::new()),
37+
);
38+
}
39+
40+
if let Some(file_formats) = session_state_builder.file_formats() {
41+
file_formats.push(Arc::new(factory));
42+
}
43+
44+
SessionContext::new_with_state(session_state_builder.build())
45+
}
46+
47+
#[tokio::main]
48+
pub async fn main() -> anyhow::Result<()> {
49+
let ctx = get_session_context();
50+
51+
ctx.sql(
52+
r#"
53+
CREATE EXTERNAL TABLE hits
54+
STORED AS VORTEX
55+
LOCATION '/Volumes/Code/vortex/bench-vortex/data/clickbench_partitioned/vortex-file-compressed/'
56+
"#,
57+
)
58+
.await?;
59+
60+
let start = std::time::Instant::now();
61+
62+
let mut stream = ctx
63+
.sql("select * from hits")
64+
.await?
65+
.execute_stream()
66+
.await?;
67+
68+
while let Some(batch) = stream.next().await.transpose()? {
69+
// Discard the batches
70+
drop(batch);
71+
}
72+
73+
let elapsed = start.elapsed();
74+
println!("scanned 14GB of clickbench data in {elapsed:?}");
75+
76+
Ok(())
77+
}

vortex-layout/src/layouts/dict/reader.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::sync::{Arc, OnceLock};
88
use futures::future::BoxFuture;
99
use futures::{FutureExt, TryFutureExt, try_join};
1010
use vortex_array::compute::{MinMaxResult, min_max, take};
11-
use vortex_array::{ArrayRef, MaskFuture};
11+
use vortex_array::{Array, ArrayRef, IntoArray, MaskFuture};
1212
use vortex_dict::DictArray;
1313
use vortex_dtype::{DType, FieldMask};
1414
use vortex_error::{VortexError, VortexExpect, VortexResult};
@@ -76,6 +76,7 @@ impl DictReader {
7676
)
7777
.vortex_expect("must construct dict values array evaluation")
7878
.map_err(Arc::new)
79+
.map_ok(|arr| arr.to_canonical().into_array())
7980
.boxed()
8081
.shared()
8182
})

0 commit comments

Comments
 (0)