Skip to content

Commit f8caed0

Browse files
authored
Run decompress benchmark on tokio runtime (#2001)
1 parent 6c3550f commit f8caed0

File tree

1 file changed

+14
-23
lines changed

1 file changed

+14
-23
lines changed

bench-vortex/benches/compress.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ use parquet::basic::{Compression, ZstdLevel};
2727
use parquet::file::properties::WriterProperties;
2828
use regex::Regex;
2929
use simplelog::*;
30-
use tokio::runtime::Runtime;
30+
use tokio::runtime::{Handle, Runtime};
3131
use vortex::array::{ChunkedArray, StructArray};
3232
use vortex::dtype::FieldName;
3333
use vortex::error::VortexResult;
34-
use vortex::file::{Scan, VortexOpenOptions, VortexWriteOptions};
34+
use vortex::file::{ExecutionMode, Scan, VortexOpenOptions, VortexWriteOptions};
3535
use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
3636
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
3737
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};
@@ -116,39 +116,30 @@ fn vortex_compress_write(
116116
array: &ArrayData,
117117
buf: &mut Vec<u8>,
118118
) -> VortexResult<u64> {
119-
async fn async_write(array: &ArrayData, cursor: &mut Cursor<&mut Vec<u8>>) -> VortexResult<()> {
120-
VortexWriteOptions::default()
121-
.write(cursor, array.clone().into_array_stream())
122-
.await?;
123-
124-
Ok(())
125-
}
126-
127119
let compressed = compressor.compress(array, None)?.into_array();
128-
let mut cursor = Cursor::new(buf);
129-
130-
runtime.block_on(async_write(&compressed, &mut cursor))?;
131-
132-
Ok(cursor.position())
120+
runtime
121+
.block_on(async {
122+
VortexWriteOptions::default()
123+
.write(Cursor::new(buf), compressed.into_array_stream())
124+
.await
125+
})
126+
.map(|c| c.position())
133127
}
134128

135129
#[inline(never)]
136130
fn vortex_decompress_read(runtime: &Runtime, buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
137-
async fn async_read(buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
138-
let batches = VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
131+
runtime.block_on(async {
132+
VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
133+
.with_execution_mode(ExecutionMode::TokioRuntime(Handle::current()))
139134
.open(buf)
140135
.await?
141136
.scan(Scan::all())?
142137
.try_collect::<Vec<_>>()
143138
.await?
144139
.into_iter()
145140
.map(|a| a.into_arrow())
146-
.collect::<VortexResult<Vec<_>>>()?;
147-
148-
Ok(batches)
149-
}
150-
151-
runtime.block_on(async_read(buf))
141+
.collect::<VortexResult<Vec<_>>>()
142+
})
152143
}
153144

154145
fn vortex_compressed_written_size(

0 commit comments

Comments
 (0)