
Commit 8544e15

ver_update

committed · 1 parent bb65c36 · commit 8544e15

File tree

5 files changed: +190 -147 lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dean_utils"
-version="0.0.46"
+version="0.0.47"
 authors=[
     { name="Dean MacGregor", email="powertrading121@gmail.com"}
 ]

src/dean_utils/__init__.py

Lines changed: 19 additions & 13 deletions
@@ -14,33 +14,39 @@
     "pl_write_delta_append",
     "global_async_client",
 ]
+import contextlib
+from collections.abc import Iterable
+from typing import cast
+
+from dean_utils.polars_extras import (
+    pl_scan_hive,
+    pl_scan_pq,
+    pl_write_pq,
+)
+
+with contextlib.suppress(ImportError):
+    from dean_utils.polars_extras import pl_write_delta_append
+
 from dean_utils.utils.az_utils import (
     async_abfs,
-    peek_messages,
+    clear_messages,
+    delete_message,
     get_queue_properties,
+    peek_messages,
     send_message,
     update_queue,
-    delete_message,
-    clear_messages,
-)
-from dean_utils.utils.email_utility import send_email, az_send
-from dean_utils.polars_extras import (
-    pl_scan_hive,
-    pl_scan_pq,
-    pl_write_pq,
-    pl_write_delta_append,
 )
+from dean_utils.utils.email_utility import az_send, send_email
 from dean_utils.utils.httpx import global_async_client
-from typing import cast, Iterable
 
 
 def error_email(func):
     def wrapper(*args, **kwargs):
         try:
             return func(*args, **kwargs)
         except Exception as err:
-            import os
             import inspect
+            import os
             from traceback import format_exception
 
             email_body = (
@@ -49,7 +55,7 @@ def wrapper(*args, **kwargs):
                 + "\n".join(format_exception(err))
             )
             az_send(
-                os.getcwd(),
+                os.getcwd(),  # noqa: PTH109
                 email_body,
             )
 
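Note on the new optional import: wrapping the pl_write_delta_append import in contextlib.suppress(ImportError) keeps dean_utils importable even if that symbol cannot be imported (e.g. when the optional deltalake dependency is absent); the name is simply never bound. A minimal sketch of the same idiom (the module and function names below are hypothetical, not part of this repo):

import contextlib

# If the optional backend is not installed, this block silently skips the
# import and the name stays unbound; feature-detect before using it.
with contextlib.suppress(ImportError):
    from optional_backend import fancy_feature  # hypothetical optional module

has_fancy = "fancy_feature" in globals()  # probe for the optional feature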

src/dean_utils/polars_extras.py

Lines changed: 79 additions & 61 deletions
@@ -1,28 +1,34 @@
-import polars as pl
-from polars.type_aliases import (
-    ParallelStrategy,
-    ParquetCompression,
-    ColumnNameOrSelector,
-)
-import pyarrow.parquet as pq
-import fsspec
+from __future__ import annotations
+
+import contextlib
 import os
-from typing import Any, Sequence, cast
 from inspect import signature
 from pathlib import Path
+from typing import TYPE_CHECKING, Any, cast
 
-try:
-    from deltalake import DeltaTable, WriterProperties
-except ModuleNotFoundError:
-    pass
-abfs = fsspec.filesystem("abfss", connection_string=os.environ["Synblob"])
+import fsspec
+import polars as pl
+import pyarrow.parquet as pq
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from deltalake import DeltaTable
+    from polars.type_aliases import (
+        ColumnNameOrSelector,
+        ParallelStrategy,
+        ParquetCompression,
+    )
+
+
+abfs = fsspec.filesystem("abfss", connection_string=os.environ["Synblob"])  # noqa: SIM112
 
 key_conv = {"AccountName": "account_name", "AccountKey": "account_key"}
-stor = {(splt := x.split("=", 1))[0]: splt[1] for x in os.environ["Synblob"].split(";")}
-stor = {key_conv[key]: val for key, val in stor.items() if key in key_conv.keys()}
+stor = {(splt := x.split("=", 1))[0]: splt[1] for x in os.environ["Synblob"].split(";")}  # noqa: SIM112
+stor = {key_conv[key]: val for key, val in stor.items() if key in key_conv}
 
 
-def pl_scan_pq(
+def pl_scan_pq(  # noqa: D417
     source: str,
     *,
     n_rows: int | None = None,
@@ -39,7 +45,10 @@ def pl_scan_pq(
     **kwargs,
 ) -> pl.LazyFrame:
     """
-    # wrapper for pl.scan_parquet that prepends abfs:// to the path, injects user credentials from Synblob env variable, and sets hive to False
+    Wrapper for pl.scan_parquet.
+
+    Prepends abfs:// to the path, injects user credentials from
+    Synblob env variable, and sets hive to False.
 
     Parameters
     ----------
@@ -69,31 +78,32 @@ def pl_scan_pq(
     retries
         Number of retries if accessing a cloud instance fails.
     include_file_paths
-        Include the path of the source file(s) as a column with this name."""
+        Include the path of the source file(s) as a column with this name.
+    """
     if storage_options is None:
         storage_options = stor
-    named = dict(
-        n_rows=n_rows,
-        cache=cache,
-        parallel=parallel,
-        rechunk=rechunk,
-        row_index_name=row_index_name,
-        row_index_offset=row_index_offset,
-        low_memory=low_memory,
-        use_statistics=use_statistics,
-        retries=retries,
-        storage_options=storage_options,
-        hive_partitioning=False,
-        include_file_paths=include_file_paths,
-    )
+    named = {
+        "n_rows": n_rows,
+        "cache": cache,
+        "parallel": parallel,
+        "rechunk": rechunk,
+        "row_index_name": row_index_name,
+        "row_index_offset": row_index_offset,
+        "low_memory": low_memory,
+        "use_statistics": use_statistics,
+        "retries": retries,
+        "storage_options": storage_options,
+        "hive_partitioning": False,
+        "include_file_paths": include_file_paths,
+    }
     renamed = [
         ("row_index_name", "row_count_name"),
         ("row_index_offset", "row_count_offset"),
     ]
     for rename in renamed:
         for ordered in [-1, 1]:
             if (
-                rename[::ordered][0] in signature(pl.scan_parquet).parameters.keys()
+                rename[::ordered][0] in signature(pl.scan_parquet).parameters
                 and rename[::ordered][1] in kwargs
             ):
                 named[rename[::ordered][0]] = kwargs[rename[::ordered][1]]
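
Aside on the compatibility shim above: rename[::ordered] with ordered in [-1, 1] probes each (new_name, old_name) pair in both directions against the installed polars version's pl.scan_parquet signature, so whichever spelling that version accepts receives the caller's value. A quick illustration of the slice trick (plain Python, nothing repo-specific):

rename = ("row_index_name", "row_count_name")
assert rename[::1] == ("row_index_name", "row_count_name")   # forward order
assert rename[::-1] == ("row_count_name", "row_index_name")  # reversed order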
@@ -105,7 +115,7 @@ def pl_scan_pq(
     )
 
 
-def pl_scan_hive(
+def pl_scan_hive(  # noqa: D417
     source: str,
     *,
     n_rows: int | None = None,
@@ -122,7 +132,9 @@ def pl_scan_hive(
     **kwargs,
 ) -> pl.LazyFrame:
     """
-    # wrapper for pl.scan_parquet that prepends abfs:// to the path, injects user credentials from Synblob env variable, and sets hive to False
+    Wrapper for pl.scan_parquet that prepends abfs:// to the path.
+
+    Injects user credentials from Synblob env variable, and sets hive to False.
 
     Parameters
     ----------
@@ -150,31 +162,32 @@ def pl_scan_hive(
     cache
         Cache the result after reading.
     retries
-        Number of retries if accessing a cloud instance fails."""
+        Number of retries if accessing a cloud instance fails.
+    """
     if storage_options is None:
         storage_options = stor
-    named = dict(
-        n_rows=n_rows,
-        cache=cache,
-        parallel=parallel,
-        rechunk=rechunk,
-        row_count_name=row_count_name,
-        row_count_offset=row_count_offset,
-        low_memory=low_memory,
-        use_statistics=use_statistics,
-        retries=retries,
-        storage_options=storage_options,
-        hive_partitioning=True,
-        include_file_paths=include_file_paths,
-    )
+    named = {
+        "n_rows": n_rows,
+        "cache": cache,
+        "parallel": parallel,
+        "rechunk": rechunk,
+        "row_count_name": row_count_name,
+        "row_count_offset": row_count_offset,
+        "low_memory": low_memory,
+        "use_statistics": use_statistics,
+        "retries": retries,
+        "storage_options": storage_options,
+        "hive_partitioning": True,
+        "include_file_paths": include_file_paths,
+    }
     renamed = [
         ("row_index_name", "row_count_name"),
        ("row_index_offset", "row_count_offset"),
     ]
     for rename in renamed:
         for ordered in [-1, 1]:
             if (
-                rename[::ordered][0] in signature(pl.scan_parquet).parameters.keys()
+                rename[::ordered][0] in signature(pl.scan_parquet).parameters
                 and rename[::ordered][1] in kwargs
             ):
                 named[rename[::ordered][0]] = kwargs[rename[::ordered][1]]
@@ -184,7 +197,7 @@ def pl_scan_hive(
     )
 
 
-def pl_write_pq(
+def pl_write_pq(  # noqa: D417
     self,
     file: str,
     *,
@@ -196,6 +209,7 @@ def pl_write_pq(
 ) -> None:
     """
     Write to Apache Parquet file with pyarrow writer.
+
     Defaults to writing to Azure cloud as defined by Synblob env variable.
 
     Parameters
@@ -240,7 +254,7 @@ def pl_write_pq(
             writer.write_table(row_group.to_arrow())
 
 
-def pl_write_delta_append(
+def pl_write_delta_append(  # noqa: D417
     df: pl.DataFrame,
     target: str | Path | DeltaTable,
     *,
@@ -250,8 +264,10 @@ def pl_write_delta_append(
     """
     Appends DataFrame to delta table and auto computes range partition.
 
-    Parameters
-    ----------
+    Parameters
+    ----------
+    df
+        df to be saved
     target
         URI of a table or a DeltaTable object.
     storage_options
@@ -265,6 +281,8 @@ def pl_write_delta_append(
         Additional keyword arguments while writing a Delta lake Table.
        See a list of supported write options `here <https://delta-io.github.io/delta-rs/api/delta_writer/#deltalake.write_deltalake>`__.
     """
+    from deltalake import DeltaTable, WriterProperties
+
     if isinstance(target, (str, Path)):
         target = DeltaTable(target, storage_options=storage_options)
     add_actions = cast(pl.DataFrame, pl.from_arrow(target.get_add_actions()))
@@ -294,7 +312,7 @@ def pl_write_delta_append(
         ranges, left_on=partition_col, right_on="min_id"
     )
     assert df.height == initial_height
-    assert df.filter((pl.col(partition_col) < pl.col("min_id"))).height == 0
+    assert df.filter(pl.col(partition_col) < pl.col("min_id")).height == 0
     df = df.drop("min_id", "max_id")
     assert isinstance(target, DeltaTable)
 
@@ -321,8 +339,8 @@ def pl_write_delta_append(
     )
 
 
-setattr(pl, "scan_pq", pl_scan_pq)
-setattr(pl, "scan_hive", pl_scan_hive)
+setattr(pl, "scan_pq", pl_scan_pq)  # noqa: B010
+setattr(pl, "scan_hive", pl_scan_hive)  # noqa: B010
 DF = pl.DataFrame
-setattr(DF, "write_pq", pl_write_pq)
-setattr(pl, "DataFrame", DF)
+setattr(DF, "write_pq", pl_write_pq)  # noqa: B010
+setattr(pl, "DataFrame", DF)  # noqa: B010
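
For context, a short usage sketch of the patched API (the container and file paths below are hypothetical, and it assumes the Synblob env variable holds a valid Azure storage connection string, since the module builds its abfss filesystem and credentials from it at import time):

import polars as pl

import dean_utils  # importing the package runs the setattr() patches above

# pl_scan_pq prepends abfs:// and injects credentials parsed from Synblob
lf = pl.scan_pq("mycontainer/data/part-0.parquet")

# write_pq was attached to pl.DataFrame by the patch block, so collected
# frames can write straight back to the same Azure account
lf.head(5).collect().write_pq("mycontainer/out/sample.parquet")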
