Skip to content

Commit 37d42f4

Browse files
committed
Refactor pipeline datasource and IO argument modules.
This introduces a reusable DataSource abstraction and splits the read/write argument structs into dedicated modules, while updating the convert tests to use DataFrameReader directly.

Made-with: Cursor
1 parent f599ed8 commit 37d42f4

File tree

6 files changed

+148
-74
lines changed

6 files changed

+148
-74
lines changed

src/cli/convert.rs

Lines changed: 26 additions & 26 deletions
Original file line number · Diff line number · Diff line change
@@ -18,7 +18,7 @@ mod tests {
1818
use super::*;
1919
use crate::pipeline::Source;
2020
use crate::pipeline::Step;
21-
use crate::pipeline::dataframe::read_dataframe;
21+
use crate::pipeline::dataframe::DataFrameReader;
2222
use crate::pipeline::read_to_batches;
2323
use crate::pipeline::write_batches;
2424

@@ -43,7 +43,7 @@ mod tests {
4343

4444
#[tokio::test(flavor = "multi_thread")]
4545
async fn test_read_dataframe() {
46-
let df = *read_dataframe(
46+
let df = *DataFrameReader::new(
4747
"fixtures/table.parquet",
4848
FileType::Parquet,
4949
None,
@@ -61,7 +61,7 @@ mod tests {
6161
#[tokio::test(flavor = "multi_thread")]
6262
async fn test_read_dataframe_with_select() {
6363
let select = Some(vec!["one".to_string(), "two".to_string()]);
64-
let df = *read_dataframe(
64+
let df = *DataFrameReader::new(
6565
"fixtures/table.parquet",
6666
FileType::Parquet,
6767
select,
@@ -81,7 +81,7 @@ mod tests {
8181

8282
#[tokio::test(flavor = "multi_thread")]
8383
async fn test_read_dataframe_with_limit() {
84-
let df = *read_dataframe(
84+
let df = *DataFrameReader::new(
8585
"fixtures/table.parquet",
8686
FileType::Parquet,
8787
None,
@@ -99,7 +99,7 @@ mod tests {
9999
#[tokio::test(flavor = "multi_thread")]
100100
async fn test_read_dataframe_with_select_and_limit() {
101101
let select = Some(vec!["two".to_string()]);
102-
let df = *read_dataframe(
102+
let df = *DataFrameReader::new(
103103
"fixtures/table.parquet",
104104
FileType::Parquet,
105105
select,
@@ -119,7 +119,7 @@ mod tests {
119119

120120
#[tokio::test(flavor = "multi_thread")]
121121
async fn test_read_dataframe_avro() {
122-
let df = *read_dataframe(
122+
let df = *DataFrameReader::new(
123123
"fixtures/userdata5.avro",
124124
FileType::Avro,
125125
None,
@@ -136,7 +136,7 @@ mod tests {
136136

137137
#[tokio::test(flavor = "multi_thread")]
138138
async fn test_read_dataframe_orc() {
139-
let df = *read_dataframe("fixtures/userdata.orc", FileType::Orc, None, Some(5), None)
139+
let df = *DataFrameReader::new("fixtures/userdata.orc", FileType::Orc, None, Some(5), None)
140140
.execute(())
141141
.await
142142
.unwrap()
@@ -147,7 +147,7 @@ mod tests {
147147

148148
#[tokio::test(flavor = "multi_thread")]
149149
async fn test_read_dataframe_csv() {
150-
let df = *read_dataframe("fixtures/table.csv", FileType::Csv, None, Some(2), None)
150+
let df = *DataFrameReader::new("fixtures/table.csv", FileType::Csv, None, Some(2), None)
151151
.execute(())
152152
.await
153153
.unwrap()
@@ -158,7 +158,7 @@ mod tests {
158158

159159
#[tokio::test(flavor = "multi_thread")]
160160
async fn test_read_dataframe_unsupported_type() {
161-
let result = read_dataframe("fixtures/data.json", FileType::Json, None, None, None)
161+
let result = DataFrameReader::new("fixtures/data.json", FileType::Json, None, None, None)
162162
.execute(())
163163
.await;
164164
assert!(result.is_err());
@@ -174,7 +174,7 @@ mod tests {
174174

175175
#[tokio::test(flavor = "multi_thread")]
176176
async fn test_write_dataframe_to_parquet() {
177-
let source = read_dataframe(
177+
let source = DataFrameReader::new(
178178
"fixtures/table.parquet",
179179
FileType::Parquet,
180180
None,
@@ -192,7 +192,7 @@ mod tests {
192192
.unwrap();
193193
assert!(std::path::Path::new(&output).exists());
194194

195-
let df2 = *read_dataframe(&output, FileType::Parquet, None, None, None)
195+
let df2 = *DataFrameReader::new(&output, FileType::Parquet, None, None, None)
196196
.execute(())
197197
.await
198198
.unwrap()
@@ -203,7 +203,7 @@ mod tests {
203203

204204
#[tokio::test(flavor = "multi_thread")]
205205
async fn test_write_dataframe_to_csv() {
206-
let source = read_dataframe(
206+
let source = DataFrameReader::new(
207207
"fixtures/table.parquet",
208208
FileType::Parquet,
209209
None,
@@ -224,7 +224,7 @@ mod tests {
224224

225225
#[tokio::test(flavor = "multi_thread")]
226226
async fn test_write_dataframe_to_json() {
227-
let source = read_dataframe(
227+
let source = DataFrameReader::new(
228228
"fixtures/table.parquet",
229229
FileType::Parquet,
230230
None,
@@ -246,7 +246,7 @@ mod tests {
246246

247247
#[tokio::test(flavor = "multi_thread")]
248248
async fn test_write_dataframe_to_json_pretty() {
249-
let source = read_dataframe(
249+
let source = DataFrameReader::new(
250250
"fixtures/table.parquet",
251251
FileType::Parquet,
252252
None,
@@ -269,7 +269,7 @@ mod tests {
269269

270270
#[tokio::test(flavor = "multi_thread")]
271271
async fn test_write_dataframe_to_yaml() {
272-
let source = read_dataframe(
272+
let source = DataFrameReader::new(
273273
"fixtures/table.parquet",
274274
FileType::Parquet,
275275
None,
@@ -292,7 +292,7 @@ mod tests {
292292
#[tokio::test(flavor = "multi_thread")]
293293
async fn test_write_dataframe_to_avro() {
294294
let select = Some(vec!["two".to_string(), "three".to_string()]);
295-
let source = read_dataframe(
295+
let source = DataFrameReader::new(
296296
"fixtures/table.parquet",
297297
FileType::Parquet,
298298
select,
@@ -310,7 +310,7 @@ mod tests {
310310
.unwrap();
311311
assert!(std::path::Path::new(&output).exists());
312312

313-
let df2 = *read_dataframe(&output, FileType::Avro, None, None, None)
313+
let df2 = *DataFrameReader::new(&output, FileType::Avro, None, None, None)
314314
.execute(())
315315
.await
316316
.unwrap()
@@ -322,7 +322,7 @@ mod tests {
322322
#[tokio::test(flavor = "multi_thread")]
323323
async fn test_write_dataframe_to_orc() {
324324
let select = Some(vec!["id".to_string(), "first_name".to_string()]);
325-
let source = read_dataframe(
325+
let source = DataFrameReader::new(
326326
"fixtures/userdata5.avro",
327327
FileType::Avro,
328328
select,
@@ -340,7 +340,7 @@ mod tests {
340340
.unwrap();
341341
assert!(std::path::Path::new(&output).exists());
342342

343-
let df2 = *read_dataframe(&output, FileType::Orc, None, None, None)
343+
let df2 = *DataFrameReader::new(&output, FileType::Orc, None, None, None)
344344
.execute(())
345345
.await
346346
.unwrap()
@@ -351,7 +351,7 @@ mod tests {
351351

352352
#[tokio::test(flavor = "multi_thread")]
353353
async fn test_write_dataframe_to_xlsx() {
354-
let source = read_dataframe(
354+
let source = DataFrameReader::new(
355355
"fixtures/table.parquet",
356356
FileType::Parquet,
357357
None,
@@ -502,7 +502,7 @@ mod tests {
502502
let select = Some(vec!["two".to_string(), "three".to_string()]);
503503
let temp_dir = tempfile::tempdir().unwrap();
504504

505-
let source = read_dataframe(
505+
let source = DataFrameReader::new(
506506
"fixtures/table.parquet",
507507
FileType::Parquet,
508508
select,
@@ -518,7 +518,7 @@ mod tests {
518518
.await
519519
.unwrap();
520520

521-
let source2 = read_dataframe(&avro_path, FileType::Avro, None, None, None)
521+
let source2 = DataFrameReader::new(&avro_path, FileType::Avro, None, None, None)
522522
.execute(())
523523
.await
524524
.unwrap();
@@ -528,7 +528,7 @@ mod tests {
528528
.await
529529
.unwrap();
530530

531-
let df3 = *read_dataframe(&parquet_path, FileType::Parquet, None, None, None)
531+
let df3 = *DataFrameReader::new(&parquet_path, FileType::Parquet, None, None, None)
532532
.execute(())
533533
.await
534534
.unwrap()
@@ -542,7 +542,7 @@ mod tests {
542542
let select = Some(vec!["id".to_string(), "first_name".to_string()]);
543543
let temp_dir = tempfile::tempdir().unwrap();
544544

545-
let source = read_dataframe(
545+
let source = DataFrameReader::new(
546546
"fixtures/userdata5.avro",
547547
FileType::Avro,
548548
select,
@@ -558,7 +558,7 @@ mod tests {
558558
.await
559559
.unwrap();
560560

561-
let source2 = read_dataframe(&orc_path, FileType::Orc, None, None, None)
561+
let source2 = DataFrameReader::new(&orc_path, FileType::Orc, None, None, None)
562562
.execute(())
563563
.await
564564
.unwrap();
@@ -568,7 +568,7 @@ mod tests {
568568
.await
569569
.unwrap();
570570

571-
let df3 = *read_dataframe(&parquet_path, FileType::Parquet, None, None, None)
571+
let df3 = *DataFrameReader::new(&parquet_path, FileType::Parquet, None, None, None)
572572
.execute(())
573573
.await
574574
.unwrap()

src/pipeline.rs

Lines changed: 12 additions & 37 deletions
Original file line number · Diff line number · Diff line change
@@ -3,12 +3,15 @@
33
pub mod avro;
44
pub mod csv;
55
pub mod dataframe;
6+
pub mod datasource;
67
pub mod display;
78
pub mod json;
89
pub mod orc;
910
pub mod parquet;
11+
pub mod read;
1012
pub mod record_batch_filter;
1113
pub mod select;
14+
pub mod write;
1215
pub mod xlsx;
1316
pub mod yaml;
1417

@@ -18,37 +21,12 @@ use futures::StreamExt;
1821

1922
use crate::FileType;
2023
use crate::Result;
24+
use crate::pipeline::dataframe::DataFrameReader;
2125
use crate::pipeline::dataframe::DataFrameSource;
22-
23-
/// Arguments for reading a file (Avro, CSV, Parquet, ORC).
24-
pub struct ReadArgs {
25-
pub path: String,
26-
pub limit: Option<usize>,
27-
pub offset: Option<usize>,
28-
/// When reading CSV: has_header for CsvReadOptions. None is treated as true.
29-
pub csv_has_header: Option<bool>,
30-
}
31-
32-
/// Arguments for writing a file (CSV, Avro, Parquet, ORC, XLSX).
33-
pub struct WriteArgs {
34-
pub path: String,
35-
}
36-
37-
/// Arguments for writing a JSON file.
38-
pub struct WriteJsonArgs {
39-
pub path: String,
40-
/// When true, omit keys with null/missing values. When false, output default values.
41-
pub sparse: bool,
42-
/// When true, format output with indentation and newlines.
43-
pub pretty: bool,
44-
}
45-
46-
/// Arguments for writing a YAML file.
47-
pub struct WriteYamlArgs {
48-
pub path: String,
49-
/// When true, omit keys with null/missing values. When false, output default values.
50-
pub sparse: bool,
51-
}
26+
pub use crate::pipeline::read::ReadArgs;
27+
pub use crate::pipeline::write::WriteArgs;
28+
pub use crate::pipeline::write::WriteJsonArgs;
29+
pub use crate::pipeline::write::WriteYamlArgs;
5230

5331
/// A `Step` defines a step in the pipeline that can be executed
5432
/// and has an input and output type.
@@ -179,13 +157,10 @@ pub async fn read_to_batches(
179157
limit: Option<usize>,
180158
csv_has_header: Option<bool>,
181159
) -> anyhow::Result<Vec<arrow::record_batch::RecordBatch>> {
182-
let source = dataframe::read_dataframe(
183-
input_path,
184-
input_file_type,
185-
select.clone(),
186-
limit,
187-
csv_has_header,
188-
)
160+
let source = {
161+
let select = select.clone();
162+
DataFrameReader::new(input_path, input_file_type, select, limit, csv_has_header)
163+
}
189164
.execute(())
190165
.await?;
191166
let reader = DataFrameToBatchReader::try_new(source)

src/pipeline/dataframe.rs

Lines changed: 0 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -216,17 +216,6 @@ impl Step for DataFrameReader {
216216
}
217217
}
218218

219-
/// Creates a `DataFrameReader` that reads an input file into a DataFusion DataFrame.
220-
pub fn read_dataframe(
221-
input_path: &str,
222-
input_file_type: FileType,
223-
select: Option<Vec<String>>,
224-
limit: Option<usize>,
225-
csv_has_header: Option<bool>,
226-
) -> DataFrameReader {
227-
DataFrameReader::new(input_path, input_file_type, select, limit, csv_has_header)
228-
}
229-
230219
/// Reads an ORC file into record batches (ORC is not natively supported by DataFusion).
231220
/// Limit is applied via DataFusion after reading.
232221
fn read_orc_to_batches(path: &str) -> crate::Result<Vec<arrow::record_batch::RecordBatch>> {

0 commit comments

Comments (0)