 import platform

 _ = pytest.importorskip("duckdb.experimental.spark")
-from spark_namespace.sql import functions as F
+from spark_namespace.sql import functions as sf
 from spark_namespace.sql.types import Row
 from spark_namespace import USE_ACTUAL_SPARK

@@ -19,7 +19,7 @@ def test_array_distinct(self, spark):
             ([2, 4, 5], 3),
         ]
         df = spark.createDataFrame(data, ["firstColumn", "secondColumn"])
-        df = df.withColumn("distinct_values", F.array_distinct(F.col("firstColumn")))
+        df = df.withColumn("distinct_values", sf.array_distinct(sf.col("firstColumn")))
         res = df.select("distinct_values").collect()
         # Output order can vary across platforms which is why we sort it first
         assert len(res) == 2
@@ -31,7 +31,7 @@ def test_array_intersect(self, spark):
             (["b", "a", "c"], ["c", "d", "a", "f"]),
         ]
         df = spark.createDataFrame(data, ["c1", "c2"])
-        df = df.withColumn("intersect_values", F.array_intersect(F.col("c1"), F.col("c2")))
+        df = df.withColumn("intersect_values", sf.array_intersect(sf.col("c1"), sf.col("c2")))
         res = df.select("intersect_values").collect()
         # Output order can vary across platforms which is why we sort it first
         assert len(res) == 1
@@ -42,7 +42,7 @@ def test_array_union(self, spark):
             (["b", "a", "c"], ["c", "d", "a", "f"]),
         ]
         df = spark.createDataFrame(data, ["c1", "c2"])
-        df = df.withColumn("union_values", F.array_union(F.col("c1"), F.col("c2")))
+        df = df.withColumn("union_values", sf.array_union(sf.col("c1"), sf.col("c2")))
         res = df.select("union_values").collect()
         # Output order can vary across platforms which is why we sort it first
         assert len(res) == 1
@@ -54,7 +54,7 @@ def test_array_max(self, spark):
             ([4, 2, 5], 5),
         ]
         df = spark.createDataFrame(data, ["firstColumn", "secondColumn"])
-        df = df.withColumn("max_value", F.array_max(F.col("firstColumn")))
+        df = df.withColumn("max_value", sf.array_max(sf.col("firstColumn")))
         res = df.select("max_value").collect()
         assert res == [
             Row(max_value=3),
@@ -67,7 +67,7 @@ def test_array_min(self, spark):
             ([2, 4, 5], 5),
         ]
         df = spark.createDataFrame(data, ["firstColumn", "secondColumn"])
