Skip to content

Commit ab6ad8e

Browse files
committed
more experiments
Signed-off-by: Andrew Duffy <[email protected]>
1 parent 248eee6 commit ab6ad8e

File tree

8 files changed

+240
-30
lines changed

8 files changed

+240
-30
lines changed

Cargo.lock

Lines changed: 164 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/sequence/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ vortex-array = { workspace = true }
2020
vortex-buffer = { workspace = true }
2121
vortex-dtype = { workspace = true }
2222
vortex-error = { workspace = true }
23-
vortex-io = { workspace = true }
2423
vortex-mask = { workspace = true }
2524
vortex-proto = { workspace = true }
2625
vortex-scalar = { workspace = true }

vortex-array/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ vortex-compute = { workspace = true, default-features = true }
6565
vortex-dtype = { workspace = true, features = ["arrow", "serde"] }
6666
vortex-error = { workspace = true, features = ["prost"] }
6767
vortex-flatbuffers = { workspace = true, features = ["array"] }
68-
vortex-io = { workspace = true }
6968
vortex-mask = { workspace = true }
7069
vortex-metrics = { workspace = true }
7170
vortex-scalar = { workspace = true }

vortex-io/benches/spawn.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn tokio_spawn(b: Bencher, work_ms: u64) {
1414
let handle = rt.handle().clone();
1515
b.bench_local(|| {
1616
// Spawn 100 tasks that each do `work_ms` milliseconds of CPU-blocking work.
17-
let join_handles: Vec<_> = (0..1000)
17+
let join_handles: Vec<_> = (0..100)
1818
.map(|_| handle.spawn(async move { thread::sleep(Duration::from_millis(work_ms)) }))
1919
.collect();
2020

@@ -30,7 +30,7 @@ fn tokio_spawn_blocking(b: Bencher, work_ms: u64) {
3030
b.bench_local(|| {
3131
let work_ms = work_ms;
3232
// Spawn 100 tasks that each do `work_ms` milliseconds of CPU-blocking work.
33-
let join_handles: Vec<_> = (0..1000)
33+
let join_handles: Vec<_> = (0..100)
3434
.map(|_| handle.spawn_blocking(move || thread::sleep(Duration::from_millis(work_ms))))
3535
.collect();
3636

@@ -46,7 +46,7 @@ fn tokio_unblock(b: Bencher, work_ms: u64) {
4646
b.bench_local(|| {
4747
// Spawn 100 tasks that each do `work_ms` milliseconds of CPU-blocking work.
4848
let work_ms = work_ms;
49-
let join_handles: Vec<_> = (0..1000)
49+
let join_handles: Vec<_> = (0..100)
5050
.map(|_| {
5151
handle.spawn(blocking::unblock(move || {
5252
thread::sleep(Duration::from_millis(work_ms.clone()))
@@ -65,7 +65,7 @@ fn smol_unblock(b: Bencher, work_ms: u64) {
6565
b.bench_local(|| {
6666
// Spawn 1000 tasks that each do `work_ms` milliseconds of CPU-blocking work.
6767
let work_ms = work_ms;
68-
let mut join_handles = Vec::with_capacity(1000);
68+
let mut join_handles = Vec::with_capacity(100);
6969
exec.spawn_many(
7070
(0..1000).map(|_| {
7171
blocking::unblock(move || thread::sleep(Duration::from_millis(work_ms.clone())))

vortex-io/src/runtime/tokio.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use std::sync::{Arc, LazyLock};
55

6+
use blocking::unblock;
67
use futures::future::BoxFuture;
78
use tracing::Instrument;
89

@@ -44,6 +45,7 @@ impl Executor for tokio::runtime::Handle {
4445

4546
fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
4647
Box::new(tokio::runtime::Handle::spawn(self, async move { cpu() }).abort_handle())
48+
// Box::new(tokio::runtime::Handle::spawn(self, unblock(|| cpu())).abort_handle())
4749
}
4850

4951
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
@@ -67,6 +69,7 @@ impl Executor for CurrentTokioRuntime {
6769
Box::new(
6870
tokio::runtime::Handle::current()
6971
.spawn(async move { cpu() })
72+
// .spawn(unblock(|| cpu()))
7073
.abort_handle(),
7174
)
7275
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,17 @@ impl LayoutStrategy for CompressingStrategy {
135135
handle2.spawn_cpu(move || {
136136
let (sequence_id, chunk) = chunk?;
137137
// Compute the stats for the chunk prior to compression
138+
let start = std::time::Instant::now();
138139
chunk
139140
.statistics()
140141
.compute_all(&Stat::all().collect::<Vec<_>>())?;
141-
Ok((sequence_id, compressor.compress_chunk(&chunk)?))
142+
let elapsed = start.elapsed();
143+
println!("COMPUTE_ALL: {elapsed:?}");
144+
let start = std::time::Instant::now();
145+
let compress = compressor.compress_chunk(&chunk)?;
146+
let elapsed = start.elapsed();
147+
println!("COMPRESS_CHUNK: {elapsed:?}");
148+
Ok((sequence_id, compress))
142149
})
143150
})
144151
.buffered(self.concurrency);

vortex/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,13 @@ vortex-zstd = { workspace = true, optional = true }
5555
[dev-dependencies]
5656
anyhow = { workspace = true }
5757
arrow-array = { workspace = true }
58+
console-subscriber = "0.5.0"
5859
divan = { workspace = true }
60+
futures = { workspace = true }
5961
itertools = { workspace = true }
6062
mimalloc = { workspace = true }
61-
parquet = { workspace = true }
63+
object_store = { workspace = true, features = ["http", "fs"] }
64+
parquet = { workspace = true, features = ["arrow", "async", "object_store"] }
6265
rand = { workspace = true }
6366
serde_json = { workspace = true }
6467
tokio = { workspace = true, features = ["full"] }
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use futures::stream::StreamExt;
7+
use object_store::http::HttpBuilder;
8+
use object_store::local::LocalFileSystem;
9+
use object_store::path::Path;
10+
use parquet::arrow::async_reader::ParquetObjectReader;
11+
use vortex::VortexSessionDefault;
12+
use vortex_array::ArrayRef;
13+
use vortex_array::arrow::FromArrowArray;
14+
use vortex_array::stream::ArrayStreamAdapter;
15+
use vortex_dtype::DType;
16+
use vortex_dtype::arrow::FromArrowType;
17+
use vortex_error::vortex_err;
18+
use vortex_file::WriteOptionsSessionExt;
19+
use vortex_session::VortexSession;
20+
21+
#[tokio::main]
22+
pub async fn main() -> anyhow::Result<()> {
23+
// console_subscriber::init();
24+
// let store = LocalFileSystem::new();
25+
let store = HttpBuilder::new()
26+
.with_url("https://vortex-benchmark-results-database.s3.amazonaws.com")
27+
.build()?;
28+
let store = Arc::new(store);
29+
let reader = parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(
30+
ParquetObjectReader::new(store, Path::from("testing/hits.parquet")),
31+
// ParquetObjectReader::new(store, Path::from("/Users/aduffy/Downloads/hits.parquet")),
32+
)
33+
.await?;
34+
35+
let stream = reader.build()?;
36+
37+
// Turn into a Vortex record batch stream
38+
let vx_stream = ArrayStreamAdapter::new(
39+
DType::from_arrow(stream.schema().as_ref()),
40+
stream.map(|br| {
41+
br.map_err(|e| vortex_err!("error: {e}"))
42+
.map(|b| ArrayRef::from_arrow(b, false))
43+
}),
44+
);
45+
46+
// Write to file
47+
let output = tokio::fs::File::create("/tmp/output.vortex").await?;
48+
let session = VortexSession::default();
49+
println!("begin writing");
50+
let start = std::time::Instant::now();
51+
session.write_options().write(output, vx_stream).await?;
52+
let duration = start.elapsed();
53+
54+
println!("rewrote in {duration:?}");
55+
56+
Ok(())
57+
}

0 commit comments

Comments
 (0)