@@ -41,6 +41,37 @@ def test_parquet_cast_string_dataset(path, partition_cols):
         assert str(df[col].iloc[row]) == str(df2[col].iloc[row])


+@pytest.mark.parametrize("use_threads", [True, False])
+def test_read_parquet_filter_partitions(path, use_threads):
+    df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 1]})
+    paths = wr.s3.to_parquet(df, path, dataset=True, partition_cols=["c1", "c2"], use_threads=use_threads)["paths"]
+    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
+    df2 = wr.s3.read_parquet(
+        path, dataset=True, partition_filter=lambda x: True if x["c1"] == "0" else False, use_threads=use_threads
+    )
+    assert df2.shape == (1, 3)
+    assert df2.c0.iloc[0] == 0
+    assert df2.c1.astype(int).iloc[0] == 0
+    assert df2.c2.astype(int).iloc[0] == 0
+    df2 = wr.s3.read_parquet(
+        path,
+        dataset=True,
+        partition_filter=lambda x: True if x["c1"] == "1" and x["c2"] == "0" else False,
+        use_threads=use_threads,
+    )
+    assert df2.shape == (1, 3)
+    assert df2.c0.iloc[0] == 1
+    assert df2.c1.astype(int).iloc[0] == 1
+    assert df2.c2.astype(int).iloc[0] == 0
+    df2 = wr.s3.read_parquet(
+        path, dataset=True, partition_filter=lambda x: True if x["c2"] == "0" else False, use_threads=use_threads
+    )
+    assert df2.shape == (2, 3)
+    assert df2.c0.astype(int).sum() == 1
+    assert df2.c1.astype(int).sum() == 1
+    assert df2.c2.astype(int).sum() == 0
+
+
 def test_parquet(path):
     df_file = pd.DataFrame({"id": [1, 2, 3]})
     path_file = f"{path}test_parquet_file.parquet"
@@ -173,37 +204,6 @@ def test_to_parquet_file_dtype(path, use_threads):
     assert str(df2.c1.dtype) == "string"


-@pytest.mark.parametrize("use_threads", [True, False])
-def test_read_parquet_filter_partitions(path, use_threads):
-    df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 1]})
-    paths = wr.s3.to_parquet(df, path, dataset=True, partition_cols=["c1", "c2"], use_threads=use_threads)["paths"]
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
-    df2 = wr.s3.read_parquet(
-        path, dataset=True, partition_filter=lambda x: True if x["c1"] == "0" else False, use_threads=use_threads
-    )
-    assert df2.shape == (1, 3)
-    assert df2.c0.iloc[0] == 0
-    assert df2.c1.astype(int).iloc[0] == 0
-    assert df2.c2.astype(int).iloc[0] == 0
-    df2 = wr.s3.read_parquet(
-        path,
-        dataset=True,
-        partition_filter=lambda x: True if x["c1"] == "1" and x["c2"] == "0" else False,
-        use_threads=use_threads,
-    )
-    assert df2.shape == (1, 3)
-    assert df2.c0.iloc[0] == 1
-    assert df2.c1.astype(int).iloc[0] == 1
-    assert df2.c2.astype(int).iloc[0] == 0
-    df2 = wr.s3.read_parquet(
-        path, dataset=True, partition_filter=lambda x: True if x["c2"] == "0" else False, use_threads=use_threads
-    )
-    assert df2.shape == (2, 3)
-    assert df2.c0.astype(int).sum() == 1
-    assert df2.c1.astype(int).sum() == 1
-    assert df2.c2.astype(int).sum() == 0
-
-
 @pytest.mark.parametrize("use_threads", [True, False])
 @pytest.mark.parametrize("max_rows_by_file", [None, 0, 40, 250, 1000])
 def test_parquet_with_size(path, use_threads, max_rows_by_file):