
Commit fec1f7e

Add deltalake support in AWS S3 with Pandas (#1834)
* Add deltalake support in S3
Parent: 90ec757

File tree: 10 files changed, +185 −27 lines

.github/workflows/minimal-tests.yml

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ jobs:
           python -m pip install --upgrade pip
           python -m pip install poetry==1.2.2
           poetry config virtualenvs.create false --local
-          poetry install --extras "sqlserver" -vvv
+          poetry install --extras "sqlserver deltalake" -vvv
       - name: Test Metadata
         run: pytest tests/test_metadata.py
       - name: Test Session

.github/workflows/static-checking.yml

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ jobs:
           python -m pip install --upgrade pip
           python -m pip install poetry==1.2.2
           poetry config virtualenvs.create false --local
-          poetry install --extras "sqlserver" -vvv
+          poetry install --extras "sqlserver deltalake" -vvv
       - name: mypy check
         run: mypy --install-types --non-interactive awswrangler
       - name: Flake8 Lint

CONTRIBUTING.md

Lines changed: 2 additions & 2 deletions
@@ -104,7 +104,7 @@ or
 
 * Install dependencies:
 
-  ``poetry install --extras "sqlserver oracle sparql"``
+  ``poetry install --extras "sqlserver oracle sparql deltalake"``
 
 * Run the validation script:
 
@@ -135,7 +135,7 @@ or
 
 * Install dependencies:
 
-  ``poetry install --extras "sqlserver oracle sparql"``
+  ``poetry install --extras "sqlserver oracle sparql deltalake"``
 
 * Go to the ``test_infra`` directory

awswrangler/s3/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,7 @@
 from awswrangler.s3._download import download  # noqa
 from awswrangler.s3._list import does_object_exist, list_buckets, list_directories, list_objects  # noqa
 from awswrangler.s3._merge_upsert_table import merge_upsert_table  # noqa
+from awswrangler.s3._read_deltalake import read_deltalake  # noqa
 from awswrangler.s3._read_excel import read_excel  # noqa
 from awswrangler.s3._read_parquet import read_parquet, read_parquet_metadata, read_parquet_table  # noqa
 from awswrangler.s3._read_text import read_csv, read_fwf, read_json  # noqa
@@ -27,6 +28,7 @@
     "list_buckets",
     "list_directories",
     "list_objects",
+    "read_deltalake",
     "read_parquet",
     "read_parquet_metadata",
     "read_parquet_table",

awswrangler/s3/_read_deltalake.py

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+"""Amazon S3 Read Delta Lake Module (PRIVATE)."""
+import importlib.util
+from typing import Any, Dict, List, Optional, Tuple
+
+import boto3
+import pandas as pd
+
+from awswrangler import _utils
+
+_deltalake_found = importlib.util.find_spec("deltalake")
+if _deltalake_found:
+    from deltalake import DeltaTable  # pylint: disable=import-error
+
+
+def _set_default_storage_options_kwargs(
+    session: boto3.Session, s3_additional_kwargs: Optional[Dict[str, Any]]
+) -> Dict[str, Any]:
+    defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=session).items()}
+    s3_additional_kwargs = s3_additional_kwargs or {}
+    return {
+        **defaults,
+        **s3_additional_kwargs,
+    }
+
+
+def read_deltalake(
+    path: Optional[str] = None,
+    version: Optional[int] = None,
+    partitions: Optional[List[Tuple[str, str, Any]]] = None,
+    columns: Optional[List[str]] = None,
+    without_files: bool = False,
+    boto3_session: Optional[boto3.Session] = None,
+    s3_additional_kwargs: Optional[Dict[str, str]] = None,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
+) -> pd.DataFrame:
+    """Load a Deltalake table data from an S3 path.
+
+    This function requires the `deltalake package
+    <https://delta-io.github.io/delta-rs/python>`__.
+    See the `How to load a Delta table
+    <https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table>`__
+    guide for loading instructions.
+
+    Parameters
+    ----------
+    path: Optional[str]
+        The path of the DeltaTable.
+    version: Optional[int]
+        The version of the DeltaTable.
+    partitions: Optional[List[Tuple[str, str, Any]]]
+        A list of partition filters, see help(DeltaTable.files_by_partitions)
+        for filter syntax.
+    columns: Optional[List[str]]
+        The columns to project. This can be a list of column names to include
+        (order and duplicates are preserved).
+    without_files: bool
+        If True, load the table without tracking files (memory-friendly).
+        Some append-only applications might not need to track files.
+    boto3_session: Optional[boto3.Session()]
+        Boto3 Session. If None, the default boto3 session is used.
+    s3_additional_kwargs: Optional[Dict[str, str]]
+        Forwarded to the Delta Table class for the storage options of the S3 backend.
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]]
+        Forwarded to the PyArrow to_pandas method.
+
+    Returns
+    -------
+    df: pd.DataFrame
+        DataFrame with the results.
+
+    See Also
+    --------
+    deltalake.DeltaTable : Create a DeltaTable instance with the deltalake library.
+    """
+    pyarrow_additional_kwargs = pyarrow_additional_kwargs or {}  # TODO: Use defaults in 3.0.0 # pylint: disable=fixme
+    storage_options = _set_default_storage_options_kwargs(_utils.ensure_session(boto3_session), s3_additional_kwargs)
+
+    return (
+        DeltaTable(
+            table_uri=path,
+            version=version,
+            storage_options=storage_options,
+            without_files=without_files,
+        )
+        .to_pyarrow_table(partitions=partitions, columns=columns)
+        .to_pandas(**pyarrow_additional_kwargs)
+    )
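For context, a minimal usage sketch of the new reader, assuming the deltalake extra is installed; the bucket path and column names below are placeholders, not part of this commit:

import awswrangler as wr

# Read the latest snapshot of a Delta table stored on S3 into a pandas DataFrame.
df = wr.s3.read_deltalake(path="s3://my-bucket/delta/table/")  # hypothetical path

# Time travel to an earlier table version and project a subset of columns.
df_v0 = wr.s3.read_deltalake(
    path="s3://my-bucket/delta/table/",
    version=0,
    columns=["c0", "c1"],  # hypothetical column names
)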

docs/source/api.rst

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ Amazon S3
     merge_datasets
     merge_upsert_table
     read_csv
+    read_deltalake
     read_excel
     read_fwf
     read_json

poetry.lock

Lines changed: 40 additions & 23 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -48,11 +48,13 @@ backoff = ">=1.11.1,<3.0.0"
 SPARQLWrapper = { version = ">=1.8.5,<3.0.0", optional = true }
 pyodbc = { version = "~4.0.32", optional = true }
 oracledb = { version = "~1.0.0", optional = true }
+deltalake = { version = "~0.6.4", optional = true }
 
 [tool.poetry.extras]
 sqlserver = ["pyodbc"]
 oracle = ["oracledb"]
 sparql = ["SPARQLWrapper"]
+deltalake = ["deltalake"]
 
 [tool.poetry.dev-dependencies]
 wheel = "^0.37.1"

tests/test_s3_deltalake.py

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+import pandas as pd
+import pytest
+from deltalake import DeltaTable, write_deltalake
+
+import awswrangler as wr
+
+
+@pytest.fixture(scope="session")
+def storage_options():
+    return {"AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"}
+
+
+@pytest.mark.parametrize("s3_additional_kwargs", [None, {"ServerSideEncryption": "AES256"}])
+@pytest.mark.parametrize("pyarrow_additional_kwargs", [None, {"safe": True, "deduplicate_objects": False}])
+def test_read_deltalake(path, s3_additional_kwargs, pyarrow_additional_kwargs, storage_options):
+    df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", None, "bar"], "c2": [3.0, 4.0, 5.0], "c3": [True, False, None]})
+    write_deltalake(table_or_uri=path, data=df, storage_options=storage_options)
+
+    df2 = wr.s3.read_deltalake(
+        path=path, s3_additional_kwargs=s3_additional_kwargs, pyarrow_additional_kwargs=pyarrow_additional_kwargs
+    )
+    assert df2.equals(df)
+
+
+def test_read_deltalake_versioned(path, storage_options):
+    df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", "baz", "bar"]})
+    write_deltalake(table_or_uri=path, data=df, storage_options=storage_options)
+    table = DeltaTable(path, version=0, storage_options=storage_options)
+
+    df2 = wr.s3.read_deltalake(path=path)
+    assert df2.equals(df)
+
+    df["c2"] = [True, False, True]
+    write_deltalake(table_or_uri=table, data=df, mode="overwrite", overwrite_schema=True)
+
+    df3 = wr.s3.read_deltalake(path=path, version=0)
+    assert df3.equals(df.drop("c2", axis=1))
+
+    df4 = wr.s3.read_deltalake(path=path, version=1)
+    assert df4.equals(df)
+
+
+def test_read_deltalake_partitions(path, storage_options):
+    df = pd.DataFrame({"c0": [1, 2, 3], "c1": [True, False, True], "par0": ["foo", "foo", "bar"], "par1": [1, 2, 2]})
+    write_deltalake(table_or_uri=path, data=df, partition_by=["par0", "par1"], storage_options=storage_options)
+
+    df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")])
+    assert df2.shape == (1, 1)
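A note on the storage_options fixture above: S3 has no atomic rename, so delta-rs writes need either a configured locking mechanism or the explicit AWS_S3_ALLOW_UNSAFE_RENAME opt-in used in these tests. A sketch of seeding a table the same way, with a hypothetical bucket path (not part of this commit):

import pandas as pd
from deltalake import write_deltalake

# Mirror the test fixture: allow non-atomic S3 renames so delta-rs will commit the write.
storage_options = {"AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"}

write_deltalake(
    table_or_uri="s3://my-bucket/delta/table/",  # hypothetical path
    data=pd.DataFrame({"c0": [1, 2, 3], "par0": ["foo", "foo", "bar"]}),
    partition_by=["par0"],
    storage_options=storage_options,
)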

tox.ini

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ deps =
     .[sqlserver]
     .[oracle]
     .[sparql]
+    .[deltalake]
     pytest==7.1.2
     pytest-rerunfailures==10.2
     pytest-xdist==3.0.2
