Commit 8f5b749

feat: teach LayoutBatchStream to filter by indices (#1242)
I have some thoughts on how to make the stream.rs logic easier to follow; this, however, is I think the smallest change that achieves indices filtering. With this change, the filter is evaluated on every row of any chunk that contains at least one kept index. I have some thoughts on how to avoid evaluating the filter at a sub-chunk level, but they require more substantial changes.
1 parent 78f48c6 commit 8f5b749

9 files changed, +367 −31 lines changed
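As a usage sketch of the path this commit enables (a minimal sketch: the helper function, its bounds, and the two-argument builder constructor are assumptions inferred from the call sites below, not code from this commit):

    // Hedged sketch: read only the rows named by `indices`, optionally combined with a
    // row filter. `with_indices`, `with_row_filter`, `build`, and `read_all` are the APIs
    // touched by this commit; the surrounding scaffolding is illustrative.
    async fn read_selected_rows<T: VortexReadAt + Unpin + 'static>(
        reader: T,
        layout_serde: LayoutDeserializer,
        row_filter: Option<RowFilter>,
        indices: Array, // non-nullable integer positions or a boolean mask
    ) -> VortexResult<Array> {
        let mut builder = LayoutBatchStreamBuilder::new(reader, layout_serde);
        if let Some(row_filter) = row_filter {
            builder = builder.with_row_filter(row_filter);
        }
        // New in this commit: restrict the resulting stream to `indices`.
        builder = builder.with_indices(indices);
        builder.build().await?.read_all().await
    }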

pyvortex/python/vortex/dataset.py

Lines changed: 3 additions & 1 deletion
@@ -230,7 +230,9 @@ def take(
         table : :class:`.pyarrow.Table`
 
         """
-        return self._dataset.to_array(columns=columns, row_filter=filter).take(encoding.array(indices)).to_arrow_table()
+        return self._dataset.to_array(
+            columns=columns, row_filter=filter, indices=encoding.array(indices)
+        ).to_arrow_table()
 
     def to_record_batch_reader(
         self,

pyvortex/src/dataset.rs

Lines changed: 27 additions & 9 deletions
@@ -28,6 +28,7 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
     reader: T,
     projection: Projection,
     row_filter: Option<RowFilter>,
+    indices: Option<Array>,
 ) -> VortexResult<LayoutBatchStream<T>> {
     let mut builder = LayoutBatchStreamBuilder::new(
         reader,
@@ -42,15 +43,20 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
         builder = builder.with_row_filter(row_filter);
     }
 
+    if let Some(indices) = indices {
+        builder = builder.with_indices(indices);
+    }
+
     builder.build().await
 }
 
 pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
     reader: T,
     projection: Projection,
     row_filter: Option<RowFilter>,
+    indices: Option<Array>,
 ) -> VortexResult<Array> {
-    layout_stream_from_reader(reader, projection, row_filter)
+    layout_stream_from_reader(reader, projection, row_filter, indices)
         .await?
         .read_all()
         .await
@@ -119,11 +125,13 @@ impl TokioFileDataset {
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyArray> {
         let inner = read_array_from_reader(
             self.file().await?,
             projection_from_python(columns)?,
             row_filter_from_python(row_filter),
+            indices.map(PyArray::unwrap).cloned(),
         )
         .await?;
         Ok(PyArray::new(inner))
@@ -133,11 +141,13 @@ impl TokioFileDataset {
         self_: PyRef<'_, Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyObject> {
         let layout_reader = layout_stream_from_reader(
             self_.file().await?,
             projection_from_python(columns)?,
             row_filter_from_python(row_filter),
+            indices.map(PyArray::unwrap).cloned(),
         )
         .await?;
@@ -154,23 +164,25 @@ impl TokioFileDataset {
         self_.schema.clone().to_pyarrow(self_.py())
     }
 
-    #[pyo3(signature = (*, columns=None, row_filter=None))]
+    #[pyo3(signature = (*, columns = None, row_filter = None, indices = None))]
     pub fn to_array(
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyArray> {
-        TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter))
+        TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter, indices))
     }
 
-    #[pyo3(signature = (*, columns=None, row_filter=None))]
+    #[pyo3(signature = (*, columns = None, row_filter = None, indices = None))]
     pub fn to_record_batch_reader(
         self_: PyRef<Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyObject> {
         TOKIO_RUNTIME.block_on(Self::async_to_record_batch_reader(
-            self_, columns, row_filter,
+            self_, columns, row_filter, indices,
         ))
     }
 }
@@ -197,11 +209,13 @@ impl ObjectStoreUrlDataset {
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyArray> {
         let inner = read_array_from_reader(
             self.reader().await?,
             projection_from_python(columns)?,
             row_filter_from_python(row_filter),
+            indices.map(PyArray::unwrap).cloned(),
         )
         .await?;
         Ok(PyArray::new(inner))
@@ -211,11 +225,13 @@ impl ObjectStoreUrlDataset {
         self_: PyRef<'_, Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyObject> {
         let layout_reader = layout_stream_from_reader(
             self_.reader().await?,
             projection_from_python(columns)?,
             row_filter_from_python(row_filter),
+            indices.map(PyArray::unwrap).cloned(),
         )
         .await?;
@@ -232,23 +248,25 @@ impl ObjectStoreUrlDataset {
         self_.schema.clone().to_pyarrow(self_.py())
     }
 
-    #[pyo3(signature = (*, columns=None, row_filter=None))]
+    #[pyo3(signature = (*, columns = None, row_filter = None, indices = None))]
     pub fn to_array(
         &self,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyArray> {
-        TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter))
+        TOKIO_RUNTIME.block_on(self.async_to_array(columns, row_filter, indices))
     }
 
-    #[pyo3(signature = (*, columns=None, row_filter=None))]
+    #[pyo3(signature = (*, columns = None, row_filter = None, indices = None))]
     pub fn to_record_batch_reader(
         self_: PyRef<Self>,
         columns: Option<Vec<Bound<'_, PyAny>>>,
         row_filter: Option<&Bound<'_, PyExpr>>,
+        indices: Option<&PyArray>,
     ) -> PyResult<PyObject> {
         TOKIO_RUNTIME.block_on(Self::async_to_record_batch_reader(
-            self_, columns, row_filter,
+            self_, columns, row_filter, indices,
         ))
     }
 }
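The plumbing above repeats one small conversion at every binding: an optional borrowed PyArray is turned into an owned Option<Array> before it crosses into the async reader. A minimal sketch of that pattern (the free function is illustrative; `PyArray::unwrap` is assumed to return `&Array`, as the call sites suggest):

    // Hedged sketch of the Option<&PyArray> -> Option<Array> conversion used above.
    fn indices_to_array(indices: Option<&PyArray>) -> Option<Array> {
        // `unwrap` exposes the wrapped &Array and `cloned()` yields an owned Array
        // that can be handed to read_array_from_reader / layout_stream_from_reader.
        indices.map(PyArray::unwrap).cloned()
    }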

pyvortex/src/io.rs

Lines changed: 6 additions & 4 deletions
@@ -125,14 +125,15 @@ use crate::{PyArray, TOKIO_RUNTIME};
 /// >>> # b.to_arrow_array()
 ///
 #[pyfunction]
-#[pyo3(signature = (path, *, projection = None, row_filter = None))]
+#[pyo3(signature = (path, *, projection = None, row_filter = None, indices = None))]
 pub fn read_path(
     path: Bound<PyString>,
     projection: Option<Vec<Bound<PyAny>>>,
     row_filter: Option<&Bound<PyExpr>>,
+    indices: Option<&PyArray>,
 ) -> PyResult<PyArray> {
     let dataset = TOKIO_RUNTIME.block_on(TokioFileDataset::try_new(path.extract()?))?;
-    dataset.to_array(projection, row_filter)
+    dataset.to_array(projection, row_filter, indices)
 }
 
 /// Read a vortex struct array from a URL.
@@ -177,14 +178,15 @@ pub fn read_path(
 /// >>> a = vortex.io.read_url("file:/path/to/dataset.vortex") # doctest: +SKIP
 ///
 #[pyfunction]
-#[pyo3(signature = (url, *, projection = None, row_filter = None))]
+#[pyo3(signature = (url, *, projection = None, row_filter = None, indices = None))]
 pub fn read_url(
     url: Bound<PyString>,
     projection: Option<Vec<Bound<PyAny>>>,
     row_filter: Option<&Bound<PyExpr>>,
+    indices: Option<&PyArray>,
 ) -> PyResult<PyArray> {
     let dataset = TOKIO_RUNTIME.block_on(ObjectStoreUrlDataset::try_new(url.extract()?))?;
-    dataset.to_array(projection, row_filter)
+    dataset.to_array(projection, row_filter, indices)
 }
 
 /// Write a vortex struct array to the local filesystem.

pyvortex/test/test_dataset.py

Lines changed: 2 additions & 2 deletions
@@ -24,8 +24,8 @@ def ds(tmpdir_factory) -> vortex.dataset.VortexDataset:
     if not os.path.exists(fname):
         a = pa.array([record(x) for x in range(1_000_000)])
         arr = vortex.encoding.compress(vortex.array(a))
-        vortex.io.write_path(arr, "/tmp/foo.vortex")
-    return vortex.dataset.VortexDataset.from_path("/tmp/foo.vortex")
+        vortex.io.write_path(arr, str(fname))
+    return vortex.dataset.VortexDataset.from_path(str(fname))
 
 
 def test_schema(ds):

vortex-serde/src/layouts/read/buffered.rs

Lines changed: 9 additions & 0 deletions
@@ -28,6 +28,15 @@ impl BufferedLayoutReader {
 
     // TODO(robert): Support out of order reads
     fn buffer_read(&mut self, mask: &RowMask) -> VortexResult<Option<Vec<Message>>> {
+        while let Some(((begin, end), layout)) = self.layouts.pop_front() {
+            if mask.begin() <= begin && begin < mask.end()
+                || mask.begin() < end && end <= mask.end()
+            {
+                self.layouts.push_front(((begin, end), layout));
+                break;
+            }
+        }
+
         while let Some(((begin, end), mut layout)) = self.layouts.pop_front() {
             // This selection doesn't know about rows in this chunk, we should put it back and wait for another request with different range
             if mask.end() <= begin || mask.begin() > end {
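The new loop pops buffered layouts until it finds one whose row range overlaps the requested mask, then pushes that layout back; everything popped before it is discarded, which is what lets the reader drop buffered chunks that precede the requested row range. A minimal sketch of the overlap predicate (the standalone function and test are illustrative; ranges are half-open [begin, end) as in the reader):

    // Hedged restatement of the overlap check used by the skip-ahead loop above.
    fn overlaps(begin: usize, end: usize, mask_begin: usize, mask_end: usize) -> bool {
        (mask_begin <= begin && begin < mask_end) || (mask_begin < end && end <= mask_end)
    }

    #[cfg(test)]
    mod overlap_tests {
        use super::overlaps;

        #[test]
        fn skips_chunks_before_the_mask() {
            // Mask covers rows [100, 200): a chunk ending at 100 is skipped,
            // chunks straddling either boundary are kept.
            assert!(!overlaps(0, 100, 100, 200));
            assert!(overlaps(50, 150, 100, 200));
            assert!(overlaps(150, 250, 100, 200));
        }
    }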

vortex-serde/src/layouts/read/builder.rs

Lines changed: 23 additions & 10 deletions
@@ -5,6 +5,7 @@ use vortex_error::VortexResult;
 use vortex_expr::Select;
 use vortex_schema::projection::Projection;
 
+use super::RowMask;
 use crate::io::VortexReadAt;
 use crate::layouts::read::cache::{LayoutMessageCache, LazyDeserializedDType, RelativeLayoutCache};
 use crate::layouts::read::context::LayoutDeserializer;
@@ -18,7 +19,7 @@ pub struct LayoutBatchStreamBuilder<R> {
     layout_serde: LayoutDeserializer,
     projection: Option<Projection>,
     size: Option<u64>,
-    indices: Option<Array>,
+    row_mask: Option<Array>,
     row_filter: Option<RowFilter>,
 }
 
@@ -28,9 +29,9 @@ impl<R: VortexReadAt> LayoutBatchStreamBuilder<R> {
             reader,
             layout_serde,
             projection: None,
-            row_filter: None,
             size: None,
-            indices: None,
+            row_mask: None,
+            row_filter: None,
         }
     }
 
@@ -45,12 +46,12 @@ impl<R: VortexReadAt> LayoutBatchStreamBuilder<R> {
     }
 
     pub fn with_indices(mut self, array: Array) -> Self {
-        // TODO(#441): Allow providing boolean masks
         assert!(
-            array.dtype().is_int(),
-            "Mask arrays have to be integer arrays"
+            !array.dtype().is_nullable() && (array.dtype().is_int() || array.dtype().is_boolean()),
+            "Mask arrays have to be non-nullable integer or boolean arrays"
         );
-        self.indices = Some(array);
+
+        self.row_mask = Some(array);
         self
     }
 
@@ -87,22 +88,34 @@ impl<R: VortexReadAt> LayoutBatchStreamBuilder<R> {
 
         let filter_reader = self
             .row_filter
-            .as_ref()
-            .map(|filter| {
+            .map(|row_filter| {
                 footer.layout(
-                    Scan::new(Some(Arc::new(filter.clone()))),
+                    Scan::new(Some(Arc::new(row_filter))),
                     RelativeLayoutCache::new(message_cache.clone(), footer_dtype),
                 )
             })
            .transpose()?;
 
+        let row_mask = self
+            .row_mask
+            .as_ref()
+            .map(|row_mask| {
+                if row_mask.dtype().is_int() {
+                    RowMask::from_index_array(row_mask, 0, row_count as usize)
+                } else {
+                    RowMask::from_mask_array(row_mask, 0, row_count as usize)
+                }
+            })
+            .transpose()?;
+
         Ok(LayoutBatchStream::new(
             self.reader,
             data_reader,
             filter_reader,
             message_cache,
            projected_dtype,
            row_count,
+            row_mask,
         ))
     }
 
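With the relaxed assertion, `with_indices` now accepts either a non-nullable integer index array or a non-nullable boolean mask, and `build()` dispatches to `RowMask::from_index_array` or `RowMask::from_mask_array` accordingly. A minimal usage sketch (the concrete array constructors and the pre-configured `builder_a` / `builder_b` are assumptions for illustration; any non-nullable int or bool Vortex array should work):

    // Hedged sketch: the same builder entry point covers both selection styles.
    let by_index = PrimitiveArray::from(vec![3u64, 7, 11]).into_array(); // kept row positions
    let by_mask = BoolArray::from(vec![false, true, true, false]).into_array(); // one bool per row

    let stream_by_index = builder_a.with_indices(by_index).build().await?;
    let stream_by_mask = builder_b.with_indices(by_mask).build().await?;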

vortex-serde/src/layouts/read/mask.rs

Lines changed: 15 additions & 0 deletions
@@ -121,6 +121,21 @@ impl RowMask {
         }
     }
 
+    /// Unset, in place, any bits that are unset in `other`.
+    pub fn and_inplace(&mut self, other: &RowMask) -> VortexResult<()> {
+        if self.begin != other.begin || self.end != other.end {
+            vortex_bail!(
+                "begin and ends must match: {}-{} {}-{}",
+                self.begin,
+                self.end,
+                other.begin,
+                other.end
+            );
+        }
+        self.values.and_inplace(&other.values);
+        Ok(())
+    }
+
     /// Filter array with this `RowMask`.
     ///
     /// This function assumes that Array is no longer than the mask length and that the mask starts on same offset as the array,
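`and_inplace` lets two masks that span the same row range be intersected; presumably the stream uses it to combine the mask produced by the row filter with the one derived from the user-supplied indices, so only rows selected by both survive. A minimal sketch (the `filter_result` and `indices` arrays are placeholders; the constructors are the ones used in builder.rs above):

    // Hedged sketch: keep only rows selected by both the filter and the indices.
    let mut filter_mask = RowMask::from_mask_array(&filter_result, 0, row_count)?;
    let index_mask = RowMask::from_index_array(&indices, 0, row_count)?;

    // Errors if the two masks do not span the same [begin, end) row range.
    filter_mask.and_inplace(&index_mask)?;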
