Skip to content

Commit 92a4e32

Browse files
committed
a few things
- add integration test for vortex-file
- fix bug in PathStrategy descend
- return Writer back from the write pathway

Signed-off-by: Andrew Duffy <[email protected]>
1 parent fd2543b commit 92a4e32

File tree

4 files changed

+145
-17
lines changed

4 files changed

+145
-17
lines changed

vortex-file/src/counting.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use vortex_io::VortexWrite;
1111

1212
/// A wrapper around a `VortexWrite` that counts the number of bytes written.
1313
pub(crate) struct CountingVortexWrite<W> {
14-
inner: W,
15-
bytes_written: Arc<AtomicU64>,
14+
pub(crate) inner: W,
15+
pub(crate) bytes_written: Arc<AtomicU64>,
1616
}
1717

1818
impl<W: VortexWrite> CountingVortexWrite<W> {

vortex-file/src/writer.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl VortexWriteOptions {
128128
self,
129129
write: W,
130130
stream: S,
131-
) -> VortexResult<WriteSummary> {
131+
) -> VortexResult<(WriteSummary, W)> {
132132
self.write_internal(write, ArrayStreamExt::boxed(stream))
133133
.await
134134
}
@@ -137,7 +137,7 @@ impl VortexWriteOptions {
137137
self,
138138
mut write: W,
139139
stream: SendableArrayStream,
140-
) -> VortexResult<WriteSummary> {
140+
) -> VortexResult<(WriteSummary, W)> {
141141
// Set up a Context to capture the encodings used in the file.
142142
let ctx = ArrayContext::empty();
143143
let dtype = stream.dtype().clone();
@@ -216,10 +216,13 @@ impl VortexWriteOptions {
216216

217217
write.flush().await?;
218218

219-
Ok(WriteSummary {
220-
footer,
221-
size: position,
222-
})
219+
Ok((
220+
WriteSummary {
221+
footer,
222+
size: position,
223+
},
224+
write,
225+
))
223226
}
224227

225228
/// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
@@ -233,7 +236,11 @@ impl VortexWriteOptions {
233236
let write = CountingVortexWrite::new(write);
234237
let bytes_written = write.counter();
235238
let strategy = self.strategy.clone();
236-
let future = self.write(write, arrays).boxed_local().fuse();
239+
let future = self
240+
.write(write, arrays)
241+
.map(move |result| result.map(|(summary, _writer)| summary))
242+
.boxed_local()
243+
.fuse();
237244

238245
Writer {
239246
arrays: Some(arrays_send),
@@ -370,11 +377,13 @@ impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
370377
write: W,
371378
iter: impl ArrayIterator + Send + 'static,
372379
) -> VortexResult<WriteSummary> {
373-
self.runtime.block_on(async move {
374-
self.options
375-
.write(BlockingWriteAdapter(write), iter.into_array_stream())
376-
.await
377-
})
380+
self.runtime
381+
.block_on(async move {
382+
self.options
383+
.write(BlockingWriteAdapter(write), iter.into_array_stream())
384+
.await
385+
})
386+
.map(|(summary, _)| summary)
378387
}
379388

380389
pub fn writer<'w, W: Write + Unpin + 'w>(
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#![allow(clippy::tests_outside_test_module)]
5+
6+
use std::sync::Arc;
7+
use std::sync::LazyLock;
8+
9+
use futures::StreamExt;
10+
use futures::pin_mut;
11+
use vortex_array::Array;
12+
use vortex_array::IntoArray;
13+
use vortex_array::ToCanonical;
14+
use vortex_array::arrays::PrimitiveArray;
15+
use vortex_array::arrays::StructArray;
16+
use vortex_array::expr::session::ExprSession;
17+
use vortex_array::session::ArraySession;
18+
use vortex_array::validity::Validity;
19+
use vortex_buffer::ByteBuffer;
20+
use vortex_dtype::FieldNames;
21+
use vortex_dtype::field_path;
22+
use vortex_file::OpenOptionsSessionExt;
23+
use vortex_file::WriteOptionsSessionExt;
24+
use vortex_io::session::RuntimeSession;
25+
use vortex_layout::layouts::compressed::CompressingStrategy;
26+
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
27+
use vortex_layout::layouts::path::PathStrategy;
28+
use vortex_layout::session::LayoutSession;
29+
use vortex_metrics::VortexMetrics;
30+
use vortex_session::VortexSession;
31+
32+
static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
33+
let session = VortexSession::empty()
34+
.with::<VortexMetrics>()
35+
.with::<ArraySession>()
36+
.with::<LayoutSession>()
37+
.with::<ExprSession>()
38+
.with::<RuntimeSession>();
39+
40+
vortex_file::register_default_encodings(&session);
41+
42+
session
43+
});
44+
45+
#[tokio::test]
46+
async fn test_file_roundtrip() {
47+
// Create a simple roundtrip
48+
let nums = PrimitiveArray::from_iter((0..1024).cycle().take(16_384)).into_array();
49+
50+
let a_array = StructArray::new(
51+
FieldNames::from(["raw", "compressed"]),
52+
vec![nums.clone(), nums.clone()],
53+
16_384,
54+
Validity::NonNullable,
55+
)
56+
.into_array();
57+
58+
let b_array = PrimitiveArray::from_iter((1024..2048).cycle().take(16_384)).into_array();
59+
60+
let data = StructArray::new(
61+
FieldNames::from(["a", "b"]),
62+
vec![a_array, b_array],
63+
16_384,
64+
Validity::NonNullable,
65+
)
66+
.into_array();
67+
68+
// Create a writer which by default uses the BtrBlocks compressor, but for a.raw column it will
69+
// leave it uncompressed.
70+
let default_strategy = Arc::new(CompressingStrategy::new_btrblocks(
71+
FlatLayoutStrategy::default(),
72+
false,
73+
));
74+
75+
let writer = Arc::new(
76+
PathStrategy::new(Arc::new(FlatLayoutStrategy::default()), default_strategy)
77+
.set_field_writer(field_path!(a.raw), Arc::new(FlatLayoutStrategy::default())),
78+
);
79+
80+
let bytes = Vec::new();
81+
let (_, bytes) = SESSION
82+
.write_options()
83+
.with_strategy(writer)
84+
.write(bytes, data.to_array_stream())
85+
.await
86+
.expect("write");
87+
88+
let bytes = ByteBuffer::from(bytes);
89+
let vxf = SESSION.open_options().open(bytes).await.expect("open");
90+
91+
// Read the data back
92+
let stream = vxf
93+
.scan()
94+
.expect("scan")
95+
.into_stream()
96+
.expect("into_stream");
97+
98+
pin_mut!(stream);
99+
100+
while let Some(next) = stream.next().await {
101+
let next = next.expect("next");
102+
let next = next.to_struct();
103+
let a = next.field_by_name("a").unwrap().to_struct();
104+
105+
let raw = a.field_by_name("raw").unwrap();
106+
let compressed = a.field_by_name("compressed").unwrap();
107+
108+
assert!(raw.is_canonical());
109+
assert!(!compressed.is_canonical());
110+
111+
assert!(raw.nbytes() > compressed.nbytes());
112+
}
113+
}

vortex-layout/src/layouts/path.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,15 @@ impl PathStrategy {
133133
fn descend(&self, field: &Field) -> Self {
134134
// Build the overrides for the child: keep only the paths that start with the current
135135
// field, re-keyed by the sub-path that `step_into` yields (paths with no sub-path are dropped)
136-
let mut new_writers = self.leaf_writers.clone();
137-
new_writers.retain(|k, _| k.starts_with_field(field));
136+
let mut new_writers = HashMap::with_capacity(self.leaf_writers.len());
137+
138+
for (field_path, strategy) in &self.leaf_writers {
139+
if field_path.starts_with_field(field)
140+
&& let Some(subpath) = field_path.clone().step_into()
141+
{
142+
new_writers.insert(subpath, strategy.clone());
143+
}
144+
}
138145

139146
Self {
140147
leaf_writers: new_writers,
@@ -256,7 +263,6 @@ impl LayoutStrategy for PathStrategy {
256263
.zip_eq(column_names)
257264
.enumerate()
258265
.map(move |(index, ((dtype, recv), name))| {
259-
println!("PathStrategy visiting {name}");
260266
let column_stream =
261267
SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed())
262268
.sendable();

0 commit comments

Comments
 (0)