Skip to content

Commit 933fcfc

Browse files
committed
list all files
Signed-off-by: Adam Gutglick <[email protected]>
1 parent 3a5af7a commit 933fcfc

File tree

2 files changed

+37
-30
lines changed

2 files changed

+37
-30
lines changed

vortex-datafusion/src/persistent/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ mod tests {
5555
use datafusion_datasource::file_format::format_as_file_type;
5656
use datafusion_expr::LogicalPlanBuilder;
5757
use datafusion_physical_plan::display::DisplayableExecutionPlan;
58+
use futures::StreamExt;
5859
use insta::assert_snapshot;
5960
use regex::Regex;
6061
use rstest::rstest;
6162
use tempfile::TempDir;
6263
use tempfile::tempdir;
6364
use tokio::fs::OpenOptions;
65+
use tokio::io::AsyncWriteExt;
6466
use vortex::VortexSessionDefault;
6567
use vortex::array::IntoArray;
6668
use vortex::array::arrays::ChunkedArray;
@@ -79,7 +81,7 @@ mod tests {
7981
#[case(Some(1))]
8082
#[case(None)]
8183
#[tokio::test]
82-
async fn query_file(#[case] limit: Option<usize>) -> anyhow::Result<()> {
84+
async fn test_query_file(#[case] limit: Option<usize>) -> anyhow::Result<()> {
8385
let session = VortexSession::default();
8486
let temp_dir = tempdir()?;
8587
let strings = ChunkedArray::from_iter([
@@ -131,13 +133,24 @@ mod tests {
131133
.write_options()
132134
.write(&mut f, st.to_array_stream())
133135
.await?;
136+
f.shutdown().await?;
134137

135138
let ctx = SessionContext::default();
136139
let format = Arc::new(VortexFormat::new(session));
137140

138141
let table_url = ListingTableUrl::parse(table_url)?;
139142
assert!(table_url.is_collection());
140143

144+
let state = ctx.state();
145+
let store = ctx.runtime_env().object_store(table_url.clone())?;
146+
147+
let mut s = table_url.list_all_files(&state, &store, "vortex").await?;
148+
while let Some(f) = s.next().await {
149+
let f = f?;
150+
dbg!(f);
151+
}
152+
drop(s);
153+
141154
let config = ListingTableConfig::new(table_url)
142155
.with_listing_options(
143156
ListingOptions::new(format).with_session_config_options(ctx.state().config()),

vortex-datafusion/src/persistent/sink.rs

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33

44
use std::any::Any;
55
use std::sync::Arc;
6-
use std::sync::atomic::AtomicU64;
7-
use std::sync::atomic::Ordering;
86

97
use arrow_schema::SchemaRef;
108
use async_trait::async_trait;
119
use datafusion_common::DataFusionError;
1210
use datafusion_common::Result as DFResult;
11+
use datafusion_common::exec_datafusion_err;
1312
use datafusion_common_runtime::JoinSet;
1413
use datafusion_common_runtime::SpawnedTask;
1514
use datafusion_datasource::file_sink_config::FileSink;
@@ -33,6 +32,7 @@ use vortex::dtype::DType;
3332
use vortex::dtype::arrow::FromArrowType;
3433
use vortex::error::VortexResult;
3534
use vortex::file::WriteOptionsSessionExt;
35+
use vortex::file::WriteSummary;
3636
use vortex::io::ObjectStoreWriter;
3737
use vortex::io::VortexWrite;
3838
use vortex::session::VortexSession;
@@ -108,58 +108,52 @@ impl FileSink for VortexSink {
108108
mut file_stream_rx: DemuxedStreamReceiver,
109109
object_store: Arc<dyn ObjectStore>,
110110
) -> DFResult<u64> {
111-
// This is a hack
112-
let row_counter = Arc::new(AtomicU64::new(0));
113-
114-
let mut file_write_tasks: JoinSet<DFResult<Path>> = JoinSet::new();
111+
let mut file_write_tasks: JoinSet<DFResult<(Path, WriteSummary)>> = JoinSet::new();
115112

116113
// TODO(adamg):
117114
// 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join.
118115
while let Some((path, rx)) = file_stream_rx.recv().await {
119116
let session = self.session.clone();
120-
let row_counter = row_counter.clone();
121117
let object_store = object_store.clone();
122118
let writer_schema = get_writer_schema(&self.config);
123119
let dtype = DType::from_arrow(writer_schema);
124120

125121
// We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
126122
// the demux task might deadlock itself.
127123
file_write_tasks.spawn(async move {
128-
let stream = ReceiverStream::new(rx).map(move |rb| {
129-
row_counter.fetch_add(rb.num_rows() as u64, Ordering::Relaxed);
130-
VortexResult::Ok(ArrayRef::from_arrow(rb, false))
131-
});
124+
let stream = ReceiverStream::new(rx)
125+
.map(move |rb| VortexResult::Ok(ArrayRef::from_arrow(rb, false)));
132126

133127
let stream_adapter = ArrayStreamAdapter::new(dtype, stream);
134128

135-
let mut sink = ObjectStoreWriter::new(object_store.clone(), &path)
129+
let mut object_writer = ObjectStoreWriter::new(object_store, &path)
136130
.await
137-
.map_err(|e| {
138-
DataFusionError::Execution(format!(
139-
"Failed to create ObjectStoreWriter: {e}"
140-
))
141-
})?;
131+
.map_err(|e| exec_datafusion_err!("Failed to create ObjectStoreWriter: {e}"))?;
142132

143-
session
133+
let summary = session
144134
.write_options()
145-
.write(&mut sink, stream_adapter)
135+
.write(&mut object_writer, stream_adapter)
146136
.await
147-
.map_err(|e| {
148-
DataFusionError::Execution(format!("Failed to write Vortex file: {e}"))
149-
})?;
137+
.map_err(|e| exec_datafusion_err!("Failed to write Vortex file: {e}"))?;
150138

151-
sink.shutdown().await.map_err(|e| {
152-
DataFusionError::Execution(format!("Failed to shutdown Vortex writer: {e}"))
153-
})?;
139+
object_writer
140+
.shutdown()
141+
.await
142+
.map_err(|e| exec_datafusion_err!("Failed to shutdown Vortex writer: {e}"))?;
154143

155-
Ok(path)
144+
Ok((path, summary))
156145
});
157146
}
158147

148+
let mut row_count = 0;
149+
159150
while let Some(result) = file_write_tasks.join_next().await {
160151
match result {
161-
Ok(path) => {
162-
let path = path?;
152+
Ok(r) => {
153+
let (path, summary) = r?;
154+
155+
row_count += summary.row_count();
156+
163157
tracing::info!(path = %path, "Successfully written file");
164158
}
165159
Err(e) => {
@@ -177,7 +171,7 @@ impl FileSink for VortexSink {
177171
.await
178172
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
179173

180-
Ok(row_counter.load(Ordering::SeqCst))
174+
Ok(row_count)
181175
}
182176
}
183177

0 commit comments

Comments
 (0)