Skip to content
Draft
7 changes: 7 additions & 0 deletions R/000-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -4194,6 +4194,12 @@ class(`PlRExpr`) <- c("PlRExpr__bundle", "savvy_polars__sealed")
}
}

# Method factory for `with_optimizations`: captures `self` and returns the
# one-argument closure that performs the native call.
`PlRLazyFrame_with_optimizations` <- function(self) {
  function(`optimizations`) {
    # Invoke the registered native routine with the captured receiver and the
    # caller-supplied `optimizations`, then hand the result to the
    # PlRLazyFrame wrapper.
    native_result <- .Call(
      savvy_PlRLazyFrame_with_optimizations__impl,
      `self`,
      `optimizations`
    )
    .savvy_wrap_PlRLazyFrame(native_result)
  }
}

`PlRLazyFrame_with_row_index` <- function(self) {
function(`name`, `offset` = NULL) {
.savvy_wrap_PlRLazyFrame(.Call(savvy_PlRLazyFrame_with_row_index__impl, `self`, `name`, `offset`))
Expand Down Expand Up @@ -4263,6 +4269,7 @@ class(`PlRExpr`) <- c("PlRExpr__bundle", "savvy_polars__sealed")
e$`var` <- `PlRLazyFrame_var`(ptr)
e$`with_columns` <- `PlRLazyFrame_with_columns`(ptr)
e$`with_columns_seq` <- `PlRLazyFrame_with_columns_seq`(ptr)
e$`with_optimizations` <- `PlRLazyFrame_with_optimizations`(ptr)
e$`with_row_index` <- `PlRLazyFrame_with_row_index`(ptr)

class(e) <- c("PlRLazyFrame", "savvy_polars__sealed")
Expand Down
183 changes: 98 additions & 85 deletions R/lazyframe-frame.R
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,50 @@ lazyframe__group_by <- function(..., .maintain_order = FALSE) {
#' Individual optimizations may be disabled by setting the corresponding parameter to `FALSE`.
#' @inherit pl__DataFrame return
#' @inheritParams rlang::args_dots_empty
#' @inheritParams QueryOptFlags
#' @param type_coercion A logical, indicates type coercion optimization.
#' @param collapse_joins `r lifecycle::badge("deprecated")`
#' Use `predicate_pushdown` instead.
#' @param no_optimization A logical. If `TRUE`, turn off (certain) optimizations.
#' @param engine The engine name to use for processing the query.
#' One of the following:
#' - `"auto"` (default): Select the engine automatically.
#' The `"in-memory"` engine will be selected for most cases.
#' - `"in-memory"`: Use the in-memory engine.
#' - `"streaming"`: `r lifecycle::badge("experimental")` Use the (new) streaming engine.
#' @param _eager A logical, indicates to turn off multi-node optimizations and
#' the other optimizations. This option is intended for internal use only.
#' @param _check_order,_type_check For internal use only.
#' One of the following:
#' - `"auto"` (default): Select the engine automatically.
#' The `"in-memory"` engine will be selected for most cases.
#' - `"in-memory"`: Use the in-memory engine.
#' - `"streaming"`: `r lifecycle::badge("experimental")` Use the (new) streaming engine.
#' @param optimizations `r lifecycle::badge("experimental")`
#' A [QueryOptFlags] object to indicate optimization passes done during query optimization.
#' @param type_coercion `r lifecycle::badge("deprecated")`
#' Use the `type_coercion` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param predicate_pushdown `r lifecycle::badge("deprecated")`
#' Use the `predicate_pushdown` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param projection_pushdown `r lifecycle::badge("deprecated")`
#' Use the `projection_pushdown` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param simplify_expression `r lifecycle::badge("deprecated")`
#' Use the `simplify_expression` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param slice_pushdown `r lifecycle::badge("deprecated")`
#' Use the `slice_pushdown` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param comm_subplan_elim `r lifecycle::badge("deprecated")`
#' Use the `comm_subplan_elim` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param comm_subexpr_elim `r lifecycle::badge("deprecated")`
#' Use the `comm_subexpr_elim` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param cluster_with_columns `r lifecycle::badge("deprecated")`
#' Use the `cluster_with_columns` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param check_order_observe `r lifecycle::badge("deprecated")`
#' Use the `check_order_observe` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param fast_projection `r lifecycle::badge("deprecated")`
#' Use the `fast_projection` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param collapse_joins `r lifecycle::badge("deprecated")`
#' Use the `predicate_pushdown` property of a [QueryOptFlags] object, then pass
#' that to the `optimizations` argument instead.
#' @param no_optimization `r lifecycle::badge("deprecated")`
#' Use the `optimizations` argument with
#' [`pl$QueryOptFlags()$no_optimizations()`][QueryOptFlags] instead.
#' @seealso
#' - [`$profile()`][lazyframe__profile] - same as `$collect()` but also returns
#' a table with each operation profiled.
Expand All @@ -268,58 +298,40 @@ lazyframe__group_by <- function(..., .maintain_order = FALSE) {
#' )
lazyframe__collect <- function(
...,
type_coercion = TRUE,
`_type_check` = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
cluster_with_columns = TRUE,
no_optimization = FALSE,
engine = c("auto", "in-memory", "streaming"),
`_check_order` = TRUE,
`_eager` = FALSE,
collapse_joins = deprecated()
optimizations = QueryOptFlags(),
type_coercion = deprecated(),
predicate_pushdown = deprecated(),
projection_pushdown = deprecated(),
simplify_expression = deprecated(),
slice_pushdown = deprecated(),
comm_subplan_elim = deprecated(),
comm_subexpr_elim = deprecated(),
cluster_with_columns = deprecated(),
collapse_joins = deprecated(),
no_optimization = deprecated()
) {
wrap({
check_dots_empty0(...)
engine <- arg_match0(engine, c("auto", "in-memory", "streaming"))
check_is_S7(optimizations, QueryOptFlags)

if (is_present(collapse_joins)) {
deprecate_warn(
c(
`!` = sprintf("%s is deprecated.", format_arg("collapse_joins")),
`i` = sprintf("Use %s instead.", format_arg("predicate_pushdown"))
)
)
}

if (isTRUE(no_optimization) || isTRUE(`_eager`)) {
predicate_pushdown <- FALSE
projection_pushdown <- FALSE
slice_pushdown <- FALSE
comm_subplan_elim <- FALSE
comm_subexpr_elim <- FALSE
cluster_with_columns <- FALSE
`_check_order` <- FALSE
}

ldf <- self$`_ldf`$optimization_toggle(
optimizations <- forward_old_opt_flags(
optimizations,
type_coercion = type_coercion,
`_type_check` = `_type_check`,
predicate_pushdown = predicate_pushdown,
projection_pushdown = projection_pushdown,
simplify_expression = simplify_expression,
slice_pushdown = slice_pushdown,
comm_subplan_elim = comm_subplan_elim,
comm_subexpr_elim = comm_subexpr_elim,
cluster_with_columns = cluster_with_columns,
`_check_order` = `_check_order`,
`_eager` = `_eager`
collapse_joins = collapse_joins,
no_optimization = no_optimization
)

ldf <- self$`_ldf`$with_optimizations(optimizations)

ldf$collect(engine)
})
}
Expand Down Expand Up @@ -446,59 +458,60 @@ lazyframe__profile <- function(
#' lazy_query <- lazy_frame$sort("Species")$filter(pl$col("Species") != "setosa")
#'
#' # This is the query that was written by the user, without any optimizations
#' # (use cat() for better printing)
#' lazy_query$explain(optimized = FALSE) |> cat()
#' # (use writeLines() for better printing)
#' lazy_query$explain(optimized = FALSE) |> writeLines()
#'
#' # This is the query after `polars` optimizes it: instead of sorting first and
#' # then filtering, it is faster to filter first and then sort the rest.
#' lazy_query$explain() |> cat()
#' lazy_query$explain() |> writeLines()
#'
#' # You can disable specific optimizations.
#' lazy_query$explain(
#' optimizations = pl$QueryOptFlags(predicate_pushdown = FALSE)
#' ) |>
#' writeLines()
#'
#' # Also possible to see this as tree format
#' lazy_query$explain(format = "tree") |> cat()
#' lazy_query$explain(format = "tree") |> writeLines()
lazyframe__explain <- function(
...,
format = c("plain", "tree"),
engine = c("auto", "in-memory", "streaming"),
optimized = TRUE,
type_coercion = TRUE,
`_type_check` = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
cluster_with_columns = TRUE,
collapse_joins = deprecated(),
`_check_order` = TRUE
optimizations = QueryOptFlags(),
type_coercion = deprecated(),
predicate_pushdown = deprecated(),
projection_pushdown = deprecated(),
simplify_expression = deprecated(),
slice_pushdown = deprecated(),
comm_subplan_elim = deprecated(),
comm_subexpr_elim = deprecated(),
cluster_with_columns = deprecated(),
collapse_joins = deprecated()
) {
wrap({
check_dots_empty0(...)

format <- arg_match0(format, c("plain", "tree"))
engine <- arg_match0(engine, c("auto", "in-memory", "streaming"))
check_is_S7(optimizations, QueryOptFlags)

if (is_present(collapse_joins)) {
deprecate_warn(
c(
`!` = sprintf("%s is deprecated.", format_arg("collapse_joins")),
`i` = sprintf("Use %s instead.", format_arg("predicate_pushdown"))
)
)
}
optimizations <- forward_old_opt_flags(
optimizations,
type_coercion = type_coercion,
predicate_pushdown = predicate_pushdown,
projection_pushdown = projection_pushdown,
simplify_expression = simplify_expression,
slice_pushdown = slice_pushdown,
comm_subplan_elim = comm_subplan_elim,
comm_subexpr_elim = comm_subexpr_elim,
cluster_with_columns = cluster_with_columns,
collapse_joins = collapse_joins
)

if (isTRUE(optimized)) {
ldf <- self$`_ldf`$optimization_toggle(
type_coercion = type_coercion,
`_type_check` = `_type_check`,
predicate_pushdown = predicate_pushdown,
projection_pushdown = projection_pushdown,
simplify_expression = simplify_expression,
slice_pushdown = slice_pushdown,
comm_subplan_elim = comm_subplan_elim,
comm_subexpr_elim = comm_subexpr_elim,
cluster_with_columns = cluster_with_columns,
`_check_order` = `_check_order`,
`_eager` = FALSE
)
prop(optimizations, "streaming", check = FALSE) <- engine == "streaming"
ldf <- self$`_ldf`$with_optimizations(optimizations)

if (format == "tree") {
ldf$describe_optimized_plan_tree()
Expand Down
96 changes: 96 additions & 0 deletions R/lazyframe-utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,99 @@ parse_percentiles <- function(percentiles, inject_median = FALSE) {

c(sub_50_percentiles, at_or_above_50_percentiles)
}

# Fold the deprecated per-flag arguments of the lazy-frame methods into the
# `optimizations` object.
#
# Every deprecated argument that was actually supplied triggers a deprecation
# warning pointing the user at `optimizations`. The individual flag arguments
# additionally overwrite the property of the same name, `collapse_joins` only
# warns, and `no_optimization = TRUE` switches a fixed set of flags off.
# Properties are written with `check = FALSE`; `validate()` is called once at
# the end if anything was written.
#
# Returns the (possibly modified) `optimizations` object.
forward_old_opt_flags <- function(
  optimizations,
  type_coercion = deprecated(),
  predicate_pushdown = deprecated(),
  projection_pushdown = deprecated(),
  simplify_expression = deprecated(),
  slice_pushdown = deprecated(),
  comm_subplan_elim = deprecated(),
  comm_subexpr_elim = deprecated(),
  cluster_with_columns = deprecated(),
  collapse_joins = deprecated(),
  no_optimization = deprecated()
) {
  # Environment two frames up from this helper; passed to `deprecate_warn()`
  # as `user_env`.
  user_env <- caller_env(2L)

  # Emit the deprecation warning for one legacy argument.
  warn_deprecated <- function(flag) {
    msg <- c(
      `!` = sprintf("%s is deprecated.", format_arg(flag)),
      `i` = sprintf("Use %s instead.", format_arg("optimizations"))
    )
    deprecate_warn(msg, always = TRUE, user_env = user_env)
  }

  # Becomes TRUE as soon as any property of `optimizations` is overwritten.
  modified <- FALSE

  if (is_present(type_coercion)) {
    warn_deprecated("type_coercion")
    prop(optimizations, "type_coercion", check = FALSE) <- type_coercion
    modified <- TRUE
  }

  if (is_present(predicate_pushdown)) {
    warn_deprecated("predicate_pushdown")
    prop(optimizations, "predicate_pushdown", check = FALSE) <- predicate_pushdown
    modified <- TRUE
  }

  if (is_present(projection_pushdown)) {
    warn_deprecated("projection_pushdown")
    prop(optimizations, "projection_pushdown", check = FALSE) <- projection_pushdown
    modified <- TRUE
  }

  if (is_present(simplify_expression)) {
    warn_deprecated("simplify_expression")
    prop(optimizations, "simplify_expression", check = FALSE) <- simplify_expression
    modified <- TRUE
  }

  if (is_present(slice_pushdown)) {
    warn_deprecated("slice_pushdown")
    prop(optimizations, "slice_pushdown", check = FALSE) <- slice_pushdown
    modified <- TRUE
  }

  if (is_present(comm_subplan_elim)) {
    warn_deprecated("comm_subplan_elim")
    prop(optimizations, "comm_subplan_elim", check = FALSE) <- comm_subplan_elim
    modified <- TRUE
  }

  if (is_present(comm_subexpr_elim)) {
    warn_deprecated("comm_subexpr_elim")
    prop(optimizations, "comm_subexpr_elim", check = FALSE) <- comm_subexpr_elim
    modified <- TRUE
  }

  if (is_present(cluster_with_columns)) {
    warn_deprecated("cluster_with_columns")
    prop(optimizations, "cluster_with_columns", check = FALSE) <- cluster_with_columns
    modified <- TRUE
  }

  if (is_present(collapse_joins)) {
    # collapse_joins was merged into predicate_pushdown, so there is no flag
    # left to set: warn only.
    warn_deprecated("collapse_joins")
  }

  if (is_present(no_optimization)) {
    warn_deprecated("no_optimization")
    # Only a literal TRUE changes anything; any other supplied value just warns.
    if (isTRUE(no_optimization)) {
      switched_off <- list(
        predicate_pushdown = FALSE,
        projection_pushdown = FALSE,
        slice_pushdown = FALSE,
        comm_subplan_elim = FALSE,
        comm_subexpr_elim = FALSE,
        cluster_with_columns = FALSE,
        check_order_observe = FALSE
      )
      props_uncheck(optimizations) <- switched_off
      modified <- TRUE
    }
  }

  # Validation was skipped on every individual write above, so run it once now.
  if (modified) {
    validate(optimizations)
  }

  optimizations
}
6 changes: 6 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -2894,6 +2894,11 @@ SEXP savvy_PlRLazyFrame_with_columns_seq__impl(SEXP self__, SEXP c_arg__exprs) {
return handle_result(res);
}

/*
 * .Call entry point for PlRLazyFrame's with_optimizations method.
 * Forwards both arguments unchanged to the matching __ffi symbol and returns
 * whatever handle_result() yields for that value.
 * NOTE(review): handle_result() is defined outside this view; presumably it
 * turns a failed result into an R error -- confirm.
 */
SEXP savvy_PlRLazyFrame_with_optimizations__impl(SEXP self__, SEXP c_arg__optimizations) {
    return handle_result(
        savvy_PlRLazyFrame_with_optimizations__ffi(self__, c_arg__optimizations));
}

SEXP savvy_PlRLazyFrame_with_row_index__impl(SEXP self__, SEXP c_arg__name, SEXP c_arg__offset) {
SEXP res = savvy_PlRLazyFrame_with_row_index__ffi(self__, c_arg__name, c_arg__offset);
return handle_result(res);
Expand Down Expand Up @@ -3953,6 +3958,7 @@ static const R_CallMethodDef CallEntries[] = {
{"savvy_PlRLazyFrame_var__impl", (DL_FUNC) &savvy_PlRLazyFrame_var__impl, 2},
{"savvy_PlRLazyFrame_with_columns__impl", (DL_FUNC) &savvy_PlRLazyFrame_with_columns__impl, 2},
{"savvy_PlRLazyFrame_with_columns_seq__impl", (DL_FUNC) &savvy_PlRLazyFrame_with_columns_seq__impl, 2},
{"savvy_PlRLazyFrame_with_optimizations__impl", (DL_FUNC) &savvy_PlRLazyFrame_with_optimizations__impl, 2},
{"savvy_PlRLazyFrame_with_row_index__impl", (DL_FUNC) &savvy_PlRLazyFrame_with_row_index__impl, 3},
{"savvy_PlRLazyGroupBy_agg__impl", (DL_FUNC) &savvy_PlRLazyGroupBy_agg__impl, 2},
{"savvy_PlRLazyGroupBy_head__impl", (DL_FUNC) &savvy_PlRLazyGroupBy_head__impl, 2},
Expand Down
1 change: 1 addition & 0 deletions src/rust/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ SEXP savvy_PlRLazyFrame_unpivot__ffi(SEXP self__, SEXP c_arg__on, SEXP c_arg__in
SEXP savvy_PlRLazyFrame_var__ffi(SEXP self__, SEXP c_arg__ddof);
SEXP savvy_PlRLazyFrame_with_columns__ffi(SEXP self__, SEXP c_arg__exprs);
SEXP savvy_PlRLazyFrame_with_columns_seq__ffi(SEXP self__, SEXP c_arg__exprs);
SEXP savvy_PlRLazyFrame_with_optimizations__ffi(SEXP self__, SEXP c_arg__optimizations);
SEXP savvy_PlRLazyFrame_with_row_index__ffi(SEXP self__, SEXP c_arg__name, SEXP c_arg__offset);

// methods and associated functions for PlRLazyGroupBy
Expand Down
Loading