Skip to content

Commit c5e2594

Browse files
authored
Add DataFrame::into_view instead of implementing TableProvider (apache#2659) (apache#4778)
1 parent 4bea81b commit c5e2594

File tree

3 files changed

+35
-20
lines changed

3 files changed

+35
-20
lines changed

datafusion/core/src/dataframe.rs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,15 @@ impl DataFrame {
528528
self.session_state.optimize(&self.plan)
529529
}
530530

531+
/// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
532+
/// as a table view using [`SessionContext::register_table`].
533+
///
534+
/// Note: This discards the [`SessionState`] associated with this
535+
/// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
536+
pub fn into_view(self) -> Arc<dyn TableProvider> {
537+
Arc::new(DataFrameTableProvider { plan: self.plan })
538+
}
539+
531540
/// Return the optimized logical plan represented by this DataFrame.
532541
///
533542
/// Note: This method should not be used outside testing, as it loses the snapshot
@@ -766,9 +775,12 @@ impl DataFrame {
766775
}
767776
}
768777

769-
// TODO: This will introduce a ref cycle (#2659)
778+
struct DataFrameTableProvider {
779+
plan: LogicalPlan,
780+
}
781+
770782
#[async_trait]
771-
impl TableProvider for DataFrame {
783+
impl TableProvider for DataFrameTableProvider {
772784
fn as_any(&self) -> &dyn Any {
773785
self
774786
}
@@ -796,34 +808,27 @@ impl TableProvider for DataFrame {
796808

797809
async fn scan(
798810
&self,
799-
_state: &SessionState,
811+
state: &SessionState,
800812
projection: Option<&Vec<usize>>,
801813
filters: &[Expr],
802814
limit: Option<usize>,
803815
) -> Result<Arc<dyn ExecutionPlan>> {
804-
let mut expr = self.clone();
816+
let mut expr = LogicalPlanBuilder::from(self.plan.clone());
805817
if let Some(p) = projection {
806-
let schema = TableProvider::schema(&expr).project(p)?;
807-
let names = schema
808-
.fields()
809-
.iter()
810-
.map(|field| field.name().as_str())
811-
.collect::<Vec<_>>();
812-
expr = expr.select_columns(names.as_slice())?;
818+
expr = expr.select(p.iter().copied())?
813819
}
814820

815821
// Add filter when given
816822
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
817823
if let Some(filter) = filter {
818824
expr = expr.filter(filter)?
819825
}
826+
// add a limit if given
820827
if let Some(l) = limit {
821828
expr = expr.limit(0, Some(l))?
822829
}
823-
// add a limit if given
824-
Self::new(self.session_state.clone(), expr.plan)
825-
.create_physical_plan()
826-
.await
830+
let plan = expr.build()?;
831+
state.create_physical_plan(&plan).await
827832
}
828833
}
829834

@@ -1098,7 +1103,7 @@ mod tests {
10981103
let df_impl = DataFrame::new(ctx.state(), df.plan.clone());
10991104

11001105
// register a dataframe as a table
1101-
ctx.register_table("test_table", Arc::new(df_impl.clone()))?;
1106+
ctx.register_table("test_table", df_impl.clone().into_view())?;
11021107

11031108
// pull the table out
11041109
let table = ctx.table("test_table").await?;
@@ -1297,7 +1302,7 @@ mod tests {
12971302
let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
12981303
let ctx = SessionContext::new();
12991304

1300-
let table = Arc::new(df);
1305+
let table = df.into_view();
13011306
ctx.register_table("t1", table.clone())?;
13021307
ctx.register_table("t2", table)?;
13031308
let df = ctx
@@ -1386,7 +1391,7 @@ mod tests {
13861391
)
13871392
.await?;
13881393

1389-
ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
1394+
ctx.register_table("t1", ctx.table("test").await?.into_view())?;
13901395

13911396
let df = ctx
13921397
.table("t1")

datafusion/core/src/datasource/view.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ mod tests {
428428
)
429429
.await?;
430430

431-
ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
431+
ctx.register_table("t1", ctx.table("test").await?.into_view())?;
432432

433433
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
434434

@@ -458,7 +458,7 @@ mod tests {
458458
)
459459
.await?;
460460

461-
ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
461+
ctx.register_table("t1", ctx.table("test").await?.into_view())?;
462462

463463
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
464464

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,16 @@ impl LogicalPlanBuilder {
288288
Ok(Self::from(project(self.plan, expr)?))
289289
}
290290

291+
/// Select the given column indices
292+
pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
293+
let fields = self.plan.schema().fields();
294+
let exprs: Vec<_> = indices
295+
.into_iter()
296+
.map(|x| Expr::Column(fields[x].qualified_column()))
297+
.collect();
298+
self.project(exprs)
299+
}
300+
291301
/// Apply a filter
292302
pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
293303
let expr = normalize_col(expr.into(), &self.plan)?;

0 commit comments

Comments
 (0)