Commit 6a8cc81

fix: not supported
1 parent c534843 commit 6a8cc81

2 files changed (+175, -5 lines changed)

awswrangler/s3/_write_deltalake.py

Lines changed: 0 additions & 4 deletions
@@ -193,8 +193,6 @@ def to_deltalake_streaming(
     boto3_session: boto3.Session | None = None,
     s3_additional_kwargs: dict[str, str] | None = None,
     batch_size: int | None = None,
-    max_open_files: int | None = None,
-    max_rows_per_file: int | None = None,
     target_file_size: int | None = None,
 ) -> None:
     dtype = dtype or {}
@@ -224,7 +222,5 @@ def to_deltalake_streaming(
         mode=mode,
         schema_mode=schema_mode,
         storage_options=storage_options,
-        max_open_files=max_open_files,
-        max_rows_per_file=max_rows_per_file,
         target_file_size=target_file_size,
     )
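
With max_open_files and max_rows_per_file removed, output sizing for to_deltalake_streaming is driven only by batch_size and target_file_size. A minimal usage sketch, modeled on the tests added below; the S3 path and lock-table name are placeholders, and the lock_settings keywords are an assumption (same locking options as wr.s3.to_deltalake):

import awswrangler as wr
import awswrangler.pandas as pd

# Assumed lock configuration; swap in s3_allow_unsafe_rename=True if you do
# not use a DynamoDB lock table. The table name is hypothetical.
lock_settings = {"lock_dynamodb_table": "my-deltalake-lock-table"}

def dfs():
    # Stream the data in chunks instead of materializing one large frame.
    yield pd.DataFrame({"c0": [1, 1], "c1": [10, 11], "v": [100, 200]})
    yield pd.DataFrame({"c0": [2, 2], "c1": [12, 13], "v": [300, 400]})

wr.s3.to_deltalake_streaming(
    dfs=dfs(),
    path="s3://my-bucket/delta/table/",  # hypothetical path
    mode="overwrite",
    partition_cols=["c0", "c1"],
    target_file_size=128 * 1024 * 1024,  # optional target size (assumed bytes)
    # max_open_files / max_rows_per_file are no longer accepted after this commit
    **lock_settings,
)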

tests/unit/test_s3_deltalake.py

Lines changed: 175 additions & 1 deletion
@@ -1,24 +1,36 @@
 from __future__ import annotations

-from typing import Any, Iterator
+from typing import Any, Iterator, Iterable

 import boto3
+import pyarrow as pa
 import pytest
+from pandas.testing import assert_frame_equal

 import awswrangler as wr
 import awswrangler.pandas as pd
+from awswrangler.s3._write_deltalake import _df_iter_to_record_batch_reader

 from .._utils import (
     get_time_str_with_random_suffix,
 )


+def assert_df_equal_unordered(left: pd.DataFrame, right: pd.DataFrame, by: list[str]) -> None:
+    """Compare two dataframes ignoring row order and dtypes."""
+    l2 = left.sort_values(by).reset_index(drop=True)
+    r2 = right.sort_values(by).reset_index(drop=True)
+
+    assert_frame_equal(l2, r2, check_dtype=False, check_like=True)
+
+
 @pytest.fixture(scope="session")
 def lock_dynamodb_table() -> Iterator[str]:
     name = f"deltalake_lock_{get_time_str_with_random_suffix()}"
     print(f"Table name: {name}")

     dynamodb_client = boto3.client("dynamodb")
+
     dynamodb_client.create_table(
         TableName=name,
         BillingMode="PAY_PER_REQUEST",
@@ -94,3 +106,165 @@ def test_read_deltalake_partitions(path: str, lock_settings: dict[str, Any]) ->

     df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")])
     assert df2.shape == (1, 1)
+
+
+@pytest.mark.parametrize("chunksize", [2, 10])
+def test_to_deltalake_streaming_single_commit_overwrite(
+    path: str,
+    lock_settings: dict[str, Any],
+    chunksize: int,
+) -> None:
+    df1 = pd.DataFrame({"c0": [1, 1], "c1": [10, 11], "v": [100, 200]})
+    df2 = pd.DataFrame({"c0": [2, 2], "c1": [12, 13], "v": [300, 400]})
+
+    def dfs() -> Iterable[pd.DataFrame]:
+        yield df1
+        yield df2
+
+    wr.s3.to_deltalake_streaming(
+        dfs=dfs(),
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    out = wr.s3.read_deltalake(path=path)
+
+    expected = pd.concat([df1, df2], ignore_index=True)
+    assert_df_equal_unordered(expected, out, by=["c0", "c1", "v"])
+
+
+def test_to_deltalake_streaming_creates_one_version_per_run(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    df_run1_a = pd.DataFrame({"c0": [1], "c1": [10], "v": [111]})
+    df_run1_b = pd.DataFrame({"c0": [1], "c1": [11], "v": [112]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df_run1_a, df_run1_b],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    run1_expected = pd.concat([df_run1_a, df_run1_b], ignore_index=True)
+    latest_v0 = wr.s3.read_deltalake(path=path)
+    assert_df_equal_unordered(run1_expected, latest_v0, by=["c0", "c1", "v"])
+
+    df_run2_a = pd.DataFrame({"c0": [2], "c1": [12], "v": [221]})
+    df_run2_b = pd.DataFrame({"c0": [2], "c1": [13], "v": [222]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df_run2_a, df_run2_b],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    v0 = wr.s3.read_deltalake(path=path, version=0)
+    v1 = wr.s3.read_deltalake(path=path, version=1)
+    run2_expected = pd.concat([df_run2_a, df_run2_b], ignore_index=True)
+
+    assert_df_equal_unordered(run1_expected, v0, by=["c0", "c1", "v"])
+    assert_df_equal_unordered(run2_expected, v1, by=["c0", "c1", "v"])
+
+
+def test_to_deltalake_streaming_partitions_and_filters(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    df1 = pd.DataFrame({"c0": [1, 1, 2], "c1": [10, 11, 12], "v": [1, 2, 3]})
+    df2 = pd.DataFrame({"c0": [2, 3, 3], "c1": [13, 14, 15], "v": [4, 5, 6]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df1, df2],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    only_c02 = wr.s3.read_deltalake(
+        path=path,
+        partitions=[("c0", "=", "2")],
+        columns=["v", "c1"],
+    )
+    assert set(only_c02["c1"].tolist()) == {12, 13}
+    assert sorted(only_c02["v"].tolist()) == [3, 4]
+
+
+def test_to_deltalake_streaming_empty_iterator_is_noop(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    wr.s3.to_deltalake_streaming(
+        dfs=[pd.DataFrame({"c0": [1], "c1": [1], "v": [1]})],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+    baseline = wr.s3.read_deltalake(path=path)
+
+    def empty() -> Iterator[pd.DataFrame]:
+        if False:
+            yield pd.DataFrame()  # pragma: no cover
+
+    wr.s3.to_deltalake_streaming(
+        dfs=empty(),
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+    after = wr.s3.read_deltalake(path=path)
+    assert after.equals(baseline)
+
+
+def test_df_iter_to_record_batch_reader_schema_and_rows() -> None:
+    df_empty = pd.DataFrame({"a": [], "b": []})
+    df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
+    df2 = pd.DataFrame({"a": [3], "b": ["z"]})
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=[df_empty, df1, df2],
+        index=False,
+        dtype={},
+        target_schema=None,
+        batch_size=None,
+    )
+
+    assert isinstance(schema, pa.Schema)
+    assert {f.name for f in schema} == {"a", "b"}
+
+    table: pa.Table = reader.read_all()
+    pdf = table.to_pandas()
+    assert len(pdf) == 3
+    assert sorted(pdf["a"].tolist()) == [1, 2, 3]
+    assert set(pdf["b"].tolist()) == {"x", "y", "z"}
+
+
+def test_df_iter_to_record_batch_reader_respects_batch_size() -> None:
+    df1 = pd.DataFrame({"a": list(range(5)), "b": ["x"] * 5})
+    df2 = pd.DataFrame({"a": list(range(5, 9)), "b": ["y"] * 4})
+
+    reader, _ = _df_iter_to_record_batch_reader(
+        df_iter=[df1, df2],
+        index=False,
+        dtype={},
+        target_schema=None,
+        batch_size=3,
+    )
+
+    batch_count = 0
+    row_count = 0
+    for batch in reader:
+        batch_count += 1
+        row_count += batch.num_rows
+
+    assert batch_count >= 3
+    assert row_count == 9
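
The last two tests exercise _df_iter_to_record_batch_reader, the internal helper that turns an iterator of pandas DataFrames into a single pyarrow RecordBatchReader plus its schema. For orientation only, here is a minimal, hypothetical sketch of that pattern built on public pyarrow APIs (pa.Schema.from_pandas, Table.to_batches, RecordBatchReader.from_batches); it is not the awswrangler implementation and omits the real helper's index, dtype, and target_schema handling:

from __future__ import annotations

import itertools
from typing import Iterable, Iterator

import pandas as pd
import pyarrow as pa


def df_iter_to_reader(
    df_iter: Iterable[pd.DataFrame],
    batch_size: int | None = None,
) -> tuple[pa.RecordBatchReader, pa.Schema]:
    """Illustrative sketch: stream pandas frames through one RecordBatchReader."""
    frames = iter(df_iter)
    first = next(frames, None)
    if first is None:
        # Empty input: expose an empty reader so callers can treat it as a no-op.
        schema = pa.schema([])
        return pa.RecordBatchReader.from_batches(schema, iter([])), schema

    # Infer the schema from the first frame; later frames are cast to it.
    schema = pa.Schema.from_pandas(first, preserve_index=False)

    def batches() -> Iterator[pa.RecordBatch]:
        for df in itertools.chain([first], frames):
            table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
            # max_chunksize=None keeps whole chunks; otherwise cap rows per batch.
            yield from table.to_batches(max_chunksize=batch_size)

    return pa.RecordBatchReader.from_batches(schema, batches()), schema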
