Skip to content

Commit d2635fd

Browse files
committed
Invalidating the s3fs cache (invalidate_cache()) before reads
1 parent f2636f2 commit d2635fd

File tree

2 files changed

+13
-10
lines changed

2 files changed

+13
-10
lines changed

awswrangler/pandas.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1114,7 +1114,7 @@ def to_redshift(
11141114
:param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED" (https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html)
11151115
:param sortkey: List of columns to be sorted
11161116
:param preserve_index: Should we preserve the Dataframe index?
1117-
:param mode: append or overwrite
1117+
:param mode: append, overwrite or upsert
11181118
:param cast_columns: Dictionary of column names and the Redshift types to cast them to. (E.g. {"col name": "SMALLINT", "col2 name": "FLOAT4"})
11191119
:return: None
11201120
"""
@@ -1348,6 +1348,7 @@ def _read_parquet_path(session_primitives: Any,
13481348
procs_cpu_bound = procs_cpu_bound if procs_cpu_bound is not None else session_primitives.procs_cpu_bound if session_primitives.procs_cpu_bound is not None else 1
13491349
use_threads: bool = True if procs_cpu_bound > 1 else False
13501350
fs: S3FileSystem = s3.get_fs(session_primitives=session_primitives)
1351+
fs.invalidate_cache()
13511352
fs = pa.filesystem._ensure_filesystem(fs)
13521353
logger.debug(f"Reading Parquet table: {path}")
13531354
table = pq.read_table(source=path, columns=columns, filters=filters, filesystem=fs, use_threads=use_threads)

awswrangler/s3.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ def mkdir_if_not_exists(fs, path):
2222

2323
def get_fs(session_primitives=None):
2424
aws_access_key_id, aws_secret_access_key, profile_name, config, s3_additional_kwargs = None, None, None, None, None
25+
args = {}
26+
2527
if session_primitives:
2628
if session_primitives.aws_access_key_id:
2729
aws_access_key_id = session_primitives.aws_access_key_id
@@ -33,17 +35,17 @@ def get_fs(session_primitives=None):
3335
config = {"retries": {"max_attempts": session_primitives.botocore_max_retries}}
3436
if session_primitives.s3_additional_kwargs:
3537
s3_additional_kwargs = session_primitives.s3_additional_kwargs
38+
3639
if profile_name:
37-
fs = s3fs.S3FileSystem(profile_name=profile_name,
38-
config_kwargs=config,
39-
s3_additional_kwargs=s3_additional_kwargs)
40+
args["profile_name"] = profile_name
4041
elif aws_access_key_id and aws_secret_access_key:
41-
fs = s3fs.S3FileSystem(key=aws_access_key_id,
42-
secret=aws_secret_access_key,
43-
config_kwargs=config,
44-
s3_additional_kwargs=s3_additional_kwargs)
45-
else:
46-
fs = s3fs.S3FileSystem(config_kwargs=config, s3_additional_kwargs=s3_additional_kwargs)
42+
args["key"] = aws_access_key_id
43+
args["secret"] = aws_secret_access_key
44+
45+
args["config_kwargs"] = config
46+
args["s3_additional_kwargs"] = s3_additional_kwargs
47+
fs = s3fs.S3FileSystem(**args)
48+
fs.invalidate_cache(path=None)
4749
return fs
4850

4951

0 commit comments

Comments
 (0)