Skip to content

feat: Add isin local execution impl #1993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ def isin_op_impl(x: ibis_types.Value, op: ops.IsInOp):
if op.match_nulls and contains_nulls:
return x.isnull() | x.isin(matchable_ibis_values)
else:
return x.isin(matchable_ibis_values)
return x.isin(matchable_ibis_values).fillna(False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change any existing behavior? Were we missing a test case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it looks like we were handling this higher up. removed that code, and now the op itself is purely non-null. also added a test



@scalar_op_compiler.register_unary_op(ops.ToDatetimeOp, pass_op=True)
Expand Down
8 changes: 3 additions & 5 deletions bigframes/core/compile/polars/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,9 @@ def _(self, op: ops.ScalarOp, l_input: pl.Expr, r_input: pl.Expr) -> pl.Expr:
def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
# TODO: Filter out types that can't be coerced to right type
assert isinstance(op, gen_ops.IsInOp)
if op.match_nulls or not any(map(pd.isna, op.values)):
# newer polars version have nulls_equal arg
return input.is_in(op.values)
else:
return input.is_in(op.values) or input.is_null()
assert not op.match_nulls # should be stripped by a lowering step rn
values = pl.Series(op.values, strict=False)
return input.is_in(values)

@compile_op.register(gen_ops.FillNaOp)
@compile_op.register(gen_ops.CoalesceOp)
Expand Down
32 changes: 32 additions & 0 deletions bigframes/core/compile/polars/lowering.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.

import dataclasses
from typing import cast

import numpy as np
import pandas as pd

from bigframes import dtypes
from bigframes.core import bigframe_node, expression
Expand Down Expand Up @@ -316,6 +318,35 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression:
return expr


class LowerIsinOp(op_lowering.OpLoweringRule):
@property
def op(self) -> type[ops.ScalarOp]:
return generic_ops.IsInOp

def lower(self, expr: expression.OpExpression) -> expression.Expression:
assert isinstance(expr.op, generic_ops.IsInOp)
arg = expr.children[0]
new_values = []
match_nulls = False
for val in expr.op.values:
# coercible, non-coercible
# float NaN/inf should be treated as distinct from 'true' null values
if cast(bool, pd.isna(val)) and not isinstance(val, float):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can fold this into one statement:

match_nulls = cast(bool, pd.isna(val)) and not isinstance(val, float) and expr.op.match_nulls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do to the elif statement, that wouldn't quite be the same

if expr.op.match_nulls:
match_nulls = True
elif dtypes.is_compatible(val, arg.output_type):
new_values.append(val)
else:
pass

new_isin = ops.IsInOp(tuple(new_values), match_nulls=False).as_expr(arg)
if match_nulls:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So whether we match nulls partially depends on the last values in expr.op.values? Is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

return ops.coalesce_op.as_expr(new_isin, expression.const(True))
else:
# polars propagates nulls, so need to coalesce to false
return ops.coalesce_op.as_expr(new_isin, expression.const(False))


def _coerce_comparables(
expr1: expression.Expression,
expr2: expression.Expression,
Expand Down Expand Up @@ -414,6 +445,7 @@ def _lower_cast(cast_op: ops.AsTypeOp, arg: expression.Expression):
LowerModRule(),
LowerAsTypeRule(),
LowerInvertOp(),
LowerIsinOp(),
)


Expand Down
1 change: 1 addition & 0 deletions bigframes/session/polars_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
generic_ops.FillNaOp,
generic_ops.CaseWhenOp,
generic_ops.InvertOp,
generic_ops.IsInOp,
)
_COMPATIBLE_AGG_OPS = (
agg_ops.SizeOp,
Expand Down
32 changes: 32 additions & 0 deletions tests/system/small/engines/test_generic_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,35 @@ def test_engines_invert_op(scalars_array_value: array_value.ArrayValue, engine):
)

assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)


@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
def test_engines_isin_op(scalars_array_value: array_value.ArrayValue, engine):
arr, col_ids = scalars_array_value.compute_values(
[
ops.IsInOp((1, 2, 3)).as_expr(expression.deref("int64_col")),
ops.IsInOp((None, 123456)).as_expr(expression.deref("int64_col")),
ops.IsInOp((None, 123456), match_nulls=False).as_expr(
expression.deref("int64_col")
),
ops.IsInOp((1.0, 2.0, 3.0)).as_expr(expression.deref("int64_col")),
ops.IsInOp(("1.0", "2.0")).as_expr(expression.deref("int64_col")),
ops.IsInOp(("1.0", 2.5, 3)).as_expr(expression.deref("int64_col")),
ops.IsInOp(()).as_expr(expression.deref("int64_col")),
ops.IsInOp((1, 2, 3, None)).as_expr(expression.deref("float64_col")),
]
)
new_names = (
"int in ints",
"int in ints w null",
"int in ints w null wo match nulls",
"int in floats",
"int in strings",
"int in mixed",
"int in empty",
"float in ints",
)
arr = arr.rename_columns(
{old_name: new_names[i] for i, old_name in enumerate(col_ids)}
)
assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: empty line above assertion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added