Skip to content

Commit 475e3aa

Browse files
authored
Merge branch 'develop' into db/failing-stream
2 parents d534e33 + baafc71 commit 475e3aa

File tree

47 files changed

+2507
-289
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

47 files changed

+2507
-289
lines changed

.github/runs-on.yml

Lines changed: 0 additions & 17 deletions
Original file line number · Diff line number · Diff line change
@@ -9,20 +9,3 @@ images:
99
arch: "arm64"
1010
name: "vortex-ci-*"
1111
owner: "375504701696"
12-
13-
pools:
14-
# Windows pool - covers UK (8am-6pm GMT) and US-East (8am-6pm EST)
15-
# Combined in UK timezone: 8am-11pm (UK 8-6 + NYC 8-6 = UK 8-11pm)
16-
# Stopped instances only (~20-30s boot vs 2-3min cold-start)
17-
# Cost: ~$5/month (2 × $2.40 EBS storage)
18-
windows-x64:
19-
runner: family=m7i/cpu=8/image=windows22-full-x64/tag=rust-test-windows
20-
timezone: "Europe/London"
21-
schedule:
22-
- name: working-hours
23-
match:
24-
day: ["monday", "tuesday", "wednesday", "thursday", "friday"]
25-
time: ["08:00", "23:00"]
26-
stopped: 2
27-
- name: default
28-
stopped: 1

.github/workflows/ci.yml

Lines changed: 1 addition & 6 deletions
Original file line number · Diff line number · Diff line change
@@ -493,12 +493,7 @@ jobs:
493493
matrix:
494494
include:
495495
- os: windows-x64
496-
runner:
497-
- runs-on=${{ github.run_id }}
498-
- family=m7i
499-
- cpu=8
500-
- image=windows22-full-x64
501-
- tag=rust-test-windows
496+
runner: runs-on=${{ github.run_id }}/pool=windows-x64
502497
- os: linux-arm64
503498
runner:
504499
- runs-on=${{ github.run_id }}

.github/workflows/fuzz.yml

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -56,7 +56,7 @@ jobs:
5656
- name: Run fuzzing target
5757
id: fuzz
5858
run: |
59-
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 cargo +nightly fuzz run --release --debug-assertions file_io -- -max_total_time=7200 2>&1 | tee fuzz_output.log
59+
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 cargo +nightly fuzz run --release --debug-assertions file_io -- -max_total_time=7200 -rss_limit_mb=0 2>&1 | tee fuzz_output.log
6060
continue-on-error: true
6161
- name: Check for crashes
6262
id: check
@@ -189,7 +189,7 @@ jobs:
189189
- name: Run fuzzing target
190190
id: fuzz
191191
run: |
192-
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 cargo +nightly fuzz run --release --debug-assertions array_ops -- -max_total_time=7200 2>&1 | tee fuzz_output.log
192+
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 cargo +nightly fuzz run --release --debug-assertions array_ops -- -max_total_time=7200 -rss_limit_mb=0 2>&1 | tee fuzz_output.log
193193
continue-on-error: true
194194
- name: Check for crashes
195195
id: check

