
Commit 58d63ad

Merge branch 'develop' into ngates/arrow-executor
2 parents: b00ea3f + d90bbda

59 files changed: +969 −822 lines


.github/workflows/ci.yml

Lines changed: 1 addition & 4 deletions
@@ -662,13 +662,10 @@ jobs:
       - uses: ./.github/actions/setup-rust
         with:
           repo-token: ${{ secrets.GITHUB_TOKEN }}
-          toolchain: nightly-2025-06-26
           targets: "wasm32-wasip1"
-          components: "rust-src"
       - name: Setup Wasmer
         uses: wasmerio/[email protected]
-      # there is a compiler bug in nightly (but not in nightly-2025-06-26)
-      - run: cargo +nightly-2025-06-26 -Zbuild-std=panic_abort,std build --target wasm32-wasip1
+      - run: cargo build --target wasm32-wasip1
        working-directory: ./wasm-test
      - run: wasmer run ./target/wasm32-wasip1/debug/wasm-test.wasm
        working-directory: ./wasm-test

bench-vortex/src/bin/compress.rs

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ fn compress(
         .map(|f| Target::new(Engine::default(), *f))
         .collect_vec();
 
-    let structlistofints = vec![
+    let structlistofints = [
         StructListOfInts::new(100, 1000, 1),
         StructListOfInts::new(1000, 1000, 1),
         StructListOfInts::new(10000, 1000, 1),
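
Aside on the `vec![..]` → `[..]` change above: a fixed-size array literal iterates just like a `Vec` but skips the heap allocation, which is what Clippy's `useless_vec` lint nudges toward. A small illustrative sketch follows, with plain tuples standing in for `StructListOfInts` (the field names here are made up):

    fn main() {
        let struct_list_of_ints = [(100, 1000, 1), (1000, 1000, 1), (10000, 1000, 1)];
        // Arrays implement IntoIterator by value (Rust 2021), so this consumes
        // the elements just as iterating the old Vec did.
        for (count, len, _seed) in struct_list_of_ints {
            println!("{count} lists of length {len}");
        }
    }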

bench-vortex/src/compress/bench.rs

Lines changed: 5 additions & 6 deletions
@@ -24,6 +24,7 @@ use {
     crate::bench_run::run_with_setup,
     crate::utils::parquet::convert_utf8view_batch,
     crate::utils::parquet::convert_utf8view_schema,
+    anyhow::Context,
     arrow_array::RecordBatch,
     parking_lot::Mutex,
     std::fs,
@@ -131,9 +132,7 @@
     bench_name: &str,
 ) -> Result<(Duration, CompressionTimingMeasurement)> {
     let mut buf = Vec::new();
-    runtime
-        .block_on(vortex_compress_write(uncompressed, &mut buf))
-        .expect("Failed to compress with vortex for decompression test");
+    runtime.block_on(vortex_compress_write(uncompressed, &mut buf))?;
     let buffer = Bytes::from(buf);
 
     // Run the benchmark and measure time.
@@ -255,7 +254,7 @@ pub fn benchmark_lance_compress(
         .collect::<Result<Vec<_>, _>>()?;
     let converted_schema = convert_utf8view_schema(&schema);
 
-    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
+    let temp_dir = tempfile::tempdir().context("Failed to create temp dir")?;
     let iteration_paths: Arc<Mutex<Vec<PathBuf>>> = Arc::new(Mutex::new(Vec::new()));
     let iteration_counter = AtomicU64::new(0);
 
@@ -289,7 +288,7 @@ pub fn benchmark_lance_compress(
     // Calculate size from the last iteration.
     let paths = iteration_paths.lock();
     let lance_compressed_size_val = if let Some(last_path) = paths.last() {
-        calculate_lance_size(last_path).expect("Failed to calculate Lance size")
+        calculate_lance_size(last_path).context("Failed to calculate Lance size")?
    } else {
        0
    };
@@ -320,7 +319,7 @@ pub fn benchmark_lance_decompress(
     // NOTE: Lance requires filesystem access unlike Parquet/Vortex which use in-memory buffers.
     let chunked = uncompressed.as_::<ChunkedVTable>().clone();
     let (batches, schema) = chunked_to_vec_record_batch(chunked);
-    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
+    let temp_dir = tempfile::tempdir().context("Failed to create temp dir")?;
 
     // Write the Lance dataset once for all iterations.
     let dataset_path = runtime.block_on(async {
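
Sketch of the error-handling pattern the hunks above adopt: fallible calls are propagated with `?` instead of `.expect(..)`, and `anyhow::Context` attaches the old panic message as context on the error path. This assumes the caller returns `anyhow::Result<_>`, as the benchmark functions here do; `compressed_len` is a hypothetical helper, not part of the benchmark crate.

    use anyhow::{Context, Result};

    fn compressed_len(bytes: &[u8]) -> Result<u64> {
        // Same message the `.expect(..)` carried, but now it decorates an Err
        // instead of aborting the whole benchmark run.
        let temp_dir = tempfile::tempdir().context("Failed to create temp dir")?;
        let path = temp_dir.path().join("out.bin");
        std::fs::write(&path, bytes).context("Failed to write output file")?;
        Ok(std::fs::metadata(&path).context("Failed to stat output file")?.len())
    }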

bench-vortex/src/engines/df/mod.rs

Lines changed: 2 additions & 4 deletions
@@ -100,8 +100,7 @@ pub fn make_object_store(
        let s3 = Arc::new(
            AmazonS3Builder::from_env()
                .with_bucket_name(bucket_name)
-                .build()
-                .unwrap(),
+                .build()?,
        );
        df.register_object_store(&Url::parse(&format!("s3://{bucket_name}/"))?, s3.clone());
        Ok(s3)
@@ -111,8 +110,7 @@ pub fn make_object_store(
        let gcs = Arc::new(
            GoogleCloudStorageBuilder::from_env()
                .with_bucket_name(bucket_name)
-                .build()
-                .unwrap(),
+                .build()?,
        );
        df.register_object_store(&Url::parse(&format!("gs://{bucket_name}/"))?, gcs.clone());
        Ok(gcs)

encodings/fastlanes/benches/bitpacking_take.rs

Lines changed: 1 addition & 1 deletion
@@ -237,7 +237,7 @@ fn patched_take_10k_adversarial(bencher: Bencher) {
         (0..(NUM_EXCEPTIONS + 1024) / 1024)
             .cycle()
             .map(|chunk_idx| BIG_BASE2 - 1024 + chunk_idx * 1024)
-            .flat_map(|base_idx| (base_idx..(base_idx + per_chunk_count)))
+            .flat_map(|base_idx| base_idx..(base_idx + per_chunk_count))
             .take(10000),
     );
 

encodings/fastlanes/src/delta/array/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ impl DeltaArray {
 
         let lanes = lane_count(ptype);
 
-        if (deltas.len() % 1024 == 0) != (bases.len() % lanes == 0) {
+        if deltas.len().is_multiple_of(1024) != bases.len().is_multiple_of(lanes) {
             vortex_bail!(
                 "deltas length ({}) is a multiple of 1024 iff bases length ({}) is a multiple of LANES ({})",
                 deltas.len(),
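
The modulo checks here (and in chunk_array_builder.rs below) move to `usize::is_multiple_of`, stabilized in Rust 1.87. For non-zero divisors the result is identical; the only behavioural difference is at zero, where `n % 0` panics but `n.is_multiple_of(0)` is simply `n == 0`. A minimal check of the equivalence:

    fn main() {
        let deltas_len: usize = 2048;
        let lanes: usize = 64;
        assert_eq!(deltas_len.is_multiple_of(1024), deltas_len % 1024 == 0);
        assert_eq!(deltas_len.is_multiple_of(lanes), deltas_len % lanes == 0);
        assert!(0usize.is_multiple_of(0)); // whereas `0 % 0` would panic
    }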

encodings/runend/src/array.rs

Lines changed: 9 additions & 0 deletions
@@ -45,6 +45,7 @@ use vortex_scalar::PValue;
 use crate::compress::runend_decode_bools;
 use crate::compress::runend_decode_primitive;
 use crate::compress::runend_encode;
+use crate::rules::RULES;
 
 vtable!(RunEnd);
 
@@ -132,6 +133,14 @@
 
         Ok(())
     }
+
+    fn reduce_parent(
+        array: &Self::Array,
+        parent: &ArrayRef,
+        child_idx: usize,
+    ) -> VortexResult<Option<ArrayRef>> {
+        RULES.evaluate(array, parent, child_idx)
+    }
 }
 
 #[derive(Clone, Debug)]
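
A minimal, self-contained sketch of the pattern this hunk adopts, not the Vortex API: the vtable trait exposes an optional `reduce_parent` hook with a do-nothing default, and an encoding overrides it by delegating to a const rule set (compare the `RULES` const defined in rules.rs below). All names here (`Array`, `Rule`, `RuleSet`, `VTable`, `PushDownRule`) are hypothetical stand-ins.

    #[derive(Debug, Clone)]
    struct Array(String);

    trait Rule {
        fn evaluate(&self, array: &Array, parent: &Array, child_idx: usize) -> Option<Array>;
    }

    struct RuleSet(&'static [&'static dyn Rule]);

    impl RuleSet {
        const fn new(rules: &'static [&'static dyn Rule]) -> Self {
            Self(rules)
        }

        // Try each rule in order; the first one that fires wins.
        fn evaluate(&self, array: &Array, parent: &Array, child_idx: usize) -> Option<Array> {
            self.0.iter().find_map(|rule| rule.evaluate(array, parent, child_idx))
        }
    }

    trait VTable {
        // Default: this encoding has no parent-reduction rules.
        fn reduce_parent(_array: &Array, _parent: &Array, _child_idx: usize) -> Option<Array> {
            None
        }
    }

    struct PushDownRule;

    impl Rule for PushDownRule {
        fn evaluate(&self, array: &Array, parent: &Array, _child_idx: usize) -> Option<Array> {
            Some(Array(format!("{} pushed into {}", parent.0, array.0)))
        }
    }

    const RULES: RuleSet = RuleSet::new(&[&PushDownRule as &dyn Rule]);

    struct RunEndLikeVTable;

    impl VTable for RunEndLikeVTable {
        fn reduce_parent(array: &Array, parent: &Array, child_idx: usize) -> Option<Array> {
            RULES.evaluate(array, parent, child_idx)
        }
    }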

encodings/runend/src/lib.rs

Lines changed: 0 additions & 7 deletions
@@ -24,17 +24,14 @@ pub mod _benchmarking {
     use vortex_array::ArrayBufferVisitor;
     use vortex_array::ArrayChildVisitor;
     use vortex_array::Canonical;
-    use vortex_array::session::ArraySession;
     use vortex_array::session::ArraySessionExt;
     use vortex_array::vtable::ArrayVTableExt;
     use vortex_array::vtable::EncodeVTable;
     use vortex_array::vtable::VisitorVTable;
     use vortex_error::VortexResult;
-    use vortex_session::SessionExt;
     use vortex_session::VortexSession;
 
     use crate::compress::runend_encode;
-    use crate::rules::RunEndScalarFnRule;
 
     impl EncodeVTable<RunEndVTable> for RunEndVTable {
         fn encode(
@@ -69,10 +66,6 @@ impl VisitorVTable<RunEndVTable> for RunEndVTable {
 /// Initialize run-end encoding in the given session.
 pub fn initialize(session: &mut VortexSession) {
     session.arrays().register(RunEndVTable.as_vtable());
-    session
-        .get_mut::<ArraySession>()
-        .optimizer_mut()
-        .register_parent_rule(RunEndScalarFnRule);
 }
 
 #[cfg(test)]

encodings/runend/src/rules.rs

Lines changed: 6 additions & 5 deletions
@@ -8,23 +8,24 @@ use vortex_array::arrays::ConstantArray;
 use vortex_array::arrays::ConstantVTable;
 use vortex_array::arrays::ScalarFnArray;
 use vortex_array::optimizer::rules::ArrayParentReduceRule;
-use vortex_array::optimizer::rules::Exact;
+use vortex_array::optimizer::rules::ParentRuleSet;
 use vortex_dtype::DType;
 use vortex_error::VortexResult;
 
 use crate::RunEndArray;
 use crate::RunEndVTable;
 
+pub(super) const RULES: ParentRuleSet<RunEndVTable> =
+    ParentRuleSet::new(&[ParentRuleSet::lift(&RunEndScalarFnRule)]);
+
 /// A rule to push down scalar functions through run-end encoding into the values array.
 ///
 /// This only works if all other children of the scalar function array are constants.
 #[derive(Debug)]
 pub(crate) struct RunEndScalarFnRule;
 
-impl ArrayParentReduceRule<Exact<RunEndVTable>, AnyScalarFn> for RunEndScalarFnRule {
-    fn child(&self) -> Exact<RunEndVTable> {
-        Exact::from(&RunEndVTable)
-    }
+impl ArrayParentReduceRule<RunEndVTable> for RunEndScalarFnRule {
+    type Parent = AnyScalarFn;
 
     fn parent(&self) -> AnyScalarFn {
         AnyScalarFn
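
A short illustration of the trait change above, using hypothetical names rather than the vortex_array API: the matched child encoding moves into the trait's single generic parameter, and the parent matcher becomes an associated type instead of a second generic parameter plus a `child()` method, so each rule names exactly one (child vtable, parent matcher) pair.

    struct AnyScalarFn;
    struct RunEndLikeVTable;
    struct ScalarFnPushDownRule;

    // After the refactor: the child is fixed by the generic parameter and the
    // parent matcher is an associated type; the rule no longer has to hand back
    // an `Exact<..>` matcher for its own child encoding.
    trait ParentReduceRule<ChildVTable> {
        type Parent;

        fn parent(&self) -> Self::Parent;
    }

    impl ParentReduceRule<RunEndLikeVTable> for ScalarFnPushDownRule {
        type Parent = AnyScalarFn;

        fn parent(&self) -> AnyScalarFn {
            AnyScalarFn
        }
    }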

vortex-array/benches/chunk_array_builder.rs

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ fn make_opt_bool_chunks(len: usize, chunk_count: usize) -> ArrayRef {
     let mut rng = StdRng::seed_from_u64(0);
 
     const SPAN_LEN: usize = 10;
-    assert!(len % SPAN_LEN == 0);
+    assert!(len.is_multiple_of(SPAN_LEN));
 
     (0..chunk_count)
         .map(|_| {
