
Commit f02f895

feat(s3): add to_deltalake_streaming for single-commit Delta writes

1 parent 3521ddc commit f02f895

File tree: 2 files changed, +122 -2 lines changed

awswrangler/s3/__init__.py

Lines changed: 2 additions & 1 deletion

@@ -13,7 +13,7 @@
 from awswrangler.s3._select import select_query
 from awswrangler.s3._upload import upload
 from awswrangler.s3._wait import wait_objects_exist, wait_objects_not_exist
-from awswrangler.s3._write_deltalake import to_deltalake
+from awswrangler.s3._write_deltalake import to_deltalake, to_deltalake_streaming
 from awswrangler.s3._write_excel import to_excel
 from awswrangler.s3._write_orc import to_orc
 from awswrangler.s3._write_parquet import store_parquet_metadata, to_parquet
@@ -49,6 +49,7 @@
     "to_csv",
     "to_json",
     "to_deltalake",
+    "to_deltalake_streaming",
     "to_excel",
     "read_excel",
     "download",

awswrangler/s3/_write_deltalake.py

Lines changed: 120 additions & 1 deletion
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING, Any, Iterable, Iterator, Literal
 
 import boto3
 import pandas as pd
@@ -30,6 +30,7 @@ def _set_default_storage_options_kwargs(
     defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()}
     defaults["AWS_REGION"] = defaults.pop("REGION_NAME")
     defaults["AWS_SESSION_TOKEN"] = "" if defaults["AWS_SESSION_TOKEN"] is None else defaults["AWS_SESSION_TOKEN"]
+
     s3_additional_kwargs = s3_additional_kwargs or {}
 
     s3_lock_arguments = {}
@@ -133,3 +134,121 @@ def to_deltalake(
         schema_mode=schema_mode,
         storage_options=storage_options,
     )
+
+
+
+def _df_iter_to_record_batch_reader(
+    df_iter: Iterable[pd.DataFrame],
+    *,
+    index: bool,
+    dtype: dict[str, str],
+    target_schema: pa.Schema | None = None,
+    batch_size: int | None = None,
+) -> tuple[pa.RecordBatchReader, pa.Schema]:
+    """
+    Convert an iterable of Pandas DataFrames into a single Arrow RecordBatchReader
+    suitable for a single delta-rs commit. The first *non-empty* DataFrame fixes the schema.
+
+    Returns
+    -------
+    (reader, schema)
+        reader: pa.RecordBatchReader streaming all chunks as Arrow batches
+        schema: pa.Schema used for conversion
+    """
+    it = iter(df_iter)
+
+    first_df: pd.DataFrame | None = None
+    for df in it:
+        if not df.empty:
+            first_df = df
+            break
+
+    if first_df is None:
+        empty_schema = pa.schema([])
+        empty_reader = pa.RecordBatchReader.from_batches(empty_schema, [])
+        return empty_reader, empty_schema
+
+    schema = target_schema or _data_types.pyarrow_schema_from_pandas(
+        df=first_df, index=index, ignore_cols=None, dtype=dtype
+    )
+
+    def batches() -> Iterator[pa.RecordBatch]:
+        first_tbl: pa.Table = _df_to_table(first_df, schema, index, dtype)
+        for b in (first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches()):
+            yield b
+
+        for df in it:
+            if df.empty:
+                continue
+            tbl: pa.Table = _df_to_table(df, schema, index, dtype)
+            for b in (tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches()):
+                yield b
+
+    reader = pa.RecordBatchReader.from_batches(schema, batches())
+    return reader, schema
+
+
+@_utils.check_optional_dependency(deltalake, "deltalake")
+@Experimental
+def to_deltalake_streaming(
+    *,
+    dfs: Iterable[pd.DataFrame],
+    path: str,
+    index: bool = False,
+    mode: Literal["error", "append", "overwrite", "ignore"] = "append",
+    dtype: dict[str, str] | None = None,
+    partition_cols: list[str] | None = None,
+    schema_mode: Literal["overwrite", "merge"] | None = None,
+    lock_dynamodb_table: str | None = None,
+    s3_allow_unsafe_rename: bool = False,
+    boto3_session: boto3.Session | None = None,
+    s3_additional_kwargs: dict[str, str] | None = None,
+    batch_size: int | None = None,
+    max_open_files: int | None = None,
+    max_rows_per_file: int | None = None,
+    target_file_size: int | None = None,
+) -> None:
+    """
+    Write an iterable/generator of Pandas DataFrames to S3 as a Delta Lake table
+    in a SINGLE atomic commit (one table version).
+
+    Use this for large "restatements" that are produced in chunks. Semantics mirror
+    `to_deltalake` (partitioning, schema handling, S3 locking, etc.).
+
+    Notes
+    -----
+    - The schema is fixed by the first *non-empty* chunk (plus any `dtype` coercions).
+    - All `partition_cols` must be present in every non-empty chunk.
+    - Prefer `lock_dynamodb_table` over `s3_allow_unsafe_rename=True` on S3.
+    """
+    dtype = dtype or {}
+
+    storage_options = _set_default_storage_options_kwargs(
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+        s3_allow_unsafe_rename=s3_allow_unsafe_rename,
+        lock_dynamodb_table=lock_dynamodb_table,
+    )
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=dfs,
+        index=index,
+        dtype=dtype,
+        target_schema=None,
+        batch_size=batch_size,
+    )
+
+    if len(schema) == 0:
+        return
+
+    deltalake.write_deltalake(
+        table_or_uri=path,
+        data=reader,
+        partition_by=partition_cols,
+        mode=mode,
+        schema_mode=schema_mode,
+        storage_options=storage_options,
+        max_open_files=max_open_files,
+        max_rows_per_file=max_rows_per_file,
+        target_file_size=target_file_size,
+    )
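
For reference, a hedged usage sketch of the new entry point: the chunk generator, bucket path, and DynamoDB lock table name below are illustrative placeholders, not part of this commit.

import awswrangler as wr
import pandas as pd

def restatement_chunks():
    # Illustrative generator: every non-empty chunk shares the same columns,
    # because the first non-empty chunk fixes the Arrow schema for the whole write.
    for day in ("2024-01-01", "2024-01-02"):
        yield pd.DataFrame({"dt": [day] * 3, "value": [1, 2, 3]})

wr.s3.to_deltalake_streaming(
    dfs=restatement_chunks(),
    path="s3://my-bucket/delta/restatement/",  # placeholder bucket/prefix
    mode="overwrite",
    partition_cols=["dt"],
    lock_dynamodb_table="delta_lock",  # placeholder; preferred over s3_allow_unsafe_rename=True
)

Because all chunks are streamed into a single delta-rs commit, readers see either the complete restatement or the previous table version, never a partial write.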
