Skip to content

Commit 1737973

Browse files
committed
refactor: Update PyDataFrame methods to consistently use display_config for DataFrame creation
1 parent cbc4759 commit 1737973

File tree

1 file changed

+25
-29
lines changed

1 file changed

+25
-29
lines changed

src/dataframe.rs

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,8 @@ impl PyDataFrame {
127127
self.display_config.max_table_rows_in_repr,
128128
self.display_config.max_table_bytes,
129129
),
130-
let (batches, has_more) = wait_for_future(
131-
py,
132-
self.display_config.min_table_rows, self.display_config.max_table_rows_in_repr, self.display_config.max_table_bytes),
133130
)?;
131+
134132
if batches.is_empty() {
135133
// This should not be reached, but do it for safety since we index into the vector below
136134
return Ok("No data to display".to_string());
@@ -281,7 +279,7 @@ impl PyDataFrame {
281279
fn describe(&self, py: Python) -> PyDataFusionResult<Self> {
282280
let df = self.df.as_ref().clone();
283281
let stat_df = wait_for_future(py, df.describe())?;
284-
Ok(Self::new(stat_df))
282+
Ok(Self::new(stat_df, self.display_config.as_ref().clone()))
285283
}
286284

287285
/// Returns the schema from the logical plan
@@ -311,31 +309,31 @@ impl PyDataFrame {
311309
fn select_columns(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
312310
let args = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
313311
let df = self.df.as_ref().clone().select_columns(&args)?;
314-
Ok(Self::new(df))
312+
Ok(Self::new(df, self.display_config.as_ref().clone()))
315313
}
316314

317315
#[pyo3(signature = (*args))]
318316
fn select(&self, args: Vec<PyExpr>) -> PyDataFusionResult<Self> {
319317
let expr = args.into_iter().map(|e| e.into()).collect();
320318
let df = self.df.as_ref().clone().select(expr)?;
321-
Ok(Self::new(df))
319+
Ok(Self::new(df, self.display_config.as_ref().clone()))
322320
}
323321

324322
#[pyo3(signature = (*args))]
325323
fn drop(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
326324
let cols = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
327325
let df = self.df.as_ref().clone().drop_columns(&cols)?;
328-
Ok(Self::new(df))
326+
Ok(Self::new(df, self.display_config.as_ref().clone()))
329327
}
330328

331329
fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
332330
let df = self.df.as_ref().clone().filter(predicate.into())?;
333-
Ok(Self::new(df))
331+
Ok(Self::new(df, self.display_config.as_ref().clone()))
334332
}
335333

336334
fn with_column(&self, name: &str, expr: PyExpr) -> PyDataFusionResult<Self> {
337335
let df = self.df.as_ref().clone().with_column(name, expr.into())?;
338-
Ok(Self::new(df))
336+
Ok(Self::new(df, self.display_config.as_ref().clone()))
339337
}
340338

341339
fn with_columns(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
@@ -345,7 +343,7 @@ impl PyDataFrame {
345343
let name = format!("{}", expr.schema_name());
346344
df = df.with_column(name.as_str(), expr)?
347345
}
348-
Ok(Self::new(df))
346+
Ok(Self::new(df, self.display_config.as_ref().clone()))
349347
}
350348

351349
/// Rename one column by applying a new projection. This is a no-op if the column to be
@@ -356,27 +354,27 @@ impl PyDataFrame {
356354
.as_ref()
357355
.clone()
358356
.with_column_renamed(old_name, new_name)?;
359-
Ok(Self::new(df))
357+
Ok(Self::new(df, self.display_config.as_ref().clone()))
360358
}
361359

362360
fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
363361
let group_by = group_by.into_iter().map(|e| e.into()).collect();
364362
let aggs = aggs.into_iter().map(|e| e.into()).collect();
365363
let df = self.df.as_ref().clone().aggregate(group_by, aggs)?;
366-
Ok(Self::new(df))
364+
Ok(Self::new(df, self.display_config.as_ref().clone()))
367365
}
368366

369367
#[pyo3(signature = (*exprs))]
370368
fn sort(&self, exprs: Vec<PySortExpr>) -> PyDataFusionResult<Self> {
371369
let exprs = to_sort_expressions(exprs);
372370
let df = self.df.as_ref().clone().sort(exprs)?;
373-
Ok(Self::new(df))
371+
Ok(Self::new(df, self.display_config.as_ref().clone()))
374372
}
375373

376374
#[pyo3(signature = (count, offset=0))]
377375
fn limit(&self, count: usize, offset: usize) -> PyDataFusionResult<Self> {
378376
let df = self.df.as_ref().clone().limit(offset, Some(count))?;
379-
Ok(Self::new(df))
377+
Ok(Self::new(df, self.display_config.as_ref().clone()))
380378
}
381379

382380
/// Executes the plan, returning a list of `RecordBatch`es.
@@ -393,7 +391,7 @@ impl PyDataFrame {
393391
/// Cache DataFrame.
394392
fn cache(&self, py: Python) -> PyDataFusionResult<Self> {
395393
let df = wait_for_future(py, self.df.as_ref().clone().cache())?;
396-
Ok(Self::new(df))
394+
Ok(Self::new(df, self.display_config.as_ref().clone()))
397395
}
398396

399397
/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
@@ -418,7 +416,7 @@ impl PyDataFrame {
418416
/// Filter out duplicate rows
419417
fn distinct(&self) -> PyDataFusionResult<Self> {
420418
let df = self.df.as_ref().clone().distinct()?;
421-
Ok(Self::new(df))
419+
Ok(Self::new(df, self.display_config.as_ref().clone()))
422420
}
423421

424422
fn join(
@@ -452,7 +450,7 @@ impl PyDataFrame {
452450
&right_keys,
453451
None,
454452
)?;
455-
Ok(Self::new(df))
453+
Ok(Self::new(df, self.display_config.as_ref().clone()))
456454
}
457455

458456
fn join_on(
@@ -481,7 +479,7 @@ impl PyDataFrame {
481479
.as_ref()
482480
.clone()
483481
.join_on(right.df.as_ref().clone(), join_type, exprs)?;
484-
Ok(Self::new(df))
482+
Ok(Self::new(df, self.display_config.as_ref().clone()))
485483
}
486484

487485
/// Print the query plan
@@ -514,7 +512,7 @@ impl PyDataFrame {
514512
.as_ref()
515513
.clone()
516514
.repartition(Partitioning::RoundRobinBatch(num))?;
517-
Ok(Self::new(new_df))
515+
Ok(Self::new(new_df, self.display_config.as_ref().clone()))
518516
}
519517

520518
/// Repartition a `DataFrame` based on a logical partitioning scheme.
@@ -526,7 +524,7 @@ impl PyDataFrame {
526524
.as_ref()
527525
.clone()
528526
.repartition(Partitioning::Hash(expr, num))?;
529-
Ok(Self::new(new_df))
527+
Ok(Self::new(new_df, self.display_config.as_ref().clone()))
530528
}
531529

532530
/// Calculate the union of two `DataFrame`s, preserving duplicate rows. The
@@ -542,7 +540,7 @@ impl PyDataFrame {
542540
self.df.as_ref().clone().union(py_df.df.as_ref().clone())?
543541
};
544542

545-
Ok(Self::new(new_df))
543+
Ok(Self::new(new_df, self.display_config.as_ref().clone()))
546544
}
547545

548546
/// Calculate the distinct union of two `DataFrame`s. The
@@ -553,7 +551,7 @@ impl PyDataFrame {
553551
.as_ref()
554552
.clone()
555553
.union_distinct(py_df.df.as_ref().clone())?;
556-
Ok(Self::new(new_df))
554+
Ok(Self::new(new_df, self.display_config.as_ref().clone()))
557555
}
558556

559557
#[pyo3(signature = (column, preserve_nulls=true))]
@@ -566,7 +564,7 @@ impl PyDataFrame {
566564
.as_ref()
567565
.clone()
568566
.unnest_columns_with_options(&[column], unnest_options)?;
569-
Ok(Self::new(df))
567+
Ok(Self::new(df, self.display_config.as_ref().clone()))
570568
}
571569

572570
#[pyo3(signature = (columns, preserve_nulls=true))]
@@ -584,7 +582,7 @@ impl PyDataFrame {
584582
.as_ref()
585583
.clone()
586584
.unnest_columns_with_options(&cols, unnest_options)?;
587-
Ok(Self::new(df))
585+
Ok(Self::new(df, self.display_config.as_ref().clone()))
588586
}
589587

590588
/// Calculate the intersection of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
@@ -594,13 +592,13 @@ impl PyDataFrame {
594592
.as_ref()
595593
.clone()
596594
.intersect(py_df.df.as_ref().clone())?;
597-
Ok(Self::new(new_df))
595+
Ok(Self::new(new_df, self.display_config.as_ref().clone()))
598596
}
599597

600598
/// Calculate the exception of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
601599
fn except_all(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
602600
let new_df = self.df.as_ref().clone().except(py_df.df.as_ref().clone())?;
603-
Ok(Self::new(new_df. self.display_config))
601+
Ok(Self::new(new_df, self.display_config.as_ref().clone()))
604602
}
605603

606604
/// Write a `DataFrame` to a CSV file.
@@ -907,9 +905,7 @@ async fn collect_record_batches_to_display(
907905
let mut record_batches = Vec::default();
908906
let mut has_more = false;
909907

910-
while (size_estimate_so_far < max_bytes && rows_so_far < max_rows)
911-
|| rows_so_far < min_rows
912-
{
908+
while (size_estimate_so_far < max_bytes && rows_so_far < max_rows) || rows_so_far < min_rows {
913909
let mut rb = match stream.next().await {
914910
None => {
915911
break;

0 commit comments

Comments
 (0)