11from __future__ import annotations
22
3- from typing import TYPE_CHECKING , Any , Callable , Iterable , Literal , Protocol
3+ from typing import TYPE_CHECKING , Any , Literal , Protocol
44
55import ibis
66from ibis .expr import datatypes as dt
1212if TYPE_CHECKING :
1313 from collections .abc import Mapping
1414
15-
class Filters:
    """A simple namespace for filter functions."""

    @staticmethod
    def _changed_filter(
        subset: str | Iterable[str] | None,
        combine: Callable[..., Any],
    ) -> Callable[[ir.Table], ir.BooleanValue | Literal[True]]:
        """Build a filter that combines per-column "changed" predicates.

        Parameters
        ----------
        subset : str | Iterable[str] | None
            The columns to consider. A single string is treated as one column.
            None means "decide per table" (see the public methods).
        combine : callable
            Reducer applied to the per-column predicates
            (``ibis.and_`` or ``ibis.or_``).
        """
        # Normalize once, up front. The returned closure must stay stateless:
        # rebinding `subset` on the first call would pin the filter to the
        # first table's columns and would exhaust a one-shot iterable.
        if subset is None:
            fixed_columns = None
        elif isinstance(subset, str):
            fixed_columns = (subset,)
        else:
            fixed_columns = tuple(subset)

        def filter_func(table: ir.Table, /) -> ir.BooleanValue | Literal[True]:
            columns = fixed_columns
            if columns is None:
                u = Updates(table, check_schemas="lax")
                if u.before().schema() != u.after().schema():
                    # Different schemas: every row counts as changed.
                    return True
                columns = table.columns
            # NOTE(review): identical_to() can return the literal False when a
            # side is missing, and `~False` is the integer -1 in Python.
            # Confirm that cannot happen for the columns passed here.
            return combine(*(~identical_to(table[col]) for col in columns))

        return filter_func

    @staticmethod
    def all_changed(
        subset: str | Iterable[str] | None = None, /
    ) -> Callable[[ir.Table], Literal[True] | ir.BooleanValue]:
        """
        Make an Updates filter function that gives rows where all columns are different.

        Parameters
        ----------
        subset : str | Iterable[str] | None
            The columns to consider.

            If None, and the schemas of the before and after tables are different,
            then we return True (ie, keep all rows), because by definition
            every row is different, since the schemas are different.

            If None, and the schemas of the before and after tables are the same,
            then we consider all columns in both before and after tables.

            If you joined on an eg id column, you almost definitely want to exclude it here.

        Examples
        --------
        >>> u = Updates.from_tables(before, after, join_on="id")  # doctest: +SKIP
        >>> u.filter(u.filters.all_changed(["name", "age"]))  # doctest: +SKIP
        >>> u.filter(
        ...     u.filters.all_changed([c for c in u.columns if c != "my_id"])
        ... )  # doctest: +SKIP
        """  # noqa: E501
        return Filters._changed_filter(subset, ibis.and_)

    @staticmethod
    def any_changed(
        subset: str | Iterable[str] | None = None, /
    ) -> Callable[[ir.Table], ir.BooleanValue | Literal[True]]:
        """Make an Updates filter function that gives rows where any column's `after` is different
        from its `before`.

        Parameters
        ----------
        subset : str | Iterable[str] | None
            The columns to consider.

            If None, and the schemas of the before and after tables are different,
            then we return True (ie, keep all rows), because by definition
            every row is different, since the schemas are different.

            If None, and the schemas of the before and after tables are the same,
            then we consider all columns in both before and after tables.

        Examples
        --------
        >>> u = Updates.from_tables(before, after, join_on="id")  # doctest: +SKIP
        >>> u.filter(u.filters.any_changed(["name", "age"]))  # doctest: +SKIP
        """  # noqa: E501
        return Filters._changed_filter(subset, ibis.or_)
