Commit 8532c47

Streaming conversion CLI (#2687)
1 parent ca37e44 commit 8532c47

File tree

3 files changed: +39 −109 lines changed


Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default.

vortex-tui/Cargo.toml

Lines changed: 1 addition & 3 deletions
@@ -14,14 +14,12 @@ rust-version = { workspace = true }
 version = { workspace = true }
 
 [dependencies]
-arrow-array = { workspace = true }
 async-trait = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
 crossterm = { workspace = true }
 futures-util = { workspace = true }
-indicatif = { workspace = true }
+indicatif = { workspace = true, features = ["futures"] }
 parquet = { workspace = true, features = ["arrow", "async"] }
-pin-project = { workspace = true }
 ratatui = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread"] }
 vortex = { workspace = true, features = ["tokio", "parquet"] }
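The dependency swap above is what enables the deletions in convert.rs below: with indicatif's `futures` feature enabled, `ProgressBar::wrap_stream` instruments any `Stream` directly, so the hand-rolled `pin-project` wrapper (and the `arrow-array` dependency) can go. A minimal, self-contained sketch of that API, assuming a tokio runtime and an illustrative counter stream in place of the real record-batch stream:

```rust
// Sketch of `ProgressBar::wrap_stream` (behind indicatif's `futures` feature).
// The counter stream here is a stand-in for the Parquet record-batch stream.
use futures_util::{stream, StreamExt};
use indicatif::ProgressBar;

#[tokio::main]
async fn main() {
    let items = stream::iter(0..100u64);
    // Each item polled out of the wrapped stream ticks the bar by one.
    let mut progressed = ProgressBar::new(100).wrap_stream(items);

    while let Some(item) = progressed.next().await {
        let _ = item; // consume; the bar advances automatically
    }
}
```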

vortex-tui/src/convert.rs

Lines changed: 37 additions & 104 deletions
@@ -1,30 +1,23 @@
-#![allow(clippy::use_debug)]
-
 use std::path::Path;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::time::Instant;
 
-use arrow_array::StructArray as ArrowStructArray;
-use futures_util::Stream;
+use futures_util::StreamExt;
 use indicatif::ProgressBar;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
-use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use pin_project::pin_project;
 use tokio::fs::File;
-use vortex::arrays::ChunkedArray;
-use vortex::arrow::FromArrowArray;
+use vortex::TryIntoArray;
 use vortex::dtype::DType;
-use vortex::error::{VortexExpect, VortexResult};
+use vortex::dtype::arrow::FromArrowType;
+use vortex::error::{VortexError, VortexExpect, VortexResult};
 use vortex::file::VortexWriteOptions;
-use vortex::stream::{ArrayStream, ArrayStreamArrayExt};
-use vortex::{Array, ArrayRef};
+use vortex::stream::ArrayStreamAdapter;
 
 #[derive(Default)]
 pub struct Flags {
     pub quiet: bool,
 }
 
+const BATCH_SIZE: usize = 8192;
+
 /// Convert Parquet files to Vortex.
 pub async fn exec_convert(input_path: impl AsRef<Path>, flags: Flags) -> VortexResult<()> {
     if !flags.quiet {
@@ -34,101 +27,41 @@ pub async fn exec_convert(input_path: impl AsRef<Path>, flags: Flags) -> VortexResult<()> {
         );
     }
 
-    let wall_start = Instant::now();
-
     let output_path = input_path.as_ref().with_extension("vortex");
-    let mut file = File::open(input_path).await?;
-
-    let metadata =
-        ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?;
-    let has_root_level_nulls = metadata.parquet_schema().root_schema().is_optional();
-
-    let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
-    let mut chunks = Vec::new();
-
-    while let Some(mut reader) = reader.next_row_group().await? {
-        for batch in reader.by_ref() {
-            let batch = ArrowStructArray::from(batch?);
-            let next_chunk = ArrayRef::from_arrow(&batch, has_root_level_nulls);
-            chunks.push(next_chunk);
-        }
-    }
-
-    let read_complete = Instant::now();
-
-    if !flags.quiet {
-        eprintln!(
-            "Read Parquet file in {:?}",
-            read_complete.duration_since(wall_start)
-        );
-
-        eprintln!(
-            "Writing {} chunks to new Vortex file {}",
-            chunks.len(),
-            output_path.display()
-        );
-    }
-
-    let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone();
-    let chunked_array = ChunkedArray::try_new(chunks, dtype)?;
-
-    let writer = VortexWriteOptions::default();
-
-    let output_file = File::create(output_path).await?;
+    let file = File::open(input_path).await?;
+
+    let parquet = ParquetRecordBatchStreamBuilder::new(file)
+        .await?
+        .with_batch_size(BATCH_SIZE);
+    let num_rows = parquet.metadata().file_metadata().num_rows();
+
+    let dtype = DType::from_arrow(parquet.schema().as_ref());
+    let mut vortex_stream = parquet
+        .build()?
+        .map(|record_batch| {
+            record_batch
+                .map_err(VortexError::from)
+                .and_then(|rb| rb.try_into_array())
+        })
+        .boxed();
 
     if !flags.quiet {
-        let pb = ProgressBar::new(chunked_array.nchunks() as u64);
-        let stream = ProgressArrayStream {
-            inner: chunked_array.to_array_stream(),
-            progress: pb,
-        };
-        writer.write(output_file, stream).await?;
-    } else {
-        writer
-            .write(output_file, chunked_array.to_array_stream())
-            .await?;
+        // Parquet reader returns batches, rather than row groups. So make sure we correctly
+        // configure the progress bar.
+        let nbatches = u64::try_from(num_rows)
+            .vortex_expect("negative row count?")
+            .div_ceil(BATCH_SIZE as u64);
+        vortex_stream = ProgressBar::new(nbatches)
+            .wrap_stream(vortex_stream)
+            .boxed();
     }
 
-    if !flags.quiet {
-        eprintln!(
-            "Wrote Vortex in {:?}",
-            Instant::now().duration_since(read_complete)
-        );
-    }
+    VortexWriteOptions::default()
+        .write(
+            File::create(output_path).await?,
+            ArrayStreamAdapter::new(dtype, vortex_stream),
+        )
+        .await?;
 
     Ok(())
 }
-
-#[pin_project]
-struct ProgressArrayStream<S> {
-    #[pin]
-    inner: S,
-    progress: ProgressBar,
-}
-
-impl<S> Stream for ProgressArrayStream<S>
-where
-    S: Stream<Item = VortexResult<ArrayRef>>,
-{
-    type Item = VortexResult<ArrayRef>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.project();
-        match this.inner.poll_next(cx) {
-            Poll::Ready(inner) => {
-                this.progress.inc(1);
-                Poll::Ready(inner)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-impl<S> ArrayStream for ProgressArrayStream<S>
-where
-    S: ArrayStream,
-{
-    fn dtype(&self) -> &DType {
-        self.inner.dtype()
-    }
-}
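One subtlety in the new code: the Parquet reader yields fixed-size record batches rather than row groups, so the progress bar is sized as `ceil(num_rows / BATCH_SIZE)`. A small worked example of that `div_ceil` arithmetic, with an illustrative row count:

```rust
// Worked example of the batch-count math used to size the progress bar above.
const BATCH_SIZE: usize = 8192;

fn main() {
    let num_rows: u64 = 1_000_000; // illustrative row count
    let nbatches = num_rows.div_ceil(BATCH_SIZE as u64);
    // 122 full batches of 8192 rows (999,424) plus one final batch of 576 rows.
    assert_eq!(nbatches, 123);
    println!("{num_rows} rows -> {nbatches} batches");
}
```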
