Skip to content

Commit 35af782

Browse files
committed
more
Signed-off-by: Andrew Duffy <[email protected]>
1 parent ab6ad8e commit 35af782

File tree

3 files changed

+19
-17
lines changed

3 files changed

+19
-17
lines changed

vortex-io/src/runtime/tokio.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ impl Executor for tokio::runtime::Handle {
4444
}
4545

4646
fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
47-
Box::new(tokio::runtime::Handle::spawn(self, async move { cpu() }).abort_handle())
48-
// Box::new(tokio::runtime::Handle::spawn(self, unblock(|| cpu())).abort_handle())
47+
// Box::new(tokio::runtime::Handle::spawn(self, async move { cpu() }).abort_handle())
48+
Box::new(tokio::runtime::Handle::spawn(self, unblock(|| cpu())).abort_handle())
4949
}
5050

5151
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
@@ -68,8 +68,8 @@ impl Executor for CurrentTokioRuntime {
6868
fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
6969
Box::new(
7070
tokio::runtime::Handle::current()
71-
.spawn(async move { cpu() })
72-
// .spawn(unblock(|| cpu()))
71+
// .spawn(async move { cpu() })
72+
.spawn(unblock(|| cpu()))
7373
.abort_handle(),
7474
)
7575
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,16 @@ impl LayoutStrategy for CompressingStrategy {
135135
handle2.spawn_cpu(move || {
136136
let (sequence_id, chunk) = chunk?;
137137
// Compute the stats for the chunk prior to compression
138-
let start = std::time::Instant::now();
138+
// let start = std::time::Instant::now();
139139
chunk
140140
.statistics()
141141
.compute_all(&Stat::all().collect::<Vec<_>>())?;
142-
let elapsed = start.elapsed();
143-
println!("COMPUTE_ALL: {elapsed:?}");
144-
let start = std::time::Instant::now();
142+
// let elapsed = start.elapsed();
143+
// println!("COMPUTE_ALL: {elapsed:?}");
144+
// let start = std::time::Instant::now();
145145
let compress = compressor.compress_chunk(&chunk)?;
146-
let elapsed = start.elapsed();
147-
println!("COMPRESS_CHUNK: {elapsed:?}");
146+
// let elapsed = start.elapsed();
147+
// println!("COMPRESS_CHUNK: {elapsed:?}");
148148
Ok((sequence_id, compress))
149149
})
150150
})

vortex/examples/rebuild_clickbench.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,24 @@ use vortex_error::vortex_err;
1818
use vortex_file::WriteOptionsSessionExt;
1919
use vortex_session::VortexSession;
2020

21-
#[tokio::main]
21+
#[tokio::main(worker_threads = 8)]
2222
pub async fn main() -> anyhow::Result<()> {
2323
// console_subscriber::init();
24-
// let store = LocalFileSystem::new();
25-
let store = HttpBuilder::new()
26-
.with_url("https://vortex-benchmark-results-database.s3.amazonaws.com")
27-
.build()?;
24+
let store = LocalFileSystem::new();
25+
// let store = HttpBuilder::new()
26+
// .with_url("https://vortex-benchmark-results-database.s3.amazonaws.com")
27+
// .build()?;
2828
let store = Arc::new(store);
2929
let reader = parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(
30-
ParquetObjectReader::new(store, Path::from("testing/hits.parquet")),
31-
// ParquetObjectReader::new(store, Path::from("/Users/aduffy/Downloads/hits.parquet")),
30+
// ParquetObjectReader::new(store, Path::from("testing/hits_83.parquet")),
31+
ParquetObjectReader::new(store, Path::from("/Volumes/Code/vortex/bench-vortex/data/clickbench_partitioned/parquet/hits_83.parquet")),
3232
)
3333
.await?;
3434

3535
let stream = reader.build()?;
3636

37+
// Start reading the next batch while we're recompressing the previous one.
38+
3739
// Turn into a Vortex record batch stream
3840
let vx_stream = ArrayStreamAdapter::new(
3941
DType::from_arrow(stream.schema().as_ref()),

0 commit comments

Comments
 (0)