# The closed set of labels describing how a single value moved between its
# `before` and `after` state.
ValueChangeType = Literal[
    "remained_null",
    "became_null",
    "became_nonnull",
    "changed",
    "unchanged",
]
9918
10019
10120class UpdatedColumn (StructWrapper ):
@@ -115,18 +34,67 @@ class UpdatedColumn(StructWrapper):
11534
def __init__(self, x: ir.StructValue, /) -> None:
    """Wrap struct `x`, marking whichever of 'before'/'after' it lacks as None.

    Raises
    ------
    ValueError
        If the struct has neither a 'before' nor an 'after' field.
    """
    super().__init__(x)
    field_names = x.type().names  # ty:ignore[unresolved-attribute]
    for side in ("before", "after"):
        if side not in field_names:
            # Set through object.__setattr__ rather than plain assignment;
            # presumably the wrapper restricts normal attribute setting.
            object.__setattr__(self, side, None)
    neither_present = self.before is None and self.after is None
    if neither_present:
        raise ValueError(
            "UpdatedColumn must have at least one of 'before' or 'after'"
        )
12245
def is_changed(self) -> ir.BooleanValue | Literal[True]:
    """Is `before` different from `after`?

    If this column does not have both before and after fields, returns True,
    because that implies the schema has changed.
    """
    # Inside the method body this name resolves to the module-level
    # `is_changed` helper, so this is a delegation, not a recursive call.
    return is_changed(self)
53+
def schema_change(self) -> Literal["added", "removed", "type_changed", "unchanged"]:
    """Classify the column as added, removed, type-changed, or unchanged by the update."""  # noqa: E501
    old, new = self.before, self.after
    if old is None and new is not None:
        return "added"
    if old is not None and new is None:
        return "removed"
    # Remaining case: compare the declared types of the two sides.
    types_differ = old.type() != new.type()  # ty:ignore[possibly-missing-attribute]
    return "type_changed" if types_differ else "unchanged"
65+
def value_change(self) -> ir.StringValue:
    """How did the value change?

    Returns one of:

    - "remained_null": both before and after are null
    - "became_null": before was not null, after is null
    - "became_nonnull": before was null, after is not null
    - "changed": both before and after are not null, but different
    - "unchanged": both before and after are identical

    Raises
    ------
    ValueError
        If the column was added or removed (ie does not have both before and after fields).
        Check this first with `schema_change()`.
    """  # noqa: E501
    if self.schema_change() in ("added", "removed"):
        raise ValueError(
            "Cannot determine value change for columns that were added or removed"
        )
    before_is_null = self.before.isnull()
    after_is_null = self.after.isnull()
    # Branch order matters: the three null cases are tested first, so the
    # identical_to branch only sees rows where both sides are non-null.
    # Return the expression itself. A trailing comma after the call would
    # wrap it in a 1-tuple.
    return ibis.cases(
        (before_is_null & after_is_null, "remained_null"),
        (~before_is_null & after_is_null, "became_null"),
        (before_is_null & ~after_is_null, "became_nonnull"),
        (identical_to(self), "unchanged"),
        else_="changed",
    )
13098
13199
132100class Updates (TableWrapper ):
@@ -142,15 +110,6 @@ class Updates(TableWrapper):
142110 If a column has both 'before' and 'after' fields, it means this column was present in both tables.
143111 """ # noqa: E501
144112
145- filters = Filters
146- """A set of filters for convenience.
147-
148- Examples
149- --------
150- >>> u = Updates.from_tables(before, after, join_on="id") # doctest: +SKIP
151- >>> u.filter(u.filters.all_changed(["name", "age"])) # doctest: +SKIP
152- """
153-
154113 def __init__ (
155114 self ,
156115 diff_table : ibis .Table ,
@@ -407,3 +366,15 @@ def identical_to(val: HasBeforeAfter, /) -> ibis.ir.BooleanValue | Literal[False
407366 if (before := val .before ) is None or (after := val .after ) is None :
408367 return False
409368 return before .identical_to (after )
369+
370+
def is_changed(val: HasBeforeAfter, /) -> ir.BooleanValue | Literal[True]:
    """Is `before` different from `after`?

    If `val` does not have both before and after fields, returns True,
    because that implies the schema has changed. Otherwise returns the
    negation of `identical_to(val)`.
    """
    same = identical_to(val)
    # The literal False from identical_to means a side is missing. It must be
    # special-cased because `~False` is the integer -1, not a boolean.
    return True if same is False else ~same
0 commit comments