-        df = df.withColumn("min_value", F.array_min(F.col("firstColumn")))
+        df = df.withColumn("min_value", sf.array_min(sf.col("firstColumn")))
         res = df.select("min_value").collect()
         assert res == [
             Row(max_value=1),
@@ -77,58 +77,58 @@ def test_array_min(self, spark):
     def test_get(self, spark):
         df = spark.createDataFrame([(["a", "b", "c"], 1)], ['data', 'index'])

-        res = df.select(F.get(df.data, 1).alias("r")).collect()
+        res = df.select(sf.get(df.data, 1).alias("r")).collect()
         assert res == [Row(r="b")]

-        res = df.select(F.get(df.data, -1).alias("r")).collect()
+        res = df.select(sf.get(df.data, -1).alias("r")).collect()
         assert res == [Row(r=None)]

-        res = df.select(F.get(df.data, 3).alias("r")).collect()
+        res = df.select(sf.get(df.data, 3).alias("r")).collect()
         assert res == [Row(r=None)]

-        res = df.select(F.get(df.data, "index").alias("r")).collect()
+        res = df.select(sf.get(df.data, "index").alias("r")).collect()
         assert res == [Row(r='b')]

-        res = df.select(F.get(df.data, F.col("index") - 1).alias("r")).collect()
+        res = df.select(sf.get(df.data, sf.col("index") - 1).alias("r")).collect()
         assert res == [Row(r='a')]

     def test_flatten(self, spark):
         df = spark.createDataFrame([([[1, 2, 3], [4, 5], [6]],), ([None, [4, 5]],)], ['data'])

-        res = df.select(F.flatten(df.data).alias("r")).collect()
+        res = df.select(sf.flatten(df.data).alias("r")).collect()
         assert res == [Row(r=[1, 2, 3, 4, 5, 6]), Row(r=None)]

     def test_array_compact(self, spark):
         df = spark.createDataFrame([([1, None, 2, 3],), ([4, 5, None, 4],)], ['data'])

-        res = df.select(F.array_compact(df.data).alias("v")).collect()
+        res = df.select(sf.array_compact(df.data).alias("v")).collect()
         assert [Row(v=[1, 2, 3]), Row(v=[4, 5, 4])]

     def test_array_remove(self, spark):
         df = spark.createDataFrame([([1, 2, 3, 1, 1],), ([],)], ['data'])

-        res = df.select(F.array_remove(df.data, 1).alias("v")).collect()
+        res = df.select(sf.array_remove(df.data, 1).alias("v")).collect()
         assert res == [Row(v=[2, 3]), Row(v=[])]

     def test_array_agg(self, spark):
         df = spark.createDataFrame([[1, "A"], [1, "A"], [2, "A"]], ["c", "group"])

-        res = df.groupBy("group").agg(F.array_agg("c").alias("r")).collect()
+        res = df.groupBy("group").agg(sf.array_agg("c").alias("r")).collect()
         assert res[0] == Row(group="A", r=[1, 1, 2])

     def test_collect_list(self, spark):
         df = spark.createDataFrame([[1, "A"], [1, "A"], [2, "A"]], ["c", "group"])

-        res = df.groupBy("group").agg(F.collect_list("c").alias("r")).collect()
+        res = df.groupBy("group").agg(sf.collect_list("c").alias("r")).collect()
         assert res[0] == Row(group="A", r=[1, 1, 2])

     def test_array_append(self, spark):
         df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2="c")], ["c1", "c2"])

-        res = df.select(F.array_append(df.c1, df.c2).alias("r")).collect()
+        res = df.select(sf.array_append(df.c1, df.c2).alias("r")).collect()
         assert res == [Row(r=['b', 'a', 'c', 'c'])]

-        res = df.select(F.array_append(df.c1, 'x')).collect()
+        res = df.select(sf.array_append(df.c1, 'x')).collect()
         assert res == [Row(r=['b', 'a', 'c', 'x'])]

     def test_array_insert(self, spark):
@@ -137,21 +137,21 @@ def test_array_insert(self, spark):
             ['data', 'pos', 'val'],
         )

-        res = df.select(F.array_insert(df.data, df.pos.cast('integer'), df.val).alias('data')).collect()
+        res = df.select(sf.array_insert(df.data, df.pos.cast('integer'), df.val).alias('data')).collect()
         assert res == [
             Row(data=['a', 'd', 'b', 'c']),
             Row(data=['a', 'd', 'b', 'c', 'e']),
             Row(data=['c', 'b', 'd', 'a']),
         ]

-        res = df.select(F.array_insert(df.data, 5, 'hello').alias('data')).collect()
+        res = df.select(sf.array_insert(df.data, 5, 'hello').alias('data')).collect()
         assert res == [
             Row(data=['a', 'b', 'c', None, 'hello']),
             Row(data=['a', 'b', 'c', 'e', 'hello']),
             Row(data=['c', 'b', 'a', None, 'hello']),
         ]

