
Commit f95194b

Add Pandas.read_csv_prefix()
1 parent 5bff8c2 commit f95194b

File tree: 3 files changed (+98, -9 lines)

awswrangler/pandas.py

Lines changed: 67 additions & 2 deletions
@@ -1647,7 +1647,7 @@ def read_sql_aurora(self,
 
     def read_csv_list(
             self,
-            paths,
+            paths: List[str],
             max_result_size=None,
             header: Optional[str] = "infer",
             names=None,
@@ -1738,7 +1738,7 @@ def read_csv_list(
 
     def _read_csv_list_iterator(
             self,
-            paths,
+            paths: List[str],
             max_result_size=None,
             header="infer",
             names=None,
@@ -1802,3 +1802,68 @@ def _read_csv_list_iterator(
                 infer_datetime_format=infer_datetime_format,
                 encoding=encoding,
                 converters=converters)
+
+    def read_csv_prefix(
+            self,
+            path_prefix: str,
+            max_result_size=None,
+            header: Optional[str] = "infer",
+            names=None,
+            usecols=None,
+            dtype=None,
+            sep=",",
+            thousands=None,
+            decimal=".",
+            lineterminator="\n",
+            quotechar='"',
+            quoting=csv.QUOTE_MINIMAL,
+            escapechar=None,
+            parse_dates: Union[bool, Dict, List] = False,
+            infer_datetime_format=False,
+            encoding="utf-8",
+            converters=None,
+    ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
+        """
+        Read CSV files from an AWS S3 prefix using optimized strategies.
+        Tries to mimic pandas.read_csv() as closely as possible:
+        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
+        P.S. max_result_size != None mimics the chunksize behaviour of pandas.read_sql()
+
+        :param path_prefix: AWS S3 path prefix (e.g. s3://BUCKET_NAME/PREFIX)
+        :param max_result_size: Max number of bytes on each request to S3
+        :param header: Same as pandas.read_csv()
+        :param names: Same as pandas.read_csv()
+        :param usecols: Same as pandas.read_csv()
+        :param dtype: Same as pandas.read_csv()
+        :param sep: Same as pandas.read_csv()
+        :param thousands: Same as pandas.read_csv()
+        :param decimal: Same as pandas.read_csv()
+        :param lineterminator: Same as pandas.read_csv()
+        :param quotechar: Same as pandas.read_csv()
+        :param quoting: Same as pandas.read_csv()
+        :param escapechar: Same as pandas.read_csv()
+        :param parse_dates: Same as pandas.read_csv()
+        :param infer_datetime_format: Same as pandas.read_csv()
+        :param encoding: Same as pandas.read_csv()
+        :param converters: Same as pandas.read_csv()
+        :return: Pandas DataFrame, or an Iterator of Pandas DataFrames if max_result_size != None
+        """
+        paths: List[str] = self._session.s3.list_objects(path=path_prefix)
+        paths = [p for p in paths if not p.endswith("/")]
+        return self.read_csv_list(paths=paths,
+                                  max_result_size=max_result_size,
+                                  header=header,
+                                  names=names,
+                                  usecols=usecols,
+                                  dtype=dtype,
+                                  sep=sep,
+                                  thousands=thousands,
+                                  decimal=decimal,
+                                  lineterminator=lineterminator,
+                                  quotechar=quotechar,
+                                  quoting=quoting,
+                                  escapechar=escapechar,
+                                  parse_dates=parse_dates,
+                                  infer_datetime_format=infer_datetime_format,
+                                  encoding=encoding,
+                                  converters=converters)
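
For orientation, a minimal usage sketch of the new method (not part of the commit: the bucket name, prefix, and 128 MB chunk size are hypothetical; wr is the awswrangler top-level module, as in the tests below):

import awswrangler as wr

# Read every CSV object under the prefix into a single DataFrame.
df = wr.pandas.read_csv_prefix(path_prefix="s3://my-bucket/logs/")

# With max_result_size set, an iterator of DataFrames is returned instead,
# so large datasets can be processed chunk by chunk.
df_iter = wr.pandas.read_csv_prefix(path_prefix="s3://my-bucket/logs/",
                                    max_result_size=128 * 1024 * 1024)
for chunk in df_iter:
    print(len(chunk.index))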

awswrangler/s3.py

Lines changed: 8 additions & 7 deletions
@@ -227,14 +227,15 @@ def delete_objects_batch(session_primitives: "SessionPrimitives", bucket, batch)
         for bounder in bounders:
             client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch[bounder[0]:bounder[1]]})
 
-    def list_objects(self, path):
+    def list_objects(self, path: str) -> List[str]:
+        bucket: str
         bucket, path = self.parse_path(path=path)
-        args = {"Bucket": bucket, "MaxKeys": 1000, "Prefix": path}
-        next_continuation_token = True
-        keys = []
-        while next_continuation_token:
-            res = self._client_s3.list_objects_v2(**args)
-            if not res.get("Contents"):
+        args: Dict[str, Any] = {"Bucket": bucket, "MaxKeys": 1000, "Prefix": path}
+        next_continuation_token: str = ""
+        keys: List[str] = []
+        while next_continuation_token is not None:
+            res: Dict[str, Any] = self._client_s3.list_objects_v2(**args)
+            if res.get("Contents") is None:
                 break
             keys += [f"s3://{bucket}/{x.get('Key')}" for x in res.get("Contents")]
             next_continuation_token = res.get("NextContinuationToken")
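
The hunk ends at the line that reads NextContinuationToken, before the token is fed back into the request arguments (that happens below the visible context). For clarity, here is a self-contained sketch of the same list_objects_v2 pagination pattern in plain boto3; the helper name and standalone form are illustrative, not the library's API:

import boto3

def list_keys(bucket: str, prefix: str) -> list:
    # Collect every object key under a prefix, following S3's pagination.
    client = boto3.client("s3")
    args = {"Bucket": bucket, "MaxKeys": 1000, "Prefix": prefix}
    keys = []
    while True:
        res = client.list_objects_v2(**args)
        for obj in res.get("Contents", []):
            keys.append(f"s3://{bucket}/{obj['Key']}")
        token = res.get("NextContinuationToken")
        if token is None:
            break
        # Feed the token back so the next request resumes after this page.
        args["ContinuationToken"] = token
    return keys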

testing/test_awswrangler/test_pandas.py

Lines changed: 23 additions & 0 deletions
@@ -1749,6 +1749,7 @@ def test_aurora_mysql_unload_simple(bucket, mysql_parameters):
 
 @pytest.mark.parametrize("sample, row_num", [("data_samples/micro.csv", 30), ("data_samples/small.csv", 100)])
 def test_read_csv_list(bucket, sample, row_num):
+    wr.s3.delete_objects(path=f"s3://{bucket}/")
     n = 10
     paths = []
     for i in range(n):
@@ -1762,6 +1763,7 @@ def test_read_csv_list(bucket, sample, row_num):
 
 @pytest.mark.parametrize("sample, row_num", [("data_samples/micro.csv", 30), ("data_samples/small.csv", 100)])
 def test_read_csv_list_iterator(bucket, sample, row_num):
+    wr.s3.delete_objects(path=f"s3://{bucket}/")
     n = 10
     paths = []
     for i in range(n):
@@ -2001,3 +2003,24 @@ def test_aurora_mysql_load_special(bucket, mysql_parameters):
     assert rows[2][2] == "\\\\\\\\"
     assert rows[3][2] == "\"\"\"\""
     conn.close()
+
+
+@pytest.mark.parametrize("sample, row_num", [("data_samples/micro.csv", 30), ("data_samples/small.csv", 100)])
+def test_read_csv_prefix_iterator(bucket, sample, row_num):
+    wr.s3.delete_objects(path=f"s3://{bucket}/")
+    n = 10
+    paths = []
+    for i in range(n):
+        key = f"{sample}_{i}"
+        boto3.client("s3").upload_file(sample, bucket, key)
+        paths.append(f"s3://{bucket}/{key}")
+    sleep(15)
+
+    df_iter = wr.pandas.read_csv_prefix(path_prefix=f"s3://{bucket}/{sample}_", max_result_size=200)
+    total_count = 0
+    for df in df_iter:
+        count = len(df.index)
+        print(f"count: {count}")
+        total_count += count
+    wr.s3.delete_listed_objects(objects_paths=paths)
+    assert total_count == row_num * n
