Skip to content

Commit 037433b

Browse files
committed
Improving NaN handling during Athena read.
1 parent 95f2b36 commit 037433b

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

awswrangler/glue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ def csv_table_definition(table: str,
311311
refined_schema = [(name, dtype) if dtype in dtypes_allowed else (name, "string") for name, dtype in schema]
312312
else:
313313
raise InvalidSerDe(f"{serde} is not in the valid SerDe list.")
314-
if "columns" in extra_args:
314+
if "columns" in extra_args and extra_args["columns"] is not None:
315315
refined_schema = [(name, dtype) for name, dtype in refined_schema
316316
if name in extra_args["columns"]] # type: ignore
317317
return {

awswrangler/pandas.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,7 @@ def _read_sql_athena_ctas(self,
665665
workgroup=workgroup,
666666
encryption=encryption,
667667
kms_key=kms_key)
668+
logger.debug(f"query_id: {query_id}")
668669
self._session.athena.wait_query(query_execution_id=query_id)
669670
self._session.glue.delete_table_if_exists(database=database, table=name)
670671
manifest_path: str = f"{s3_output}/tables/{query_id}-manifest.csv"
@@ -709,7 +710,9 @@ def _read_sql_athena_regular(self,
709710
parse_dates=parse_timestamps,
710711
converters=converters,
711712
quoting=csv.QUOTE_ALL,
712-
max_result_size=max_result_size)
713+
max_result_size=max_result_size,
714+
keep_default_na=False,
715+
na_values=[""])
713716
logger.debug("Start type casting...")
714717
if max_result_size is None:
715718
if len(ret.index) > 0:

testing/test_awswrangler/test_pandas.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2207,7 +2207,6 @@ def test_to_csv_columns(bucket, database):
22072207
path = f"s3://{bucket}/test_to_csv_columns"
22082208
wr.s3.delete_objects(path=path)
22092209
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": ["foo", "boo", "bar"]})
2210-
wr.s3.delete_objects(path=path)
22112210
wr.pandas.to_csv(dataframe=df,
22122211
database=database,
22132212
path=path,
@@ -2319,7 +2318,6 @@ def test_aurora_mysql_unload_null(bucket, mysql_parameters):
23192318
"c_int": [1, 2, None, 3, 4],
23202319
})
23212320
df["c_int"] = df["c_int"].astype("Int64")
2322-
print(df)
23232321
conn = Aurora.generate_connection(database="mysql",
23242322
host=mysql_parameters["MysqlAddress"],
23252323
port=3306,
@@ -2341,3 +2339,27 @@ def test_aurora_mysql_unload_null(bucket, mysql_parameters):
23412339
df2["c_int"] = df2["c_int"].astype("Int64")
23422340
assert df.equals(df2)
23432341
conn.close()
2342+
2343+
2344+
def test_s3_overall_nan(bucket, database):
2345+
path = f"s3://{bucket}/test_s3_overall_nan"
2346+
wr.s3.delete_objects(path=path)
2347+
df = pd.DataFrame({
2348+
"id": [1, 2, 3, 4, 5],
2349+
"c_str": ["foo", "", None, "bar", None],
2350+
"c_float": [1.1, None, 3.3, None, 5.5],
2351+
"c_int": [1, 2, None, 3, 4],
2352+
})
2353+
df["c_int"] = df["c_int"].astype("Int64")
2354+
print(df)
2355+
wr.pandas.to_parquet(dataframe=df,
2356+
database=database,
2357+
path=path,
2358+
mode="overwrite",
2359+
preserve_index=False,
2360+
procs_cpu_bound=1,
2361+
inplace=False)
2362+
sleep(15)
2363+
df2 = wr.pandas.read_sql_athena(database=database, sql="SELECT * FROM test_s3_overall_nan ORDER BY id", ctas_approach=True)
2364+
wr.s3.delete_objects(path=path)
2365+
assert df.equals(df2)

0 commit comments

Comments (0)