.github/workflows/fuzzer-fix-automation.yml

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -187,7 +187,7 @@ jobs:
187187
echo "Attempting to reproduce crash with fuzzer (debug mode)..."
188188
189189
# Run fuzzer with crash file (debug mode, no sanitizer, full backtrace)
190-
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=full timeout 30s cargo +nightly fuzz run --dev --sanitizer=none "${{ steps.extract.outputs.target }}" "${{ steps.download.outputs.crash_file_path }}" -- -runs=1 2>&1 | tee crash_reproduction.log
190+
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=full timeout 30s cargo +nightly fuzz run --dev --sanitizer=none "${{ steps.extract.outputs.target }}" "${{ steps.download.outputs.crash_file_path }}" -- -runs=1 -rss_limit_mb=0 2>&1 | tee crash_reproduction.log
191191
192192
FUZZ_EXIT_CODE=${PIPESTATUS[0]}
193193
@@ -216,7 +216,7 @@ jobs:
216216
217217
I ran:
218218
\`\`\`bash
219-
cargo +nightly fuzz run --sanitizer=none ${{ steps.extract.outputs.target }} ${{ steps.download.outputs.crash_file_path }} -- -runs=1
219+
cargo +nightly fuzz run --sanitizer=none ${{ steps.extract.outputs.target }} ${{ steps.download.outputs.crash_file_path }} -- -runs=1 -rss_limit_mb=0
220220
\`\`\`
221221
222222
The fuzzer exited with code 0 (success).
@@ -275,7 +275,7 @@ jobs:
275275
- This ensures your work is visible and reviewable even if you hit the turn limit
276276
- Keep fixes minimal - only fix the specific bug
277277
- Follow CLAUDE.md code style guidelines
278-
- **Use `--dev` flag** for faster builds: `cargo +nightly fuzz run --dev --sanitizer=none`
278+
- **Use `--dev` flag** for faster builds: `cargo +nightly fuzz run --dev --sanitizer=none <target> <crash_file> -- -rss_limit_mb=0`
279279
280280
## Fixability Guidelines
281281

.github/workflows/report-fuzz-crash.yml

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -155,7 +155,7 @@ jobs:
155155
### Reproduction
156156
157157
```bash
158-
cargo +nightly fuzz run --sanitizer=none $FUZZ_TARGET $FUZZ_TARGET/$CRASH_FILE
158+
cargo +nightly fuzz run -D --sanitizer=none $FUZZ_TARGET $FUZZ_TARGET/$CRASH_FILE -- -rss_limit_mb=0
159159
```
160160
161161
---
@@ -219,12 +219,12 @@ jobs:
219219
2. Reproduce locally:
220220
```bash
221221
# The artifact contains $FUZZ_TARGET/$CRASH_FILE
222-
cargo +nightly fuzz run --sanitizer=none $FUZZ_TARGET $FUZZ_TARGET/$CRASH_FILE
222+
cargo +nightly fuzz run -D --sanitizer=none $FUZZ_TARGET $FUZZ_TARGET/$CRASH_FILE -- -rss_limit_mb=0
223223
```
224224
225225
3. Get full backtrace:
226226
```bash
227-
RUST_BACKTRACE=full cargo +nightly fuzz run --sanitizer=none $FUZZ_TARGET $FUZZ_TARGET/$CRASH_FILE
227+
RUST_BACKTRACE=full cargo +nightly fuzz run -D --sanitizer=none $FUZZ_TARGET $FUZZ_TARGET/$CRASH_FILE -- -rss_limit_mb=0
228228
```
229229
230230
---

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/src/random_access/take.rs

Lines changed: 12 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -25,9 +25,12 @@ use stream::StreamExt;
2525
use vortex::array::Array;
2626
use vortex::array::ArrayRef;
2727
use vortex::array::IntoArray;
28+
use vortex::array::VectorExecutor;
2829
use vortex::array::stream::ArrayStreamExt;
30+
use vortex::array::vectors::VectorIntoArray;
2931
use vortex::buffer::Buffer;
3032
use vortex::file::OpenOptionsSessionExt;
33+
use vortex::layout::layouts::USE_VORTEX_OPERATORS;
3134
use vortex::utils::aliases::hash_map::HashMap;
3235

3336
use crate::SESSION;
@@ -43,18 +46,22 @@ pub async fn take_vortex_tokio(
4346
}
4447

4548
async fn take_vortex(reader: impl AsRef<Path>, indices: Buffer<u64>) -> anyhow::Result<ArrayRef> {
46-
Ok(SESSION
49+
let array = SESSION
4750
.open_options()
4851
.open(reader.as_ref())
4952
.await?
5053
.scan()?
5154
.with_row_indices(indices)
5255
.into_array_stream()?
5356
.read_all()
54-
.await?
55-
// We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
56-
.to_canonical()
57-
.into_array())
57+
.await?;
58+
59+
// We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
60+
Ok(if *USE_VORTEX_OPERATORS {
61+
array.execute_vector(&SESSION)?.into_array(array.dtype())
62+
} else {
63+
array.to_canonical().into_array()
64+
})
5865
}
5966

6067
pub async fn take_parquet(path: &Path, indices: Buffer<u64>) -> anyhow::Result<RecordBatch> {

encodings/fsst/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -18,6 +18,7 @@ workspace = true
1818

1919
[dependencies]
2020
fsst-rs = { workspace = true }
21+
num-traits = { workspace = true }
2122
prost = { workspace = true }
2223
rand = { workspace = true, optional = true }
2324
vortex-array = { workspace = true }

encodings/fsst/src/array.rs

Lines changed: 12 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -18,6 +18,7 @@ use vortex_array::ArrayHash;
1818
use vortex_array::ArrayRef;
1919
use vortex_array::Canonical;
2020
use vortex_array::DeserializeMetadata;
21+
use vortex_array::ExecutionCtx;
2122
use vortex_array::Precision;
2223
use vortex_array::ProstMetadata;
2324
use vortex_array::SerializeMetadata;
@@ -46,9 +47,11 @@ use vortex_error::VortexResult;
4647
use vortex_error::vortex_bail;
4748
use vortex_error::vortex_ensure;
4849
use vortex_error::vortex_err;
50+
use vortex_vector::Vector;
4951

5052
use crate::fsst_compress;
5153
use crate::fsst_train_compressor;
54+
use crate::kernel::PARENT_KERNELS;
5255

5356
vtable!(FSST);
5457

@@ -178,6 +181,15 @@ impl VTable for FSSTVTable {
178181

179182
Ok(())
180183
}
184+
185+
fn execute_parent(
186+
array: &Self::Array,
187+
parent: &ArrayRef,
188+
child_idx: usize,
189+
ctx: &mut ExecutionCtx,
190+
) -> VortexResult<Option<Vector>> {
191+
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
192+
}
181193
}
182194

183195
#[derive(Clone)]

encodings/fsst/src/kernel.rs

Lines changed: 200 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -0,0 +1,200 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use fsst::Decompressor;
7+
use num_traits::AsPrimitive;
8+
use vortex_array::Array;
9+
use vortex_array::ExecutionCtx;
10+
use vortex_array::VectorExecutor;
11+
use vortex_array::arrays::FilterArray;
12+
use vortex_array::arrays::FilterVTable;
13+
use vortex_array::builtins::ArrayBuiltins;
14+
use vortex_array::kernel::ExecuteParentKernel;
15+
use vortex_array::kernel::ParentKernelSet;
16+
use vortex_array::mask::MaskExecutor;
17+
use vortex_array::matchers::Exact;
18+
use vortex_array::validity::Validity;
19+
use vortex_array::vtable::ValidityHelper;
20+
use vortex_buffer::Buffer;
21+
use vortex_buffer::BufferMut;
22+
use vortex_buffer::ByteBuffer;
23+
use vortex_buffer::ByteBufferMut;
24+
use vortex_dtype::DType;
25+
use vortex_dtype::IntegerPType;
26+
use vortex_dtype::Nullability;
27+
use vortex_dtype::PType;
28+
use vortex_dtype::PTypeDowncastExt;
29+
use vortex_dtype::match_each_integer_ptype;
30+
use vortex_error::VortexResult;
31+
use vortex_mask::Mask;
32+
use vortex_mask::MaskValues;
33+
use vortex_vector::Vector;
34+
use vortex_vector::binaryview::BinaryVector;
35+
use vortex_vector::binaryview::BinaryView;
36+
use vortex_vector::binaryview::StringVector;
37+
38+
use crate::FSSTArray;
39+
use crate::FSSTVTable;
40+
41+
pub(super) const PARENT_KERNELS: ParentKernelSet<FSSTVTable> =
42+
ParentKernelSet::new(&[ParentKernelSet::lift(&FSSTFilterKernel)]);
43+
44+
#[derive(Debug)]
45+
struct FSSTFilterKernel;
46+
47+
impl ExecuteParentKernel<FSSTVTable> for FSSTFilterKernel {
48+
type Parent = Exact<FilterVTable>;
49+
50+
fn parent(&self) -> Self::Parent {
51+
Exact::from(&FilterVTable)
52+
}
53+
54+
fn execute_parent(
55+
&self,
56+
array: &FSSTArray,
57+
parent: &FilterArray,
58+
_child_idx: usize,
59+
ctx: &mut ExecutionCtx,
60+
) -> VortexResult<Option<Vector>> {
61+
let mask_values = match parent.filter_mask() {
62+
Mask::AllTrue(_) | Mask::AllFalse(_) => return Ok(None),
63+
Mask::Values(v) => v,
64+
};
65+
66+
// We filter the uncompressed lengths
67+
let uncompressed_lens = array
68+
.uncompressed_lengths()
69+
.filter(parent.filter_mask().clone())?
70+
.execute(ctx)?
71+
.into_primitive();
72+
73+
// Extract the filtered validity
74+
let validity = match array.codes().validity().filter(parent.filter_mask())? {
75+
Validity::NonNullable | Validity::AllValid => {
76+
Mask::new_true(parent.filter_mask().true_count())
77+
}
78+
Validity::AllInvalid => Mask::new_false(parent.filter_mask().true_count()),
79+
Validity::Array(a) => a.execute_mask(ctx.session())?,
80+
};
81+
82+
// First we unpack the codes VarBinArray to get access to the raw data.
83+
let codes_data = array.codes().bytes();
84+
let codes_offsets = array
85+
.codes()
86+
.offsets()
87+
.cast(DType::Primitive(PType::U32, Nullability::NonNullable))?
88+
.execute(ctx)?
89+
.into_primitive()
90+
.downcast::<u32>()
91+
.into_nonnull_buffer();
92+
93+
let decompressor = array.decompressor();
94+
95+
let (views, buffer) = match_each_integer_ptype!(uncompressed_lens.ptype(), |S| {
96+
fsst_decode::<S>(
97+
decompressor,
98+
codes_data,
99+
&codes_offsets,
100+
mask_values,
101+
&validity,
102+
&uncompressed_lens.downcast::<S>().into_nonnull_buffer(),
103+
)
104+
});
105+
106+
let vector = match array.dtype() {
107+
DType::Binary(_) => unsafe {
108+
BinaryVector::new_unchecked(views, Arc::new(vec![buffer].into()), validity)
109+
}
110+
.into(),
111+
DType::Utf8(_) => unsafe {
112+
StringVector::new_unchecked(views, Arc::new(vec![buffer].into()), validity)
113+
}
114+
.into(),
115+
_ => unreachable!("Not a supported FSST DType"),
116+
};
117+
118+
Ok(Some(vector))
119+
}
120+
}
121+
122+
fn fsst_decode<S: IntegerPType + AsPrimitive<usize> + AsPrimitive<u32>>(
123+
decompressor: Decompressor,
124+
codes_data: &[u8],
125+
codes_offsets: &[u32],
126+
filter_mask: &MaskValues,
127+
filtered_validity: &Mask,
128+
filtered_uncompressed_lengths: &[S],
129+
) -> (Buffer<BinaryView>, ByteBuffer) {
130+
let total_uncompressed_size: usize = filtered_uncompressed_lengths
131+
.iter()
132+
.map(|x| <S as AsPrimitive<usize>>::as_(*x))
133+
.sum();
134+
135+
// We allocate an extra 7 bytes per the FSST decompressor's requirement for padding.
136+
let mut uncompressed = ByteBufferMut::with_capacity(total_uncompressed_size + 7);
137+
let mut spare_capacity = uncompressed.spare_capacity_mut();
138+
139+
match filtered_validity {
140+
Mask::AllTrue(_) => {
141+
for &idx in filter_mask.indices() {
142+
let start = codes_offsets[idx] as usize;
143+
let end = codes_offsets[idx + 1] as usize;
144+
let compressed_slice = &codes_data[start..end];
145+
146+
let uncompressed_len =
147+
decompressor.decompress_into(compressed_slice, spare_capacity);
148+
spare_capacity = &mut spare_capacity[uncompressed_len..];
149+
}
150+
}
151+
Mask::AllFalse(_) => {
152+
// Nothing to decompress
153+
unsafe { uncompressed.set_len(0) };
154+
return (Buffer::empty(), uncompressed.freeze());
155+
}
156+
Mask::Values(values) => {
157+
for (filtered_idx, (idx, is_valid)) in filter_mask
158+
.indices()
159+
.iter()
160+
.copied()
161+
.zip(values.bit_buffer().iter())
162+
.enumerate()
163+
{
164+
if is_valid {
165+
let start = codes_offsets[idx] as usize;
166+
let end = codes_offsets[idx + 1] as usize;
167+
let compressed_slice = &codes_data[start..end];
168+
169+
let uncompressed_len =
170+
decompressor.decompress_into(compressed_slice, spare_capacity);
171+
spare_capacity = &mut spare_capacity[uncompressed_len..];
172+
} else {
173+
// We advance the output buffer to make it faster to assemble views below.
174+
spare_capacity =
175+
&mut spare_capacity[filtered_uncompressed_lengths[filtered_idx].as_()..];
176+
}
177+
}
178+
}
179+
}
180+
181+
unsafe { uncompressed.set_len(total_uncompressed_size) };
182+
let uncompressed = uncompressed.freeze();
183+
let uncompressed_slice = uncompressed.as_ref();
184+
185+
// Loop over the uncompressed lengths to construct the BinaryViews.
186+
let mut views = BufferMut::<BinaryView>::with_capacity(filtered_uncompressed_lengths.len());
187+
let mut offset = 0u32;
188+
for len in filtered_uncompressed_lengths {
189+
let view = BinaryView::make_view(
190+
&uncompressed_slice[offset as usize..][..len.as_()],
191+
0u32,
192+
offset,
193+
);
194+
offset += <S as AsPrimitive<u32>>::as_(*len);
195+
unsafe { views.push_unchecked(view) };
196+
}
197+
unsafe { views.set_len(filtered_uncompressed_lengths.len()) };
198+
199+
(views.freeze(), uncompressed)
200+
}

0 commit comments

Comments (0)