
Commit 014228f

Enabling readahead cache for s3fs.

1 parent da6c542 commit 014228f

3 files changed (+15, -8 lines)

awswrangler/_utils.py

Lines changed: 2 additions & 2 deletions
@@ -136,9 +136,9 @@ def get_fs(
     fs: s3fs.S3FileSystem = s3fs.S3FileSystem(
         anon=False,
         use_ssl=True,
-        default_cache_type="none",
+        default_cache_type="readahead",
         default_fill_cache=False,
-        default_block_size=134_217_728,  # 128 MB (50 * 2**20)
+        default_block_size=1_073_741_824,  # 1024 MB (1024 * 2**20)
         config_kwargs={"retries": {"max_attempts": 15}},
         session=ensure_session(session=session)._session,  # pylint: disable=protected-access
         s3_additional_kwargs=s3_additional_kwargs,
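
The change swaps the disabled cache ("none") for fsspec's "readahead" strategy and grows the block size from 128 MB to 1 GiB, so sequential scans of large objects are served from a cached block instead of issuing one S3 GET per small read. A minimal sketch of the same configuration used standalone (the bucket and key below are hypothetical):

import s3fs

# Mirror the options get_fs() now passes, outside awswrangler.
fs = s3fs.S3FileSystem(
    anon=False,
    use_ssl=True,
    default_cache_type="readahead",    # keep the current block cached and read ahead sequentially
    default_fill_cache=False,
    default_block_size=1_073_741_824,  # 1024 MB (1024 * 2**20)
)

# Sequential reads now hit the readahead cache rather than S3 directly.
with fs.open("s3://my-bucket/data.parquet", "rb") as f:  # hypothetical object
    magic = f.read(4)  # first read fetches a whole block; later reads reuse it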

awswrangler/s3.py

Lines changed: 5 additions & 0 deletions
@@ -1693,6 +1693,7 @@ def read_parquet(
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
     )
+    _logger.debug("pyarrow.parquet.ParquetDataset initialized.")
     if chunked is False:
         return _read_parquet(
             data=data, columns=columns, categories=categories, use_threads=use_threads, validate_schema=validate_schema

@@ -1710,13 +1711,17 @@ def _read_parquet(
     validate_schema: bool = True,
 ) -> pd.DataFrame:
     tables: List[pa.Table] = []
+    _logger.debug("Reading pieces...")
     for piece in data.pieces:
         table: pa.Table = piece.read(
             columns=columns, use_threads=use_threads, partitions=data.partitions, use_pandas_metadata=False
         )
+        _logger.debug("Appending piece in the list...")
         tables.append(table)
     promote: bool = not validate_schema
+    _logger.debug("Concating pieces...")
     table = pa.lib.concat_tables(tables, promote=promote)
+    _logger.debug("Converting PyArrow table to Pandas DataFrame...")
     return table.to_pandas(
         use_threads=use_threads,
         split_blocks=True,
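
The five _logger.debug calls only make the read path observable; surfacing them is plain Python logging. A minimal sketch, assuming awswrangler's module-level loggers and a hypothetical dataset path:

import logging

import awswrangler as wr

# Route awswrangler's debug messages to the console.
logging.basicConfig(format="%(name)s: %(message)s")
logging.getLogger("awswrangler").setLevel(logging.DEBUG)

df = wr.s3.read_parquet(path="s3://my-bucket/my-dataset/")  # hypothetical path
# The log should now show the new checkpoints, e.g.
# "pyarrow.parquet.ParquetDataset initialized." and "Reading pieces...",
# which bracket where time goes between S3 I/O and table concatenation.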

testing/test_awswrangler/test_data_lake.py

Lines changed: 8 additions & 6 deletions
@@ -1300,18 +1300,18 @@ def test_catalog_versioning(bucket, database):

     # Version 1
     df = pd.DataFrame({"c1": ["foo", "boo"]})
-    paths = wr.s3.to_parquet(
+    paths1 = wr.s3.to_parquet(
         df=df, path=path, dataset=True, database=database, table=table, mode="overwrite", catalog_versioning=True
     )["paths"]
-    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    wr.s3.wait_objects_exist(paths=paths1, use_threads=False)
     df = wr.athena.read_sql_table(table=table, database=database)
     assert len(df.index) == 2
     assert len(df.columns) == 1
     assert str(df.c1.dtype) == "string"

     # Version 2
     df = pd.DataFrame({"c1": [1.0, 2.0]})
-    paths = wr.s3.to_csv(
+    paths2 = wr.s3.to_csv(
         df=df,
         path=path,
         dataset=True,

@@ -1321,15 +1321,16 @@ def test_catalog_versioning(bucket, database):
         catalog_versioning=True,
         index=False,
     )["paths"]
-    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    wr.s3.wait_objects_exist(paths=paths2, use_threads=False)
+    wr.s3.wait_objects_not_exist(paths=paths1, use_threads=False)
     df = wr.athena.read_sql_table(table=table, database=database)
     assert len(df.index) == 2
     assert len(df.columns) == 1
     assert str(df.c1.dtype).startswith("float")

     # Version 3 (removing version 2)
     df = pd.DataFrame({"c1": [True, False]})
-    paths = wr.s3.to_csv(
+    paths3 = wr.s3.to_csv(
         df=df,
         path=path,
         dataset=True,

@@ -1339,7 +1340,8 @@ def test_catalog_versioning(bucket, database):
         catalog_versioning=False,
         index=False,
     )["paths"]
-    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    wr.s3.wait_objects_exist(paths=paths3, use_threads=False)
+    wr.s3.wait_objects_not_exist(paths=paths2, use_threads=False)
     df = wr.athena.read_sql_table(table=table, database=database)
     assert len(df.index) == 2
     assert len(df.columns) == 1
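
The test now keeps each version's paths in its own variable (paths1/paths2/paths3) instead of rebinding paths, which lets it additionally assert that every overwrite removes the previous write's objects. A condensed sketch of the behavior being pinned down (bucket, database, and table names are hypothetical):

import pandas as pd
import awswrangler as wr

path = "s3://my-bucket/my-prefix/"  # hypothetical

# mode="overwrite" always replaces the data files; catalog_versioning only
# controls whether the Glue table keeps its previous schema as a new version.
paths1 = wr.s3.to_parquet(
    df=pd.DataFrame({"c1": ["foo", "boo"]}),
    path=path, dataset=True, database="my_db", table="my_table",
    mode="overwrite", catalog_versioning=True,
)["paths"]
wr.s3.wait_objects_exist(paths=paths1, use_threads=False)

paths2 = wr.s3.to_csv(
    df=pd.DataFrame({"c1": [1.0, 2.0]}),
    path=path, dataset=True, database="my_db", table="my_table",
    mode="overwrite", catalog_versioning=True, index=False,
)["paths"]
wr.s3.wait_objects_exist(paths=paths2, use_threads=False)
wr.s3.wait_objects_not_exist(paths=paths1, use_threads=False)  # version 1's files are gone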
