53 changes: 53 additions & 0 deletions bigframes/core/compile/polars/compiler.py
@@ -33,7 +33,9 @@
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.bool_ops as bool_ops
import bigframes.operations.comparison_ops as comp_ops
import bigframes.operations.date_ops as date_ops
import bigframes.operations.datetime_ops as dt_ops
import bigframes.operations.frequency_ops as freq_ops
import bigframes.operations.generic_ops as gen_ops
import bigframes.operations.json_ops as json_ops
import bigframes.operations.numeric_ops as num_ops
@@ -74,6 +76,20 @@ def decorator(func):


if polars_installed:
_FREQ_MAPPING = {
"Y": "1y",
"Q": "1q",
"M": "1mo",
"W": "1w",
"D": "1d",
"h": "1h",
"min": "1m",
"s": "1s",
"ms": "1ms",
"us": "1us",
"ns": "1ns",
}

_DTYPE_MAPPING = {
# Direct mappings
bigframes.dtypes.INT_DTYPE: pl.Int64(),
@@ -329,11 +345,48 @@ def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
else:
return pl.any_horizontal(*(input.str.ends_with(pat) for pat in op.pat))

@compile_op.register(freq_ops.FloorDtOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
assert isinstance(op, freq_ops.FloorDtOp)
return input.dt.truncate(every=_FREQ_MAPPING[op.freq])

@compile_op.register(dt_ops.StrftimeOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
assert isinstance(op, dt_ops.StrftimeOp)
return input.dt.strftime(op.date_format)

@compile_op.register(date_ops.YearOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.year()

@compile_op.register(date_ops.QuarterOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.quarter()

@compile_op.register(date_ops.MonthOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.month()

@compile_op.register(date_ops.DayOfWeekOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.weekday() - 1

@compile_op.register(date_ops.DayOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.day()

@compile_op.register(date_ops.IsoYearOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.iso_year()

@compile_op.register(date_ops.IsoWeekOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.week()

@compile_op.register(date_ops.IsoDayOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
return input.dt.weekday()

@compile_op.register(dt_ops.ParseDatetimeOp)
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
assert isinstance(op, dt_ops.ParseDatetimeOp)
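
Two polars behaviors are worth calling out in the handlers above. Expr.dt.truncate takes polars duration strings (e.g. "1mo" for a month, "1m" for a minute), which is why _FREQ_MAPPING translates the pandas-style codes; and Expr.dt.weekday returns ISO day numbers 1-7 (Monday = 1), so DayOfWeekOp subtracts 1 to match the pandas Monday = 0 convention while IsoDayOp uses the value directly. A minimal standalone sketch (not part of the PR) illustrating both:

import datetime

import polars as pl

df = pl.DataFrame({"ts": [datetime.datetime(2025, 3, 14, 15, 9, 26)]})  # a Friday
print(
    df.select(
        pl.col("ts").dt.truncate(every="1d").alias("floor_day"),  # 2025-03-14 00:00:00
        (pl.col("ts").dt.weekday() - 1).alias("dayofweek"),  # 4 (pandas: Monday = 0)
        pl.col("ts").dt.weekday().alias("iso_day"),  # 5 (ISO: Monday = 1)
    )
)
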
5 changes: 4 additions & 1 deletion bigframes/operations/datetimes.py
@@ -30,6 +30,7 @@
_ONE_DAY = pandas.Timedelta("1d")
_ONE_SECOND = pandas.Timedelta("1s")
_ONE_MICRO = pandas.Timedelta("1us")
_SUPPORTED_FREQS = ("Y", "Q", "M", "W", "D", "h", "min", "s", "ms", "us")


@log_adapter.class_logger
@@ -155,4 +156,6 @@ def normalize(self) -> series.Series:
return self._apply_unary_op(ops.normalize_op)

def floor(self, freq: str) -> series.Series:
return self._apply_unary_op(ops.FloorDtOp(freq=freq))
if freq not in _SUPPORTED_FREQS:
raise ValueError(f"freq must be one of {_SUPPORTED_FREQS}")
return self._apply_unary_op(ops.FloorDtOp(freq=freq)) # type: ignore
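
This guard surfaces the restriction at the API boundary rather than deep in compilation. Note that "ns" is accepted by the polars _FREQ_MAPPING but absent from _SUPPORTED_FREQS, so it is rejected here, presumably because BigQuery timestamps only carry microsecond precision. A self-contained sketch of the validation pattern (the helper name is illustrative), reusing only the tuple from the diff:

_SUPPORTED_FREQS = ("Y", "Q", "M", "W", "D", "h", "min", "s", "ms", "us")

def validate_floor_freq(freq: str) -> str:
    # Mirrors the check in DatetimeMethods.floor: multiples such as "2h"
    # and unsupported aliases such as "ns" are rejected up front.
    if freq not in _SUPPORTED_FREQS:
        raise ValueError(f"freq must be one of {_SUPPORTED_FREQS}")
    return freq

validate_floor_freq("h")  # ok
# validate_floor_freq("2h")  -> ValueError
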
15 changes: 14 additions & 1 deletion bigframes/operations/frequency_ops.py
@@ -27,9 +27,22 @@
@dataclasses.dataclass(frozen=True)
class FloorDtOp(base_ops.UnaryOp):
name: typing.ClassVar[str] = "floor_dt"
freq: str
freq: typing.Literal[
"Y",
"Q",
"M",
"W",
"D",
"h",
"min",
"s",
"ms",
"us",
]

def output_type(self, *input_types):
if not dtypes.is_datetime_like(input_types[0]):
raise TypeError("dt floor requires datetime-like arguments")
return input_types[0]


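
Narrowing freq from str to a Literal means type checkers can flag unsupported codes at any call site that passes a constant, complementing the runtime check in datetimes.py. A simplified, self-contained sketch of the same pattern (base_ops.UnaryOp and the dtypes helper are stubbed out, so this is illustrative rather than the real class):

import dataclasses
import typing

Freq = typing.Literal["Y", "Q", "M", "W", "D", "h", "min", "s", "ms", "us"]

@dataclasses.dataclass(frozen=True)
class FloorDtOp:
    name: typing.ClassVar[str] = "floor_dt"
    freq: Freq

    def output_type(self, *input_types):
        # Stand-in for dtypes.is_datetime_like; flooring preserves the input type.
        if input_types[0] not in ("timestamp", "datetime"):
            raise TypeError("dt floor requires datetime-like arguments")
        return input_types[0]

op = FloorDtOp(freq="D")  # FloorDtOp(freq="2h") would fail static type checking
assert op.output_type("timestamp") == "timestamp"
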
11 changes: 11 additions & 0 deletions bigframes/session/polars_executor.py
@@ -24,6 +24,8 @@
from bigframes.operations import (
bool_ops,
comparison_ops,
date_ops,
frequency_ops,
generic_ops,
numeric_ops,
string_ops,
@@ -60,6 +62,15 @@
comparison_ops.GtOp,
comparison_ops.LeOp,
comparison_ops.GeOp,
date_ops.YearOp,
date_ops.QuarterOp,
date_ops.MonthOp,
date_ops.DayOfWeekOp,
date_ops.DayOp,
date_ops.IsoYearOp,
date_ops.IsoWeekOp,
date_ops.IsoDayOp,
frequency_ops.FloorDtOp,
numeric_ops.AddOp,
numeric_ops.SubOp,
numeric_ops.MulOp,
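
Registering the new op classes here is what lets the polars executor claim these plans; anything outside the set falls back to the BigQuery engine. A minimal sketch of that allowlist gate (the class and function names below are hypothetical, not the module's real API):

class YearOp: ...
class UnsupportedOp: ...  # an op with no polars compilation registered

_COMPILABLE_OPS: frozenset[type] = frozenset({YearOp})

def can_execute_locally(plan_ops) -> bool:
    # Every scalar op in the plan must be in the polars allowlist.
    return all(type(op) in _COMPILABLE_OPS for op in plan_ops)

assert can_execute_locally([YearOp()])
assert not can_execute_locally([YearOp(), UnsupportedOp()])
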
66 changes: 66 additions & 0 deletions tests/system/small/engines/test_temporal_ops.py
@@ -0,0 +1,66 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from bigframes.core import array_value
import bigframes.operations as ops
from bigframes.session import polars_executor
from bigframes.testing.engine_utils import assert_equivalence_execution

pytest.importorskip("polars")

# Polars is used as the reference engine because it's fast and local. Where the
# engines disagree, though, prefer the gbq engine.
REFERENCE_ENGINE = polars_executor.PolarsExecutor()


@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
def test_engines_dt_floor(scalars_array_value: array_value.ArrayValue, engine):
arr, _ = scalars_array_value.compute_values(
[
ops.FloorDtOp("us").as_expr("timestamp_col"),
ops.FloorDtOp("ms").as_expr("timestamp_col"),
ops.FloorDtOp("s").as_expr("timestamp_col"),
ops.FloorDtOp("min").as_expr("timestamp_col"),
ops.FloorDtOp("h").as_expr("timestamp_col"),
ops.FloorDtOp("D").as_expr("timestamp_col"),
ops.FloorDtOp("W").as_expr("timestamp_col"),
ops.FloorDtOp("M").as_expr("timestamp_col"),
ops.FloorDtOp("Q").as_expr("timestamp_col"),
ops.FloorDtOp("Y").as_expr("timestamp_col"),
ops.FloorDtOp("Q").as_expr("datetime_col"),
ops.FloorDtOp("us").as_expr("datetime_col"),
]
)
assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)


@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
def test_engines_date_accessors(scalars_array_value: array_value.ArrayValue, engine):
datelike_cols = ["datetime_col", "timestamp_col", "date_col"]
accessors = [
ops.day_op,
ops.dayofweek_op,
ops.month_op,
ops.quarter_op,
ops.year_op,
ops.iso_day_op,
ops.iso_week_op,
ops.iso_year_op,
]

exprs = [acc.as_expr(col) for acc in accessors for col in datelike_cols]

arr, _ = scalars_array_value.compute_values(exprs)
assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)
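
These tests assert cross-engine agreement rather than hard-coded expected values, so the polars and BigQuery paths keep each other honest. A hypothetical sketch of what such an equivalence helper boils down to (assert_equivalence_execution lives in bigframes.testing.engine_utils; the .execute(...).to_pandas() calls below are assumptions about the executor API, not its real signature):

import pandas.testing

def assert_engines_agree(node, reference_engine, candidate_engine):
    # Execute the same plan node on both engines and compare the
    # materialized frames; dtypes can differ across engines, so only
    # the values are compared.
    ref = reference_engine.execute(node).to_pandas()  # assumed API
    got = candidate_engine.execute(node).to_pandas()  # assumed API
    pandas.testing.assert_frame_equal(ref, got, check_dtype=False)
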
@@ -5,7 +5,7 @@ WITH `bfcte_0` AS (
), `bfcte_1` AS (
SELECT
*,
TIMESTAMP_TRUNC(`bfcol_0`, DAY) AS `bfcol_1`
TIMESTAMP_TRUNC(`bfcol_0`, D) AS `bfcol_1`
FROM `bfcte_0`
)
SELECT
@@ -153,7 +153,7 @@ def test_expm1(scalar_types_df: bpd.DataFrame, snapshot):

def test_floor_dt(scalar_types_df: bpd.DataFrame, snapshot):
bf_df = scalar_types_df[["timestamp_col"]]
sql = _apply_unary_op(bf_df, ops.FloorDtOp("DAY"), "timestamp_col")
sql = _apply_unary_op(bf_df, ops.FloorDtOp("D"), "timestamp_col")

snapshot.assert_match(sql, "out.sql")
