Commit 549d2dd

Add comments and fix format check failure.

1 parent 51fdc12 commit 549d2dd


1 file changed: src/dataframe.rs (19 additions & 2 deletions)

@@ -92,7 +92,12 @@ impl PyDataFrame {
 
     fn __repr__(&self, py: Python) -> PyDataFusionResult<String> {
         let df = self.df.as_ref().clone();
+
+        // Mostly the same functionality as `df.limit(0, 10).collect()`, but
+        // `df.limit(0, 10)` is a semantically different plan, which might be
+        // invalid; e.g. df=`EXPLAIN ...`, since `Explain` must be the root.
         let batches: Vec<RecordBatch> = get_batches(py, df, 10)?;
+
         let batches_as_string = pretty::pretty_format_batches(&batches);
         match batches_as_string {
             Ok(batch) => Ok(format!("DataFrame()\n{batch}")),
@@ -103,6 +108,9 @@ impl PyDataFrame {
     fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
         let mut html_str = "<table border='1'>\n".to_string();
 
+        // Mostly the same functionality as `df.limit(0, 10).collect()`, but
+        // `df.limit(0, 10)` is a semantically different plan, which might be
+        // invalid; e.g. df=`EXPLAIN ...`, since `Explain` must be the root.
         let df = self.df.as_ref().clone();
         let batches: Vec<RecordBatch> = get_batches(py, df, 10)?;
 
@@ -735,27 +743,36 @@ fn record_batch_into_schema(
     RecordBatch::try_new(schema, data_arrays)
 }
 
+/// Get the dataframe as a list of `RecordBatch`es containing at most `max_rows` rows.
 fn get_batches(
     py: Python,
     df: DataFrame,
     max_rows: usize,
 ) -> Result<Vec<RecordBatch>, PyDataFusionError> {
-    let partitioned_stream = wait_for_future(py, df.execute_stream_partitioned()).map_err(py_datafusion_err)?;
+    // Use `df.execute_stream_partitioned` here instead of `df.execute_stream`,
+    // as the latter internally appends a `CoalescePartitionsExec` to merge
+    // the result into a single partition, which might cause loading of
+    // unnecessary partitions.
+    let partitioned_stream =
+        wait_for_future(py, df.execute_stream_partitioned()).map_err(py_datafusion_err)?;
     let stream = futures::stream::iter(partitioned_stream).flatten();
     wait_for_future(
         py,
         stream
             .scan(0, |state, x| {
                 let total = *state;
                 if total >= max_rows {
+                    // Already scanned `max_rows` rows: stop the stream.
                     future::ready(None)
                 } else {
                     match x {
                         Ok(batch) => {
                             if total + batch.num_rows() <= max_rows {
+                                // Add the whole batch when it does not exceed `max_rows`.
                                 *state = total + batch.num_rows();
                                 future::ready(Some(Ok(batch)))
                             } else {
+                                // Partially load `max_rows - total` rows.
                                 *state = max_rows;
                                 future::ready(Some(Ok(batch.slice(0, max_rows - total))))
                             }
@@ -768,4 +785,4 @@ fn get_batches(
     )
     .into_iter()
     .collect::<Result<Vec<_>, _>>()
-}
\ No newline at end of file
+}
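
For context on the `execute_stream_partitioned` comment: DataFusion's `DataFrame::execute_stream` merges all partitions behind a `CoalescePartitionsExec`, so polling the merged stream may trigger work on every partition even when only a few rows are wanted, while `execute_stream_partitioned` returns one stream per partition, so a caller that stops early never touches the remaining partitions. A hedged sketch of that usage (the helper function and scenario are illustrative, not from the commit):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;
use futures::StreamExt;

// Pull at most one batch, polling only the first partition's stream; the
// other partition streams are dropped without ever being polled.
async fn first_batch(df: DataFrame) -> Result<Option<RecordBatch>> {
    let mut partitions = df.execute_stream_partitioned().await?;
    match partitions.first_mut() {
        Some(stream) => stream.next().await.transpose(),
        None => Ok(None),
    }
}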
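
The `scan` combinator in `get_batches` is what enforces the row cap: the running row count is carried as mutable state, whole batches pass through until the cap would be exceeded, the final batch is sliced, and returning `None` ends the flattened stream so later partitions are never polled. A self-contained sketch of the same pattern, with `Vec<i32>` standing in for Arrow's `RecordBatch` so it runs with only the futures crate:

use futures::{future, StreamExt};

fn main() {
    // Two "partitions", each a stream of "batches" (here just Vec<i32>).
    let partitions = vec![
        futures::stream::iter(vec![vec![1, 2, 3, 4], vec![5, 6, 7]]),
        futures::stream::iter(vec![vec![8, 9, 10, 11, 12]]),
    ];
    let max_rows = 6;

    let batches: Vec<Vec<i32>> = futures::executor::block_on(
        futures::stream::iter(partitions)
            .flatten()
            .scan(0usize, move |state, batch| {
                let total = *state;
                if total >= max_rows {
                    // Cap reached: end the stream early.
                    future::ready(None)
                } else if total + batch.len() <= max_rows {
                    // The whole batch still fits under the cap.
                    *state = total + batch.len();
                    future::ready(Some(batch))
                } else {
                    // Take only the first `max_rows - total` rows.
                    *state = max_rows;
                    future::ready(Some(batch[..max_rows - total].to_vec()))
                }
            })
            .collect(),
    );

    // The second batch is sliced to two rows; the third is never polled.
    assert_eq!(batches, vec![vec![1, 2, 3, 4], vec![5, 6]]);
}

Because `scan` ends the stream by returning `None`, the partition streams remaining after the cap are dropped unpolled, which is the same early exit the committed code relies on.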
