Skip to content

Commit 6214cd2

Browse files
committed
Add Excel support and some refactoring.
1 parent 8cb0b79 commit 6214cd2

17 files changed

+363
-83
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
> An [AWS Professional Service](https://aws.amazon.com/professional-services/) open source initiative | [email protected]
88
9-
[![Release](https://img.shields.io/badge/release-2.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
9+
[![Release](https://img.shields.io/badge/release-2.3.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
1010
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
1111
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
1212
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

awswrangler/__metadata__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77

88
__title__: str = "awswrangler"
99
__description__: str = "Pandas on AWS."
10-
__version__: str = "2.2.0"
10+
__version__: str = "2.3.0"
1111
__license__: str = "Apache License 2.0"

awswrangler/s3/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
from awswrangler.s3._delete import delete_objects # noqa
55
from awswrangler.s3._describe import describe_objects, get_bucket_region, size_objects # noqa
66
from awswrangler.s3._list import does_object_exist, list_directories, list_objects # noqa
7+
from awswrangler.s3._read_excel import read_excel # noqa
78
from awswrangler.s3._read_parquet import read_parquet, read_parquet_metadata, read_parquet_table # noqa
89
from awswrangler.s3._read_text import read_csv, read_fwf, read_json # noqa
910
from awswrangler.s3._wait import wait_objects_exist, wait_objects_not_exist # noqa
11+
from awswrangler.s3._write_excel import to_excel # noqa
1012
from awswrangler.s3._write_parquet import store_parquet_metadata, to_parquet # noqa
1113
from awswrangler.s3._write_text import to_csv, to_json # noqa
1214

@@ -32,4 +34,6 @@
3234
"to_parquet",
3335
"to_csv",
3436
"to_json",
37+
"to_excel",
38+
"read_excel",
3539
]

awswrangler/s3/_fs.py

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import socket
99
from contextlib import contextmanager
1010
from errno import ESPIPE
11-
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Type, Union, cast
11+
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Union, cast
1212

1313
import boto3
1414
from botocore.exceptions import ReadTimeoutError
@@ -188,7 +188,7 @@ def close(self) -> List[Dict[str, Union[str, int]]]:
188188
return self._sort_by_part_number(parts=self._results)
189189

190190

191-
class _S3ObjectBase: # pylint: disable=too-many-instance-attributes
191+
class _S3ObjectBase(io.RawIOBase): # pylint: disable=too-many-instance-attributes
192192
"""Class to abstract S3 objects as ordinary files."""
193193

194194
def __init__(
@@ -201,12 +201,8 @@ def __init__(
201201
boto3_session: Optional[boto3.Session],
202202
newline: Optional[str],
203203
encoding: Optional[str],
204-
raw_buffer: bool,
205204
) -> None:
206-
if raw_buffer is True and "w" not in mode:
207-
raise exceptions.InvalidArgumentValue("raw_buffer=True is only acceptable on write mode.")
208-
self._raw_buffer: bool = raw_buffer
209-
self.closed: bool = False
205+
super().__init__()
210206
self._use_threads = use_threads
211207
self._newline: str = "\n" if newline is None else newline
212208
self._encoding: str = "utf-8" if encoding is None else encoding
@@ -408,9 +404,9 @@ def seek(self, loc: int, whence: int = 0) -> int:
408404

409405
def flush(self, force: bool = False) -> None:
410406
"""Write buffered data to S3."""
411-
if self.closed:
407+
if self.closed: # pylint: disable=using-constant-test
412408
raise RuntimeError("I/O operation on closed file.")
413-
if self.writable():
409+
if self.writable() and self._buffer.closed is False:
414410
total_size: int = self._buffer.tell()
415411
if total_size < _MIN_WRITE_BLOCK and force is False:
416412
return None
@@ -465,7 +461,7 @@ def writable(self) -> bool:
465461

466462
def close(self) -> None:
467463
"""Clean up the cache."""
468-
if self.closed:
464+
if self.closed: # pylint: disable=using-constant-test
469465
return None
470466
if self.writable():
471467
_logger.debug("Closing: %s parts", self._parts_count)
@@ -488,7 +484,7 @@ def close(self) -> None:
488484
),
489485
)
490486
_logger.debug("complete_multipart_upload done!")
491-
elif self._buffer.tell() > 0 or self._raw_buffer is True:
487+
elif self._buffer.tell() > 0:
492488
_logger.debug("put_object")
493489
_utils.try_it(
494490
f=self._client.put_object,
@@ -503,23 +499,17 @@ def close(self) -> None:
503499
),
504500
)
505501
self._parts_count = 0
502+
self._upload_proxy.close()
506503
self._buffer.seek(0)
507504
self._buffer.truncate(0)
508-
self._upload_proxy.close()
509505
self._buffer.close()
510506
elif self.readable():
511507
self._cache = b""
512508
else:
513509
raise RuntimeError(f"Invalid mode: {self._mode}")
514-
self.closed = True
510+
super().close()
515511
return None
516512

517-
def get_raw_buffer(self) -> io.BytesIO:
518-
"""Return the Raw Buffer if it is possible."""
519-
if self._raw_buffer is False:
520-
raise exceptions.InvalidArgumentValue("Trying to get raw buffer with raw_buffer=False.")
521-
return self._buffer
522-
523513
def read(self, length: int = -1) -> bytes:
524514
"""Return cached data and fetch on demand chunks."""
525515
if self.readable() is False:
@@ -534,8 +524,9 @@ def read(self, length: int = -1) -> bytes:
534524
self._loc += len(out)
535525
return out
536526

537-
def readline(self, length: int = -1) -> bytes:
527+
def readline(self, length: Optional[int] = -1) -> bytes:
538528
"""Read until the next line terminator."""
529+
length = -1 if length is None else length
539530
end: int = self._loc + self._s3_block_size
540531
end = self._size if end > self._size else end
541532
self._fetch(self._loc, end)
@@ -553,17 +544,11 @@ def readline(self, length: int = -1) -> bytes:
553544
end = self._size if end > self._size else end
554545
self._fetch(self._loc, end)
555546

556-
def readlines(self) -> List[bytes]:
557-
"""Return all lines as list."""
558-
return list(self)
559-
560-
561-
class _S3ObjectWriter(_S3ObjectBase):
562-
def write(self, data: bytes) -> int:
547+
def write(self, data: Union[bytes, bytearray, memoryview]) -> int: # type: ignore
563548
"""Write data to buffer and only upload on close() or if buffer is greater than or equal to _MIN_WRITE_BLOCK."""
564549
if self.writable() is False:
565550
raise RuntimeError("File not in write mode.")
566-
if self.closed:
551+
if self.closed: # pylint: disable=using-constant-test
567552
raise RuntimeError("I/O operation on closed file.")
568553
n: int = self._buffer.write(data)
569554
self._loc += n
@@ -583,14 +568,12 @@ def open_s3_object(
583568
boto3_session: Optional[boto3.Session] = None,
584569
newline: Optional[str] = "\n",
585570
encoding: Optional[str] = "utf-8",
586-
raw_buffer: bool = False,
587-
) -> Iterator[Union[_S3ObjectBase, _S3ObjectWriter, io.TextIOWrapper, io.BytesIO]]:
571+
) -> Iterator[Union[_S3ObjectBase, io.TextIOWrapper]]:
588572
"""Return a _S3Object or TextIOWrapper based in the received mode."""
589-
s3obj: Optional[Union[_S3ObjectBase, _S3ObjectWriter]] = None
573+
s3obj: Optional[_S3ObjectBase] = None
590574
text_s3obj: Optional[io.TextIOWrapper] = None
591-
s3_class: Union[Type[_S3ObjectBase], Type[_S3ObjectWriter]] = _S3ObjectWriter if "w" in mode else _S3ObjectBase
592575
try:
593-
s3obj = s3_class(
576+
s3obj = _S3ObjectBase(
594577
path=path,
595578
s3_block_size=s3_block_size,
596579
mode=mode,
@@ -599,11 +582,8 @@ def open_s3_object(
599582
boto3_session=boto3_session,
600583
encoding=encoding,
601584
newline=newline,
602-
raw_buffer=raw_buffer,
603585
)
604-
if raw_buffer is True: # Only useful for plain io.BytesIO write
605-
yield s3obj.get_raw_buffer()
606-
elif "b" in mode: # binary
586+
if "b" in mode: # binary
607587
yield s3obj
608588
else: # text
609589
text_s3obj = io.TextIOWrapper(

awswrangler/s3/_read_excel.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Amazon S3 Excel Read Module (PRIVATE)."""
2+
3+
import logging
4+
from typing import Any, Dict, Optional
5+
6+
import boto3
7+
import pandas as pd
8+
9+
from awswrangler import _utils, exceptions
10+
from awswrangler.s3._fs import open_s3_object
11+
12+
_logger: logging.Logger = logging.getLogger(__name__)
13+
14+
15+
def read_excel(
    path: str,
    use_threads: bool = True,
    boto3_session: Optional[boto3.Session] = None,
    s3_additional_kwargs: Optional[Dict[str, Any]] = None,
    **pandas_kwargs: Any,
) -> pd.DataFrame:
    """Read EXCEL file from a received S3 path.

    Note
    ----
    This function accepts any Pandas's read_excel() argument.
    https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html

    Note
    ----
    In case of `use_threads=True` the number of threads
    that will be spawned will be gotten from os.cpu_count().

    Parameters
    ----------
    path : str
        S3 path (e.g. ``s3://bucket/key.xlsx``).
    use_threads : bool
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.
    s3_additional_kwargs : Optional[Dict[str, Any]]
        Forward to botocore requests, only "SSECustomerAlgorithm" and "SSECustomerKey" arguments will be considered.
    pandas_kwargs:
        KEYWORD arguments forwarded to pandas.read_excel(). You can NOT pass `pandas_kwargs` explicit, just add valid
        Pandas arguments in the function call and Wrangler will accept it.
        e.g. wr.s3.read_excel(path, na_rep="", verbose=True)
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html

    Returns
    -------
    pandas.DataFrame
        Pandas DataFrame.

    Examples
    --------
    Reading an EXCEL file

    >>> import awswrangler as wr
    >>> df = wr.s3.read_excel('s3://bucket/key.xlsx')

    """
    if "pandas_kwargs" in pandas_kwargs:
        raise exceptions.InvalidArgument(
            "You can NOT pass `pandas_kwargs` explicit, just add valid "
            "Pandas arguments in the function call and Wrangler will accept it. "
            'e.g. wr.s3.read_excel(path, na_rep="", verbose=True)'
        )
    session: boto3.Session = _utils.ensure_session(session=boto3_session)
    with open_s3_object(
        path=path,
        mode="rb",
        use_threads=use_threads,
        s3_block_size=-1,  # One shot download
        s3_additional_kwargs=s3_additional_kwargs,
        boto3_session=session,
    ) as f:
        # openpyxl is the only engine that supports reading from a file-like object.
        pandas_kwargs["engine"] = "openpyxl"
        _logger.debug("pandas_kwargs: %s", pandas_kwargs)
        return pd.read_excel(f, **pandas_kwargs)

awswrangler/s3/_write_excel.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Amazon S3 Excel Write Module (PRIVATE)."""
2+
3+
import logging
4+
from typing import Any, Dict, Optional
5+
6+
import boto3
7+
import pandas as pd
8+
9+
from awswrangler import _utils, exceptions
10+
from awswrangler.s3._fs import open_s3_object
11+
12+
_logger: logging.Logger = logging.getLogger(__name__)
13+
14+
15+
def to_excel(
    df: pd.DataFrame,
    path: str,
    boto3_session: Optional[boto3.Session] = None,
    s3_additional_kwargs: Optional[Dict[str, Any]] = None,
    use_threads: bool = True,
    **pandas_kwargs: Any,
) -> str:
    """Write EXCEL file on Amazon S3.

    Note
    ----
    This function accepts any Pandas's DataFrame.to_excel() argument.
    https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_excel.html

    Note
    ----
    In case of `use_threads=True` the number of threads
    that will be spawned will be gotten from os.cpu_count().

    Parameters
    ----------
    df: pandas.DataFrame
        Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
    path : str
        Amazon S3 path (e.g. s3://bucket/filename.xlsx).
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 Session will be used if boto3_session receive None.
    s3_additional_kwargs : Optional[Dict[str, Any]]
        Forward to botocore requests. Valid parameters: "ACL", "Metadata", "ServerSideEncryption", "StorageClass",
        "SSECustomerAlgorithm", "SSECustomerKey", "SSEKMSKeyId", "SSEKMSEncryptionContext", "Tagging".
        e.g. s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}
    use_threads : bool
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
    pandas_kwargs:
        KEYWORD arguments forwarded to pandas.DataFrame.to_excel(). You can NOT pass `pandas_kwargs` explicit, just add
        valid Pandas arguments in the function call and Wrangler will accept it.
        e.g. wr.s3.to_excel(df, path, na_rep="", index=False)
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_excel.html

    Returns
    -------
    str
        Written S3 path.

    Examples
    --------
    Writing EXCEL file

    >>> import awswrangler as wr
    >>> import pandas as pd
    >>> wr.s3.to_excel(df, 's3://bucket/filename.xlsx')

    """
    if "pandas_kwargs" in pandas_kwargs:
        raise exceptions.InvalidArgument(
            "You can NOT pass `pandas_kwargs` explicit, just add valid "
            "Pandas arguments in the function call and Wrangler will accept it. "
            'e.g. wr.s3.to_excel(df, path, na_rep="", index=False)'
        )
    session: boto3.Session = _utils.ensure_session(session=boto3_session)
    with open_s3_object(
        path=path,
        mode="wb",
        use_threads=use_threads,
        s3_additional_kwargs=s3_additional_kwargs,
        boto3_session=session,
    ) as f:
        # openpyxl is the only engine that supports writing to a file-like object.
        pandas_kwargs["engine"] = "openpyxl"
        _logger.debug("pandas_kwargs: %s", pandas_kwargs)
        df.to_excel(f, **pandas_kwargs)
    return path

0 commit comments

Comments
 (0)