Skip to content

Commit 69c7378

Browse files
committed
Add skip_header_line_count arg to create_csv_table(). #338
1 parent e578a24 commit 69c7378

File tree

3 files changed

+43
-9
lines changed

3 files changed

+43
-9
lines changed

awswrangler/catalog.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,7 @@ def create_csv_table(
10871087
mode: str = "overwrite",
10881088
catalog_versioning: bool = False,
10891089
sep: str = ",",
1090+
skip_header_line_count: Optional[int] = None,
10901091
boto3_session: Optional[boto3.Session] = None,
10911092
projection_enabled: bool = False,
10921093
projection_types: Optional[Dict[str, str]] = None,
@@ -1125,6 +1126,8 @@ def create_csv_table(
11251126
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
11261127
sep : str
11271128
String of length 1. Field delimiter for the output file.
1129+
skip_header_line_count : Optional[int]
1130+
Number of lines to skip at the top of each file (the header rows).
11281131
projection_enabled : bool
11291132
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
11301133
projection_types : Optional[Dict[str, str]]
@@ -1181,6 +1184,7 @@ def create_csv_table(
11811184
partitions_types=partitions_types,
11821185
compression=compression,
11831186
sep=sep,
1187+
skip_header_line_count=skip_header_line_count,
11841188
)
11851189
session: boto3.Session = _utils.ensure_session(session=boto3_session)
11861190
_create_table(
@@ -1338,20 +1342,24 @@ def _csv_table_definition(
13381342
partitions_types: Dict[str, str],
13391343
compression: Optional[str],
13401344
sep: str,
1345+
skip_header_line_count: Optional[int]
13411346
) -> Dict[str, Any]:
13421347
compressed: bool = compression is not None
1348+
parameters: Dict[str, str] = {
1349+
"classification": "csv",
1350+
"compressionType": str(compression).lower(),
1351+
"typeOfData": "file",
1352+
"delimiter": sep,
1353+
"columnsOrdered": "true",
1354+
"areColumnsQuoted": "false",
1355+
}
1356+
# Only advertise a header skip when the caller asked for one. The table
# parameters are declared as Dict[str, str], so the requested count is
# stored as its string form rather than a hard-coded "1" (which silently
# ignored any value other than 1).
if skip_header_line_count is not None:
    parameters["skip.header.line.count"] = str(skip_header_line_count)
13431358
return {
13441359
"Name": table,
13451360
"PartitionKeys": [{"Name": cname, "Type": dtype} for cname, dtype in partitions_types.items()],
13461361
"TableType": "EXTERNAL_TABLE",
1347-
"Parameters": {
1348-
"classification": "csv",
1349-
"compressionType": str(compression).lower(),
1350-
"typeOfData": "file",
1351-
"delimiter": sep,
1352-
"columnsOrdered": "true",
1353-
"areColumnsQuoted": "false",
1354-
},
1362+
"Parameters": parameters,
13551363
"StorageDescriptor": {
13561364
"Columns": [{"Name": cname, "Type": dtype} for cname, dtype in columns_types.items()],
13571365
"Location": path,

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
packages=["awswrangler", "awswrangler.s3", "awswrangler.quicksight", "awswrangler.athena"],
2424
include_package_data=True,
2525
python_requires=">=3.6, <3.9",
26-
install_requires=[open("requirements.txt").read().strip().split("\n")],
26+
install_requires=open("requirements.txt").read().strip().split("\n"),
2727
classifiers=[
2828
'Programming Language :: Python :: 3.6',
2929
'Programming Language :: Python :: 3.7',

tests/test_athena_csv.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,3 +319,29 @@ def test_athena_csv_types(path, glue_database, glue_table):
319319
ensure_data_types_csv(df2)
320320
wr.s3.delete_objects(path=paths)
321321
assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
322+
323+
324+
@pytest.mark.parametrize("use_threads", [True, False])
@pytest.mark.parametrize("ctas_approach", [True, False])
def test_skip_header(path, glue_database, glue_table, use_threads, ctas_approach):
    """Round-trip a CSV written with a header through a table that skips one line.

    The frame is written to S3 with ``header=True``, a CSV table is registered
    over the same prefix with ``skip_header_line_count=1``, and the table is
    read back through Athena. The frame read back must equal the one written.
    """
    expected = pd.DataFrame({"c0": [1, 2], "c1": [3.3, 4.4], "c2": ["foo", "boo"]}).astype(
        {"c0": "Int64", "c2": "string"}
    )

    # Write a single object whose first line is the column header.
    write_result = wr.s3.to_csv(
        df=expected,
        path=f"{path}0.csv",
        sep=",",
        index=False,
        header=True,
        use_threads=use_threads,
    )
    written_paths = write_result["paths"]
    wr.s3.wait_objects_exist(paths=written_paths, use_threads=use_threads)

    # Register the table over the prefix, asking for one header line to be skipped.
    schema = {"c0": "bigint", "c1": "double", "c2": "string"}
    wr.catalog.create_csv_table(
        database=glue_database,
        table=glue_table,
        path=path,
        columns_types=schema,
        skip_header_line_count=1,
    )

    actual = wr.athena.read_sql_table(
        glue_table,
        glue_database,
        use_threads=use_threads,
        ctas_approach=ctas_approach,
    )
    assert expected.equals(actual)

0 commit comments

Comments
 (0)