@@ -234,10 +234,11 @@ def test_to_s3(
 ):
     dataframe = pd.read_csv("data_samples/micro.csv")
     func = session.pandas.to_csv if file_format == "csv" else session.pandas.to_parquet
+    path = f"s3://{bucket}/test/"
     objects_paths = func(
         dataframe=dataframe,
         database=database,
-        path=f"s3://{bucket}/test/",
+        path=path,
         preserve_index=preserve_index,
         mode=mode,
         partition_cols=partition_cols,
@@ -264,9 +265,10 @@ def test_to_parquet_with_cast_int(
         database,
 ):
     dataframe = pd.read_csv("data_samples/nano.csv", dtype={"id": "Int64"}, parse_dates=["date", "time"])
+    path = f"s3://{bucket}/test/"
     session.pandas.to_parquet(dataframe=dataframe,
                               database=database,
-                              path=f"s3://{bucket}/test/",
+                              path=path,
                               preserve_index=False,
                               mode="overwrite",
                               procs_cpu_bound=1,
@@ -277,6 +279,7 @@ def test_to_parquet_with_cast_int(
         dataframe2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
         if len(dataframe.index) == len(dataframe2.index):
             break
+    session.s3.delete_objects(path=path)
     assert len(dataframe.index) == len(dataframe2.index)
     assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
     assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[dataframe2["id"] == 0].iloc[0]["name"]
@@ -385,9 +388,10 @@ def test_etl_complex(session, bucket, database, max_result_size):
     dataframe = pd.read_csv("data_samples/complex.csv",
                             dtype={"my_int_with_null": "Int64"},
                             parse_dates=["my_timestamp", "my_date"])
+    path = f"s3://{bucket}/test/"
     session.pandas.to_parquet(dataframe=dataframe,
                               database=database,
-                              path=f"s3://{bucket}/test/",
+                              path=path,
                               preserve_index=False,
                               mode="overwrite",
                               procs_cpu_bound=1)
@@ -412,6 +416,7 @@ def test_etl_complex(session, bucket, database, max_result_size):
             assert str(
                 row.my_string
             ) == "foo\nboo\nbar\nFOO\nBOO\nBAR\nxxxxx\nÁÃÀÂÇ\n汉字汉字汉字汉字汉字汉字汉字æøåæøåæøåæøåæøåæøåæøåæøåæøåæøå汉字汉字汉字汉字汉字汉字汉字æøåæøåæøåæøåæøåæøåæøåæøåæøåæøå"
+    session.s3.delete_objects(path=path)
     assert count == len(dataframe.index)
 
 
@@ -423,9 +428,10 @@ def test_to_parquet_with_kms(
     extra_args = {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": kms_key}
     session_inner = Session(s3_additional_kwargs=extra_args)
     dataframe = pd.read_csv("data_samples/nano.csv")
+    path = f"s3://{bucket}/test/"
     session_inner.pandas.to_parquet(dataframe=dataframe,
                                     database=database,
-                                    path=f"s3://{bucket}/test/",
+                                    path=path,
                                     preserve_index=False,
                                     mode="overwrite",
                                     procs_cpu_bound=1)
@@ -435,6 +441,7 @@ def test_to_parquet_with_kms(
         dataframe2 = session_inner.pandas.read_sql_athena(sql="select * from test", database=database)
         if len(dataframe.index) == len(dataframe2.index):
             break
+    session_inner.s3.delete_objects(path=path)
     assert len(dataframe.index) == len(dataframe2.index)
     assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
     assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[dataframe2["id"] == 0].iloc[0]["name"]
@@ -1196,3 +1203,49 @@ def test_nan_cast(session, bucket, database, partition_cols):
     assert df2.dtypes[4] == "Int64"
     assert df2.dtypes[5] == "object"
     session.s3.delete_objects(path=path)
+
+
+def test_to_parquet_date_null(session, bucket, database):
+    df = pd.DataFrame({
+        "col1": ["val1", "val2"],
+        "datecol": [date(2019, 11, 9), None],
+    })
+    path = f"s3://{bucket}/test/"
+    session.pandas.to_parquet(dataframe=df,
+                              database=database,
+                              table="test",
+                              path=path,
+                              mode="overwrite",
+                              preserve_index=False,
+                              procs_cpu_bound=1)
+    df2 = None
+    for counter in range(10):  # Retrying to workaround s3 eventual consistency
+        sleep(1)
+        df2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
+        if len(df.index) == len(df2.index):
+            break
+    path = f"s3://{bucket}/test2/"
+    session.pandas.to_parquet(dataframe=df2,
+                              database=database,
+                              table="test2",
+                              path=path,
+                              mode="overwrite",
+                              preserve_index=False,
+                              procs_cpu_bound=1)
+    df3 = None
+    for counter in range(10):  # Retrying to workaround s3 eventual consistency
+        sleep(1)
+        df3 = session.pandas.read_sql_athena(sql="select * from test2", database=database)
+        if len(df2.index) == len(df3.index):
+            break
+
+    session.s3.delete_objects(path=path)
+
+    assert len(list(df.columns)) == len(list(df2.columns)) == len(list(df3.columns))
+    assert len(df.index) == len(df2.index) == len(df3.index)
+
+    assert df[df.col1 == "val1"].iloc[0].datecol == df2[df2.col1 == "val1"].iloc[0].datecol
+    assert df2[df2.col1 == "val1"].iloc[0].datecol == df3[df3.col1 == "val1"].iloc[0].datecol == date(2019, 11, 9)
+
+    assert df[df.col1 == "val2"].iloc[0].datecol == df2[df2.col1 == "val2"].iloc[0].datecol
+    assert df2[df2.col1 == "val2"].iloc[0].datecol == df3[df3.col1 == "val2"].iloc[0].datecol is None
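The pattern these changes standardize is: build the `path` string once, write with `to_parquet`, poll Athena until the written rows are visible (S3 listings and the Glue catalog are eventually consistent), and finish with `session.s3.delete_objects(path=path)` so later runs start from a clean prefix. Below is a minimal sketch of that flow outside pytest, using the same legacy `Session` API the tests call; the bucket and database names are placeholders, not values from this diff.

```python
from time import sleep

import pandas as pd
from awswrangler import Session

session = Session()
df = pd.DataFrame({"id": [1, 2], "name": ["foo", "boo"]})

# "my-bucket" and "my_database" are placeholder names.
path = "s3://my-bucket/test/"  # build the path once so cleanup can reuse it
session.pandas.to_parquet(dataframe=df,
                          database="my_database",
                          table="test",
                          path=path,
                          mode="overwrite",
                          preserve_index=False,
                          procs_cpu_bound=1)

df2 = None
for _ in range(10):  # poll to work around S3/Athena eventual consistency
    sleep(1)
    df2 = session.pandas.read_sql_athena(sql="select * from test", database="my_database")
    if len(df2.index) == len(df.index):
        break

session.s3.delete_objects(path=path)  # leave the prefix clean for the next run
```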