
Commit b34f648

Merge pull request #209 from awslabs/parquet-chunked
Add chunked=INTEGER option to ensure batch number of rows
2 parents 5821c9a + aeb8792 commit b34f648
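
The new option lands in the Parquet batching layer that both files below delegate to (see `_read_parquet_iterator` forwarding `chunked` into `s3.read_parquet` in the db.py diff). A minimal sketch of the two modes, with a placeholder S3 path:

import awswrangler as wr

# chunked=True   -> one DataFrame per Parquet file (fast; sizes depend on the files).
# chunked=65_536 -> iterate by a fixed number of rows per DataFrame.
for df in wr.s3.read_parquet(path="s3://my-bucket/my-dataset/", chunked=65_536):
    print(df.shape)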

4 files changed: +201 -41 lines changed


awswrangler/athena.py

Lines changed: 45 additions & 16 deletions
@@ -329,7 +329,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
     database: str,
     ctas_approach: bool = True,
     categories: List[str] = None,
-    chunksize: Optional[int] = None,
+    chunksize: Optional[Union[int, bool]] = None,
     s3_output: Optional[str] = None,
     workgroup: Optional[str] = None,
     encryption: Optional[str] = None,
@@ -353,10 +353,6 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
     CONS: Slower (But stills faster than other libraries that uses the regular Athena API)
     and does not handle nested types at all.

-    Note
-    ----
-    If `chunksize` is passed, then a Generator of DataFrames is returned.
-
     Note
     ----
     If `ctas_approach` is True, `chunksize` will return non deterministic chunks sizes,
@@ -367,6 +363,21 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
     Create the default Athena bucket if it doesn't exist and s3_output is None.
     (E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)

+    Note
+    ----
+    ``Batching`` (`chunksize` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunksize=True**, a new DataFrame will be returned for each file in the query result.
+
+    - If **chunksize=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
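
A short sketch of the two strategies described in the note above, as they would look on `read_sql_query` (table, database, and row count are placeholders):

import awswrangler as wr

# chunksize=True: one DataFrame per query-result file; fastest, but row counts vary.
for df in wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db", chunksize=True):
    print(len(df.index))

# chunksize=100_000: iterate by a fixed number of rows per chunk.
for df in wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db", chunksize=100_000):
    print(len(df.index))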
@@ -383,8 +394,10 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
     categories: List[str], optional
         List of columns names that should be returned as pandas.Categorical.
         Recommended for memory restricted environments.
-    chunksize: int, optional
-        If specified, return an generator where chunksize is the number of rows to include in each chunk.
+    chunksize : Union[int, bool], optional
+        If passed will split the data in an Iterable of DataFrames (Memory friendly).
+        If `True` wrangler will iterate on the data by files in the most efficient way without guarantee of chunksize.
+        If an `INTEGER` is passed Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     s3_output : str, optional
         AWS S3 path.
     workgroup : str, optional
@@ -454,7 +467,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
         catalog.delete_table_if_exists(database=database, table=name, boto3_session=session)
         manifest_path: str = f"{_s3_output}/tables/{query_id}-manifest.csv"
         paths: List[str] = _extract_ctas_manifest_paths(path=manifest_path, boto3_session=session)
-        chunked: bool = chunksize is not None
+        chunked: Union[bool, int] = False if chunksize is None else chunksize
         _logger.debug(f"chunked: {chunked}")
         if not paths:
             if chunked is False:
@@ -473,6 +486,8 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
     path = f"{_s3_output}/{query_id}.csv"
     s3.wait_objects_exist(paths=[path], use_threads=False, boto3_session=session)
     _logger.debug(f"Start CSV reading from {path}")
+    _chunksize: Optional[int] = chunksize if isinstance(chunksize, int) else None
+    _logger.debug(f"_chunksize: {_chunksize}")
     ret = s3.read_csv(
         path=[path],
         dtype=dtype,
@@ -481,7 +496,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals
         quoting=csv.QUOTE_ALL,
         keep_default_na=False,
         na_values=[""],
-        chunksize=chunksize,
+        chunksize=_chunksize,
         skip_blank_lines=False,
         use_threads=False,
         boto3_session=session,
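
Internally, the user-facing `chunksize` is split into two values: `chunked` for the CTAS/Parquet path and `_chunksize` for the CSV path. A standalone sketch of that normalization, mirroring the expressions in the hunks above (the wrapper function is illustrative only):

from typing import Optional, Union

def _normalize(chunksize: Optional[Union[int, bool]]):
    # CTAS/Parquet path: False disables batching; True or an integer is forwarded as-is.
    chunked: Union[bool, int] = False if chunksize is None else chunksize
    # CSV path: only an integer row count is forwarded to the CSV reader.
    # Note that bool is a subclass of int in Python, so True also passes this check.
    _chunksize: Optional[int] = chunksize if isinstance(chunksize, int) else None
    return chunked, _chunksize

print(_normalize(None))   # (False, None)
print(_normalize(1000))   # (1000, 1000)
print(_normalize(True))   # (True, True) -- see the isinstance note above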
@@ -565,7 +580,7 @@ def read_sql_table(
     database: str,
     ctas_approach: bool = True,
     categories: List[str] = None,
-    chunksize: Optional[int] = None,
+    chunksize: Optional[Union[int, bool]] = None,
     s3_output: Optional[str] = None,
     workgroup: Optional[str] = None,
     encryption: Optional[str] = None,
@@ -589,10 +604,6 @@ def read_sql_table(
     CONS: Slower (But stills faster than other libraries that uses the regular Athena API)
     and does not handle nested types at all

-    Note
-    ----
-    If `chunksize` is passed, then a Generator of DataFrames is returned.
-
     Note
     ----
     If `ctas_approach` is True, `chunksize` will return non deterministic chunks sizes,
@@ -603,6 +614,21 @@ def read_sql_table(
     Create the default Athena bucket if it doesn't exist and s3_output is None.
     (E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)

+    Note
+    ----
+    ``Batching`` (`chunksize` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunksize=True**, a new DataFrame will be returned for each file in the query result.
+
+    - If **chunksize=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
@@ -619,8 +645,10 @@ def read_sql_table(
     categories: List[str], optional
         List of columns names that should be returned as pandas.Categorical.
         Recommended for memory restricted environments.
-    chunksize: int, optional
-        If specified, return an generator where chunksize is the number of rows to include in each chunk.
+    chunksize : Union[int, bool], optional
+        If passed will split the data in an Iterable of DataFrames (Memory friendly).
+        If `True` wrangler will iterate on the data by files in the most efficient way without guarantee of chunksize.
+        If an `INTEGER` is passed Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     s3_output : str, optional
         AWS S3 path.
     workgroup : str, optional
@@ -646,6 +674,7 @@ def read_sql_table(
     >>> df = wr.athena.read_sql_table(table='...', database='...')

     """
+    table = catalog.sanitize_table_name(table=table)
     return read_sql_query(
         sql=f'SELECT * FROM "{table}"',
         database=database,
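
`read_sql_table` gains the same `chunksize` semantics and now sanitizes the table name before building the SELECT. A minimal sketch with placeholder names:

import awswrangler as wr

# The table name is passed through catalog.sanitize_table_name before being
# interpolated into the generated SELECT statement.
for df in wr.athena.read_sql_table(table="my_table", database="my_db", chunksize=True):
    print(df.shape)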

awswrangler/db.py

Lines changed: 24 additions & 5 deletions
@@ -888,7 +888,7 @@ def unload_redshift(
     con: sqlalchemy.engine.Engine,
     iam_role: str,
     categories: List[str] = None,
-    chunked: bool = False,
+    chunked: Union[bool, int] = False,
     keep_files: bool = False,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
@@ -906,6 +906,22 @@

     https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html

+    Note
+    ----
+    ``Batching`` (`chunked` argument) (Memory Friendly):
+
+    Will enable the function to return an Iterable of DataFrames instead of a regular DataFrame.
+
+    There are two batching strategies on Wrangler:
+
+    - If **chunked=True**, a new DataFrame will be returned for each file in your path/dataset.
+
+    - If **chunked=INTEGER**, Wrangler will iterate on the data by number of rows equal to the received INTEGER.
+
+    `P.S.` `chunked=True` is faster and uses less memory while `chunked=INTEGER` is more precise
+    in number of rows for each DataFrame.
+
+
     Note
     ----
     In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count().
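
The same two strategies now apply to `unload_redshift`. A minimal sketch, assuming a SQLAlchemy engine for the cluster and an IAM role Redshift can assume for the UNLOAD; the connection string, S3 staging path, and role ARN are placeholders, and the `sql`/`path` parameter names are assumed from the function's purpose (only the parameters visible in the diff are confirmed by it):

import awswrangler as wr
import sqlalchemy

# Any SQLAlchemy Engine pointing at the cluster works (con: sqlalchemy.engine.Engine).
con = sqlalchemy.create_engine("postgresql+psycopg2://user:password@my-cluster:5439/my_db")

dfs = wr.db.unload_redshift(
    sql="SELECT * FROM public.my_table",
    path="s3://my-bucket/unload-stage/",  # assumed staging-path parameter
    con=con,
    iam_role="arn:aws:iam::111111111111:role/RedshiftUnloadRole",
    chunked=200_000,  # iterate by row count; True would instead batch per unloaded file
    keep_files=False,
)
for df in dfs:
    print(df.shape)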
@@ -926,9 +942,10 @@
         Recommended for memory restricted environments.
     keep_files : bool
         Should keep the stage files?
-    chunked : bool
-        If True will break the data in smaller DataFrames (Non deterministic number of lines).
-        Otherwise return a single DataFrame with the whole data.
+    chunked : Union[int, bool]
+        If passed will split the data in an Iterable of DataFrames (Memory friendly).
+        If `True` wrangler will iterate on the data by files in the most efficient way without guarantee of chunksize.
+        If an `INTEGER` is passed Wrangler will iterate on the data by number of rows equal to the received INTEGER.
     use_threads : bool
         True to enable concurrent requests, False to disable multiple threads.
         If enabled os.cpu_count() will be used as the max number of threads.
@@ -979,6 +996,7 @@
     return _read_parquet_iterator(
         paths=paths,
         categories=categories,
+        chunked=chunked,
        use_threads=use_threads,
        boto3_session=session,
        s3_additional_kwargs=s3_additional_kwargs,
@@ -991,13 +1009,14 @@ def _read_parquet_iterator(
     keep_files: bool,
     use_threads: bool,
     categories: List[str] = None,
+    chunked: Union[bool, int] = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
 ) -> Iterator[pd.DataFrame]:
     dfs: Iterator[pd.DataFrame] = s3.read_parquet(
         path=paths,
         categories=categories,
-        chunked=True,
+        chunked=chunked,
         dataset=False,
         use_threads=use_threads,
         boto3_session=boto3_session,
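
Because every batched reader above yields an `Iterator[pd.DataFrame]`, downstream code can aggregate results without materializing the whole dataset. A generic consumption pattern (plain pandas, not part of this diff):

from typing import Iterator
import pandas as pd

def total_of(dfs: Iterator[pd.DataFrame], column: str) -> float:
    # Only one chunk is held in memory at a time.
    return sum(float(df[column].sum()) for df in dfs)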
