diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index 1bfbe0f734..840f0adaee 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -33,7 +33,9 @@ import bigframes.operations.aggregations as agg_ops import bigframes.operations.bool_ops as bool_ops import bigframes.operations.comparison_ops as comp_ops +import bigframes.operations.date_ops as date_ops import bigframes.operations.datetime_ops as dt_ops +import bigframes.operations.frequency_ops as freq_ops import bigframes.operations.generic_ops as gen_ops import bigframes.operations.json_ops as json_ops import bigframes.operations.numeric_ops as num_ops @@ -74,6 +76,20 @@ def decorator(func): if polars_installed: + _FREQ_MAPPING = { + "Y": "1y", + "Q": "1q", + "M": "1mo", + "W": "1w", + "D": "1d", + "h": "1h", + "min": "1m", + "s": "1s", + "ms": "1ms", + "us": "1us", + "ns": "1ns", + } + _DTYPE_MAPPING = { # Direct mappings bigframes.dtypes.INT_DTYPE: pl.Int64(), @@ -329,11 +345,48 @@ def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: else: return pl.any_horizontal(*(input.str.ends_with(pat) for pat in op.pat)) + @compile_op.register(freq_ops.FloorDtOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + assert isinstance(op, freq_ops.FloorDtOp) + return input.dt.truncate(every=_FREQ_MAPPING[op.freq]) + @compile_op.register(dt_ops.StrftimeOp) def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: assert isinstance(op, dt_ops.StrftimeOp) return input.dt.strftime(op.date_format) + @compile_op.register(date_ops.YearOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.year() + + @compile_op.register(date_ops.QuarterOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.quarter() + + @compile_op.register(date_ops.MonthOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.month() + + @compile_op.register(date_ops.DayOfWeekOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.weekday() - 1 + + @compile_op.register(date_ops.DayOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.day() + + @compile_op.register(date_ops.IsoYearOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.iso_year() + + @compile_op.register(date_ops.IsoWeekOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.week() + + @compile_op.register(date_ops.IsoDayOp) + def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: + return input.dt.weekday() + @compile_op.register(dt_ops.ParseDatetimeOp) def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr: assert isinstance(op, dt_ops.ParseDatetimeOp) diff --git a/bigframes/operations/datetimes.py b/bigframes/operations/datetimes.py index 14bf10f463..95896ddc97 100644 --- a/bigframes/operations/datetimes.py +++ b/bigframes/operations/datetimes.py @@ -30,6 +30,7 @@ _ONE_DAY = pandas.Timedelta("1d") _ONE_SECOND = pandas.Timedelta("1s") _ONE_MICRO = pandas.Timedelta("1us") +_SUPPORTED_FREQS = ("Y", "Q", "M", "W", "D", "h", "min", "s", "ms", "us") @log_adapter.class_logger @@ -155,4 +156,6 @@ def normalize(self) -> series.Series: return self._apply_unary_op(ops.normalize_op) def floor(self, freq: str) -> series.Series: - return self._apply_unary_op(ops.FloorDtOp(freq=freq)) + if freq not in _SUPPORTED_FREQS: + raise ValueError(f"freq must be one of {_SUPPORTED_FREQS}") + return self._apply_unary_op(ops.FloorDtOp(freq=freq)) # type: ignore diff --git a/bigframes/operations/frequency_ops.py b/bigframes/operations/frequency_ops.py index 2d5a854c32..b94afa7271 100644 --- a/bigframes/operations/frequency_ops.py +++ b/bigframes/operations/frequency_ops.py @@ -27,9 +27,22 @@ @dataclasses.dataclass(frozen=True) class FloorDtOp(base_ops.UnaryOp): name: typing.ClassVar[str] = "floor_dt" - freq: str + freq: typing.Literal[ + "Y", + "Q", + "M", + "W", + "D", + "h", + "min", + "s", + "ms", + "us", + ] def output_type(self, *input_types): + if not dtypes.is_datetime_like(input_types[0]): + raise TypeError("dt floor requires datetime-like arguments") return input_types[0] diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py index b93d31d255..d8df558fe4 100644 --- a/bigframes/session/polars_executor.py +++ b/bigframes/session/polars_executor.py @@ -24,6 +24,8 @@ from bigframes.operations import ( bool_ops, comparison_ops, + date_ops, + frequency_ops, generic_ops, numeric_ops, string_ops, @@ -60,6 +62,15 @@ comparison_ops.GtOp, comparison_ops.LeOp, comparison_ops.GeOp, + date_ops.YearOp, + date_ops.QuarterOp, + date_ops.MonthOp, + date_ops.DayOfWeekOp, + date_ops.DayOp, + date_ops.IsoYearOp, + date_ops.IsoWeekOp, + date_ops.IsoDayOp, + frequency_ops.FloorDtOp, numeric_ops.AddOp, numeric_ops.SubOp, numeric_ops.MulOp, diff --git a/tests/system/small/engines/test_temporal_ops.py b/tests/system/small/engines/test_temporal_ops.py new file mode 100644 index 0000000000..5a39587886 --- /dev/null +++ b/tests/system/small/engines/test_temporal_ops.py @@ -0,0 +1,66 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from bigframes.core import array_value +import bigframes.operations as ops +from bigframes.session import polars_executor +from bigframes.testing.engine_utils import assert_equivalence_execution + +pytest.importorskip("polars") + +# Polars used as reference as its fast and local. Generally though, prefer gbq engine where they disagree. +REFERENCE_ENGINE = polars_executor.PolarsExecutor() + + +@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +def test_engines_dt_floor(scalars_array_value: array_value.ArrayValue, engine): + arr, _ = scalars_array_value.compute_values( + [ + ops.FloorDtOp("us").as_expr("timestamp_col"), + ops.FloorDtOp("ms").as_expr("timestamp_col"), + ops.FloorDtOp("s").as_expr("timestamp_col"), + ops.FloorDtOp("min").as_expr("timestamp_col"), + ops.FloorDtOp("h").as_expr("timestamp_col"), + ops.FloorDtOp("D").as_expr("timestamp_col"), + ops.FloorDtOp("W").as_expr("timestamp_col"), + ops.FloorDtOp("M").as_expr("timestamp_col"), + ops.FloorDtOp("Q").as_expr("timestamp_col"), + ops.FloorDtOp("Y").as_expr("timestamp_col"), + ops.FloorDtOp("Q").as_expr("datetime_col"), + ops.FloorDtOp("us").as_expr("datetime_col"), + ] + ) + assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine) + + +@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True) +def test_engines_date_accessors(scalars_array_value: array_value.ArrayValue, engine): + datelike_cols = ["datetime_col", "timestamp_col", "date_col"] + accessors = [ + ops.day_op, + ops.dayofweek_op, + ops.month_op, + ops.quarter_op, + ops.year_op, + ops.iso_day_op, + ops.iso_week_op, + ops.iso_year_op, + ] + + exprs = [acc.as_expr(col) for acc in accessors for col in datelike_cols] + + arr, _ = scalars_array_value.compute_values(exprs) + assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine) diff --git a/tests/unit/core/compile/sqlglot/expressions/snapshots/test_unary_compiler/test_floor_dt/out.sql b/tests/unit/core/compile/sqlglot/expressions/snapshots/test_unary_compiler/test_floor_dt/out.sql index 3c7efd3098..ad4fdb23a1 100644 --- a/tests/unit/core/compile/sqlglot/expressions/snapshots/test_unary_compiler/test_floor_dt/out.sql +++ b/tests/unit/core/compile/sqlglot/expressions/snapshots/test_unary_compiler/test_floor_dt/out.sql @@ -5,7 +5,7 @@ WITH `bfcte_0` AS ( ), `bfcte_1` AS ( SELECT *, - TIMESTAMP_TRUNC(`bfcol_0`, DAY) AS `bfcol_1` + TIMESTAMP_TRUNC(`bfcol_0`, D) AS `bfcol_1` FROM `bfcte_0` ) SELECT diff --git a/tests/unit/core/compile/sqlglot/expressions/test_unary_compiler.py b/tests/unit/core/compile/sqlglot/expressions/test_unary_compiler.py index f011721ee5..8f3af11842 100644 --- a/tests/unit/core/compile/sqlglot/expressions/test_unary_compiler.py +++ b/tests/unit/core/compile/sqlglot/expressions/test_unary_compiler.py @@ -153,7 +153,7 @@ def test_expm1(scalar_types_df: bpd.DataFrame, snapshot): def test_floor_dt(scalar_types_df: bpd.DataFrame, snapshot): bf_df = scalar_types_df[["timestamp_col"]] - sql = _apply_unary_op(bf_df, ops.FloorDtOp("DAY"), "timestamp_col") + sql = _apply_unary_op(bf_df, ops.FloorDtOp("D"), "timestamp_col") snapshot.assert_match(sql, "out.sql")