16
16
17
17
import bigframes
18
18
from bigframes .core import identifiers , local_data , nodes
19
- from bigframes .session import polars_executor , semi_executor
19
+ from bigframes .session import polars_executor
20
+ from tests .system .small .engines .engine_utils import assert_equivalence_execution
20
21
21
22
pytest .importorskip ("polars" )
22
23
23
24
# Polars used as reference as its fast and local. Generally though, prefer gbq engine where they disagree.
24
25
REFERENCE_ENGINE = polars_executor .PolarsExecutor ()
25
26
26
27
27
- def ensure_equivalence (
28
- node : nodes .BigFrameNode ,
29
- engine1 : semi_executor .SemiExecutor ,
30
- engine2 : semi_executor .SemiExecutor ,
31
- ):
32
- e1_result = engine1 .execute (node , ordered = True )
33
- e2_result = engine2 .execute (node , ordered = True )
34
- assert e1_result is not None
35
- assert e2_result is not None
36
- # Schemas might have extra nullity markers, normalize to node expected schema, which should be looser
37
- e1_table = e1_result .to_arrow_table ().cast (node .schema .to_pyarrow ())
38
- e2_table = e2_result .to_arrow_table ().cast (node .schema .to_pyarrow ())
39
- assert e1_table .equals (e2_table ), f"{ e1_table } is not equal to { e2_table } "
40
-
41
-
42
28
def test_engines_read_local (
43
29
fake_session : bigframes .Session ,
44
30
managed_data_source : local_data .ManagedArrowTable ,
@@ -51,7 +37,7 @@ def test_engines_read_local(
51
37
local_node = nodes .ReadLocalNode (
52
38
managed_data_source , scan_list , fake_session , offsets_col = None
53
39
)
54
- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
40
+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
55
41
56
42
57
43
def test_engines_read_local_w_offsets (
@@ -69,7 +55,7 @@ def test_engines_read_local_w_offsets(
69
55
fake_session ,
70
56
offsets_col = identifiers .ColumnId ("offsets" ),
71
57
)
72
- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
58
+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
73
59
74
60
75
61
def test_engines_read_local_w_col_subset (
@@ -84,7 +70,7 @@ def test_engines_read_local_w_col_subset(
84
70
local_node = nodes .ReadLocalNode (
85
71
managed_data_source , scan_list , fake_session , offsets_col = None
86
72
)
87
- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
73
+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
88
74
89
75
90
76
def test_engines_read_local_w_zero_row_source (
@@ -99,7 +85,7 @@ def test_engines_read_local_w_zero_row_source(
99
85
local_node = nodes .ReadLocalNode (
100
86
zero_row_source , scan_list , fake_session , offsets_col = None
101
87
)
102
- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
88
+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
103
89
104
90
105
91
def test_engines_read_local_w_nested_source (
@@ -114,7 +100,7 @@ def test_engines_read_local_w_nested_source(
114
100
local_node = nodes .ReadLocalNode (
115
101
nested_data_source , scan_list , fake_session , offsets_col = None
116
102
)
117
- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
103
+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
118
104
119
105
120
106
def test_engines_read_local_w_repeated_source (
@@ -129,4 +115,4 @@ def test_engines_read_local_w_repeated_source(
129
115
local_node = nodes .ReadLocalNode (
130
116
repeated_data_source , scan_list , fake_session , offsets_col = None
131
117
)
132
- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
118
+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
0 commit comments