-        res = df.select(F.array_insert(df.data, -5, 'hello').alias('data')).collect()
+        res = df.select(sf.array_insert(df.data, -5, 'hello').alias('data')).collect()
         assert res == [
             Row(data=['hello', None, 'a', 'b', 'c']),
             Row(data=['hello', 'a', 'b', 'c', 'e']),
@@ -160,67 +160,67 @@ def test_array_insert(self, spark):

     def test_slice(self, spark):
         df = spark.createDataFrame([([1, 2, 3],), ([4, 5],)], ['x'])
-        res = df.select(F.slice(df.x, 2, 2).alias("sliced")).collect()
+        res = df.select(sf.slice(df.x, 2, 2).alias("sliced")).collect()
         assert res == [Row(sliced=[2, 3]), Row(sliced=[5])]

     def test_sort_array(self, spark):
         df = spark.createDataFrame([([2, 1, None, 3],), ([1],), ([],)], ['data'])

-        res = df.select(F.sort_array(df.data).alias('r')).collect()
+        res = df.select(sf.sort_array(df.data).alias('r')).collect()
         assert res == [Row(r=[None, 1, 2, 3]), Row(r=[1]), Row(r=[])]

-        res = df.select(F.sort_array(df.data, asc=False).alias('r')).collect()
+        res = df.select(sf.sort_array(df.data, asc=False).alias('r')).collect()
         assert res == [Row(r=[3, 2, 1, None]), Row(r=[1]), Row(r=[])]

     @pytest.mark.parametrize(("null_replacement", "expected_joined_2"), [(None, "a"), ("replaced", "a,replaced")])
     def test_array_join(self, spark, null_replacement, expected_joined_2):
         df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data'])

-        res = df.select(F.array_join(df.data, ",", null_replacement=null_replacement).alias("joined")).collect()
+        res = df.select(sf.array_join(df.data, ",", null_replacement=null_replacement).alias("joined")).collect()
         assert res == [Row(joined='a,b,c'), Row(joined=expected_joined_2)]

     def test_array_position(self, spark):
         df = spark.createDataFrame([(["c", "b", "a"],), ([],)], ['data'])

-        res = df.select(F.array_position(df.data, "a").alias("pos")).collect()
+        res = df.select(sf.array_position(df.data, "a").alias("pos")).collect()
         assert res == [Row(pos=3), Row(pos=0)]

     def test_array_preprend(self, spark):
         df = spark.createDataFrame([([2, 3, 4],), ([],)], ['data'])

-        res = df.select(F.array_prepend(df.data, 1).alias("pre")).collect()
+        res = df.select(sf.array_prepend(df.data, 1).alias("pre")).collect()
         assert res == [Row(pre=[1, 2, 3, 4]), Row(pre=[1])]

     def test_array_repeat(self, spark):
         df = spark.createDataFrame([('ab',)], ['data'])

-        res = df.select(F.array_repeat(df.data, 3).alias('r')).collect()
+        res = df.select(sf.array_repeat(df.data, 3).alias('r')).collect()
         assert res == [Row(r=['ab', 'ab', 'ab'])]

     def test_array_size(self, spark):
         df = spark.createDataFrame([([2, 1, 3],), (None,)], ['data'])

-        res = df.select(F.array_size(df.data).alias('r')).collect()
+        res = df.select(sf.array_size(df.data).alias('r')).collect()
         assert res == [Row(r=3), Row(r=None)]

     def test_array_sort(self, spark):
         df = spark.createDataFrame([([2, 1, None, 3],), ([1],), ([],)], ['data'])

-        res = df.select(F.array_sort(df.data).alias('r')).collect()
+        res = df.select(sf.array_sort(df.data).alias('r')).collect()
         assert res == [Row(r=[1, 2, 3, None]), Row(r=[1]), Row(r=[])]

     def test_arrays_overlap(self, spark):
         df = spark.createDataFrame(
             [(["a", "b"], ["b", "c"]), (["a"], ["b", "c"]), ([None, "c"], ["a"]), ([None, "c"], [None])], ['x', 'y']
         )

-        res = df.select(F.arrays_overlap(df.x, df.y).alias("overlap")).collect()
+        res = df.select(sf.arrays_overlap(df.x, df.y).alias("overlap")).collect()
         assert res == [Row(overlap=True), Row(overlap=False), Row(overlap=None), Row(overlap=None)]

     def test_arrays_zip(self, spark):
         df = spark.createDataFrame([([1, 2, 3], [2, 4, 6], [3, 6])], ['vals1', 'vals2', 'vals3'])

-        res = df.select(F.arrays_zip(df.vals1, df.vals2, df.vals3).alias('zipped')).collect()
+        res = df.select(sf.arrays_zip(df.vals1, df.vals2, df.vals3).alias('zipped')).collect()
         # FIXME: The structure of the results should be the same
         if USE_ACTUAL_SPARK:
             assert res == [