Commit 3f3a9b7

Fix s3 filesystem abstraction bugs.
1 parent 5b39a17 commit 3f3a9b7

4 files changed: +23 −8 lines


awswrangler/_utils.py

Lines changed: 1 addition & 2 deletions
@@ -282,8 +282,7 @@ def get_even_chunks_sizes(total_size: int, chunk_size: int, upper_bound: bool) -
     """Calculate even chunks sizes (Best effort)."""
     round_func: Callable[[float], float] = math.ceil if upper_bound is True else math.floor
     num_chunks: int = int(round_func(float(total_size) / float(chunk_size)))
-    if num_chunks < 1:
-        raise ValueError("Invalid chunks size requirements.")
+    num_chunks = 1 if num_chunks < 1 else num_chunks
     base_size: int = int(total_size / num_chunks)
     rest: int = total_size % num_chunks
     sizes: List[int] = list(itertools.repeat(base_size, num_chunks))
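
Net effect: get_even_chunks_sizes now clamps to a single chunk when the requested chunk size exceeds the total size, instead of raising ValueError. A minimal standalone sketch of the patched logic (the helper name and the remainder-distribution loop below are illustrative, not the library's exact implementation):

    import itertools
    import math
    from typing import List

    def even_chunk_sizes(total_size: int, chunk_size: int, upper_bound: bool) -> List[int]:
        # Mirrors the patched branch: never fewer than one chunk.
        round_func = math.ceil if upper_bound else math.floor
        num_chunks = int(round_func(float(total_size) / float(chunk_size)))
        num_chunks = 1 if num_chunks < 1 else num_chunks
        base_size = int(total_size / num_chunks)
        rest = total_size % num_chunks
        sizes: List[int] = list(itertools.repeat(base_size, num_chunks))
        for i in range(rest):  # spread the remainder over the first chunks
            sizes[i] += 1
        return sizes

    # Before the fix this case raised ValueError("Invalid chunks size requirements.");
    # now it degrades to a single chunk covering the whole object.
    assert even_chunk_sizes(10, 1_048_576, upper_bound=False) == [10]
    assert even_chunk_sizes(10, 3, upper_bound=False) == [4, 3, 3]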

awswrangler/s3/_fs.py

Lines changed: 5 additions & 3 deletions
@@ -332,9 +332,11 @@ def _fetch(self, start: int, end: int) -> None:
         )

         # Calculating missing bytes in cache
-        if (new_block_start < self._start and new_block_end > self._end) or (
-            new_block_start > self._end and new_block_end < self._start
-        ):  # Full block download
+        if (  # Full block download
+            (new_block_start < self._start and new_block_end > self._end)
+            or new_block_start > self._end
+            or new_block_end < self._start
+        ):
             self._cache = self._fetch_range_proxy(new_block_start, new_block_end)
         elif new_block_end > self._end:
             prune_diff: int = new_block_start - self._start
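
The old predicate's second clause required new_block_start > self._end and new_block_end < self._start at the same time, which no valid request (start <= end, against a cache window with self._start <= self._end) can satisfy, so a read falling entirely outside the cached window never took the full-download path. The rewritten condition fetches the whole block whenever the request spans past both ends of the cache or lies completely before or after it. A minimal sketch of that decision as a pure function (the name needs_full_download is illustrative):

    def needs_full_download(cache_start: int, cache_end: int, new_start: int, new_end: int) -> bool:
        # Full fetch when the new block covers the cache on both sides,
        # or sits entirely after it, or sits entirely before it.
        return (
            (new_start < cache_start and new_end > cache_end)
            or new_start > cache_end
            or new_end < cache_start
        )

    assert needs_full_download(0, 1_000, 999_000, 1_000_000)   # seek far past the cache
    assert not needs_full_download(0, 1_000, 500, 1_500)       # tail overlap -> partial refresh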

awswrangler/s3/_read_parquet.py

Lines changed: 5 additions & 3 deletions
@@ -40,7 +40,7 @@ def _read_parquet_metadata_file(
         path=path,
         mode="rb",
         use_threads=use_threads,
-        s3_block_size=1_048_576,  # 1 MB (1 * 2**20)
+        s3_block_size=131_072,  # 128 KB (128 * 2**10)
         s3_additional_kwargs=s3_additional_kwargs,
         boto3_session=boto3_session,
     ) as f:
@@ -339,12 +339,14 @@ def _count_row_groups(
         path=path,
         mode="rb",
         use_threads=use_threads,
-        s3_block_size=1_048_576,  # 1 MB (1 * 2**20)
+        s3_block_size=131_072,  # 128 KB (128 * 2**10)
         s3_additional_kwargs=s3_additional_kwargs,
         boto3_session=boto3_session,
     ) as f:
         pq_file: pyarrow.parquet.ParquetFile = pyarrow.parquet.ParquetFile(source=f, read_dictionary=categories)
-        return cast(int, pq_file.num_row_groups)
+        n: int = cast(int, pq_file.num_row_groups)
+        _logger.debug("Row groups count: %d", n)
+        return n


 def _read_parquet_row_group(
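
Both call sites feed pyarrow.parquet.ParquetFile, which reads metadata from the Parquet file footer, so the commit shrinks the per-request block from 1 MiB to 128 KiB. A quick check of the arithmetic spelled out in the inline comments above:

    # Values from the inline comments above.
    OLD_BLOCK = 1 * 2**20     # 1_048_576 bytes, 1 MiB
    NEW_BLOCK = 128 * 2**10   # 131_072 bytes, 128 KiB

    assert OLD_BLOCK == 1_048_576
    assert NEW_BLOCK == 131_072
    assert OLD_BLOCK // NEW_BLOCK == 8  # each footer read now uses a block 8x smaller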

tests/test_fs.py

Lines changed: 12 additions & 0 deletions
@@ -174,3 +174,15 @@ def test_cache(path, use_threads, block_size, text):
             assert value == text[i].encode("utf-8")
             assert len(s3obj._cache) in (block_size, block_size - 1, len(text))
     assert s3obj._cache == b""
+
+
+def test_cache_seek(path):
+    client_s3 = boto3.client("s3")
+    path = f"{path}0.txt"
+    bucket, key = wr._utils.parse_path(path)
+    text = "0" * 1_000_000 + "1" * 4
+    client_s3.put_object(Body=text, Bucket=bucket, Key=key)
+    with open_s3_object(path, mode="rb", s3_block_size=1_000) as s3obj:
+        s3obj.seek(1_000_000)
+        assert s3obj.read(100).decode("utf-8") == "1" * 4
+    assert s3obj._cache == b""
