
Commit f13a004

fix_email_error_add_abfs_features
1 parent 4ff25b4 commit f13a004

5 files changed: 102 additions & 129 deletions

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dean_utils"
-version="0.0.51"
+version="0.0.52"
 authors=[
   { name="Dean MacGregor", email="powertrading121@gmail.com"}
 ]
@@ -80,7 +80,7 @@ ignore = [
     # Lints below are turned off because of conflicts with the ruff formatter
     "D206",
     "W191",
-    "D100", "D101", "D102", "D103"
+    "D100", "D101", "D102", "D103", "SIM112"
 ]
 
 [tool.ruff.lint.pycodestyle]

requirements.txt

Lines changed: 2 additions & 1 deletion
@@ -6,4 +6,5 @@ fsspec
 azure-storage-queue
 python-multipart
 azure-communication-email
-httpx
+httpx
+aiopath
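
The new aiopath dependency is what lets stream_up read local files without blocking the event loop. A minimal sketch of that read pattern, using only aiopath itself; the file name below is a placeholder:

import asyncio

from aiopath import AsyncPath


async def count_bytes(path: str) -> int:
    # Read a local file in chunks without blocking the event loop; this is
    # the same read pattern stream_up now uses when staging blocks.
    total = 0
    async with AsyncPath(path).open("rb") as src:
        while chunk := await src.read(16384):
            total += len(chunk)
    return total


print(asyncio.run(count_bytes("some_local_file.bin")))  # placeholder path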

src/dean_utils/__init__.py

Lines changed: 23 additions & 24 deletions
@@ -1,22 +1,19 @@
 __all__ = [
     "async_abfs",
+    "az_send",
     "clear_messages",
-    "peek_messages",
-    "get_queue_properties",
-    "send_message",
-    "update_queue",
     "delete_message",
-    "send_email",
-    "az_send",
+    "get_queue_properties",
+    "global_async_client",
+    "peek_messages",
     "pl_scan_hive",
     "pl_scan_pq",
-    "pl_write_pq",
     "pl_write_delta_append",
-    "global_async_client",
+    "pl_write_pq",
+    "send_message",
+    "update_queue",
 ]
 import contextlib
-from collections.abc import Iterable
-from typing import cast
 
 from dean_utils.polars_extras import (
     pl_scan_hive,
@@ -27,6 +24,8 @@
 with contextlib.suppress(ImportError):
     from dean_utils.polars_extras import pl_write_delta_append
 
+from pathlib import Path
+
 from dean_utils.utils.az_utils import (
     async_abfs,
     clear_messages,
@@ -36,33 +35,33 @@
     send_message,
     update_queue,
 )
-from dean_utils.utils.email_utility import az_send, send_email
+from dean_utils.utils.email_utility import az_send
 from dean_utils.utils.httpx import global_async_client
 
 
 def error_email(func, attempts=1):
     def wrapper(*args, **kwargs):
-        subject = None
+        subject = Path.cwd()
         errors = []
         for _ in range(attempts):
             try:
                 return func(*args, **kwargs)
             except Exception as err:
                 import inspect
-                from pathlib import Path
                 from traceback import format_exception
 
-                if subject is None:
-                    subject = Path.cwd()
-                errors.append(
-                    "\n".join(cast(Iterable[str], inspect.stack()))
-                    + "\n\n"
-                    + "\n".join(format_exception(err))
+                filt_stack = "\n".join(
+                    [
+                        str(x)
+                        for x in inspect.stack()[1:]
+                        if "site-packages" not in x.filename
+                    ]
                 )
-        if subject is not None:
-            az_send(
-                str(subject),
-                "\n".join(errors),
-            )
+                errors.append("\n".join(["\n".join(format_exception(err)), filt_stack]))
+
+        az_send(
+            str(subject),
+            "\n".join(errors),
+        )
 
     return wrapper
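
For reference, the reworked error_email decorator retries the wrapped function, collects the formatted traceback plus a call stack filtered of site-packages frames for each failure, and emails the result via az_send with the working directory as the subject. A hypothetical usage sketch, assuming the package's storage and email environment variables are set; the job functions and attempt count are illustrative:

from dean_utils import error_email


@error_email
def nightly_job():
    # Any uncaught exception is captured; the formatted traceback and the
    # filtered stack are appended to the email body sent through az_send.
    raise RuntimeError("example failure")  # illustrative failure


def flaky_job():
    ...


# To allow retries before the failure email goes out, wrap explicitly:
flaky_job_with_retries = error_email(flaky_job, attempts=3)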

src/dean_utils/utils/az_utils.py

Lines changed: 65 additions & 55 deletions
@@ -10,6 +10,7 @@
 
 import azure.storage.blob as asb
 import fsspec
+from aiopath import AsyncPath
 from azure.core.exceptions import HttpResponseError
 from azure.storage.blob import BlobBlock
 from azure.storage.blob.aio import BlobClient
@@ -18,10 +19,11 @@
 
 if TYPE_CHECKING:
     import httpx
+    from azure.storage.blob.aio import StorageStreamDownloader
     from azure.storage.queue import QueueMessage
 
     HTTPX_METHODS: TypeAlias = Literal["GET", "POST"]
-AIO_SERVE = QSC.from_connection_string(conn_str=os.environ["AzureWebJobsStorage"])  # noqa: SIM112
+AIO_SERVE = QSC.from_connection_string(conn_str=os.environ["AzureWebJobsStorage"])
 
 
 async def peek_messages(queue: str, max_messages: int | None = None, **kwargs):
@@ -149,7 +151,7 @@ async def clear_messages(
 
 
 class async_abfs:
-    def __init__(self, connection_string=os.environ["Synblob"]):  # noqa: SIM112
+    def __init__(self, connection_string=os.environ["Synblob"]):
         self.connection_string = connection_string
         self.sync = fsspec.filesystem("abfss", connection_string=self.connection_string)
         key_conv = {"AccountName": "account_name", "AccountKey": "account_key"}
@@ -189,13 +191,12 @@ async def stream_dl(
         **httpx_extras,
     ) -> None:
         """
-        Help on method stream_dl.
+        stream_dl will stream the contents of a url to a path in the cloud given an httpx Client.
 
-        async stream_dl(client, method, url, **httpx_extras)
+        async stream_dl(client, method, url, path, recurs, **httpx_extras)
         Download file streaming in chunks in async as downloader and to a Blob
 
-        Parameters
-        ----------
+        Args:
         client: httpx.AsyncClient
             The httpx Async Client object to use
         method:
@@ -204,7 +205,9 @@ async def stream_dl(
             The URL to download
         path:
             The full path to Azure file being saved
-        **httpx_extras
+        recurs:
+            To try again recursively
+        httpx_extras
             Any extra arguments to be sent to client.stream
         """
         async with (
@@ -216,7 +219,7 @@ async def stream_dl(
             resp.raise_for_status()
             block_list = []
             async for chunk in resp.aiter_bytes():
-                chunk = cast(IO, chunk)
+                chunk = cast("IO", chunk)
                 block_id = uuid4().hex
                 try:
                     await target.stage_block(block_id=block_id, data=chunk)
@@ -244,60 +247,63 @@ async def stream_dl(
 
     async def stream_up(
         self,
-        local_path: str | Path,
+        local_path: str | Path | AsyncPath,
         remote_path: str,
         size: int = 16384,
         /,
         recurs=False,
     ) -> None:
         """
-        Help on method stream_dl.
+        Help on method stream_up.
 
-        async stream_dl(client, method, url, **httpx_extras)
+        async stream_up(local_path, remote_path, size, recurs)
         Download file streaming in chunks in async as downloader and to a Blob
 
-        Parameters
-        ----------
+        Args:
         local_path:
             The full path to local path as str or Path
         remote_path:
             The full path to remote path as str
         size:
             The number of bytes read per iteration in read
+        recurs:
+            To try again recursively
         """
-        if isinstance(local_path, str):
-            local_path = Path(local_path)
-        async with BlobClient.from_connection_string(
-            self.connection_string, *(remote_path.split("/", maxsplit=1))
-        ) as target:
-            with local_path.open("rb") as src:
-                block_list = []
-                while True:
-                    chunk = src.read(size)
-                    chunk = cast(IO, chunk)
-                    if not chunk:
-                        break
-                    block_id = uuid4().hex
-                    try:
-                        await target.stage_block(block_id=block_id, data=chunk)
-                    except HttpResponseError as err:
-                        if "The specified blob or block content is invalid." not in str(
-                            err
-                        ):
-                            raise
-                        await asyncio.sleep(1)
-                        await target.commit_block_list([])
-                        await target.delete_blob()
-                        if recurs is False:
-                            await self.stream_up(
-                                local_path,
-                                remote_path,
-                                recurs=True,
-                            )
-                        else:
-                            raise
-                    block_list.append(BlobBlock(block_id=block_id))
-                await target.commit_block_list(block_list)
+        if isinstance(local_path, (str, Path)):
+            local_path = AsyncPath(local_path)
+        async with (
+            BlobClient.from_connection_string(
+                self.connection_string, *(remote_path.split("/", maxsplit=1))
+            ) as target,
+            local_path.open("rb") as src,
+        ):
+            block_list = []
+            while True:
+                chunk = await src.read(size)
+                chunk = cast("IO", chunk)
+                if not chunk:
+                    break
+                block_id = uuid4().hex
+                try:
+                    await target.stage_block(block_id=block_id, data=chunk)
+                except HttpResponseError as err:
+                    if "The specified blob or block content is invalid." not in str(
+                        err
+                    ):
+                        raise
+                    await asyncio.sleep(1)
+                    await target.commit_block_list([])
+                    await target.delete_blob()
+                    if recurs is False:
+                        await self.stream_up(
+                            local_path,
+                            remote_path,
+                            recurs=True,
+                        )
+                    else:
+                        raise
+                block_list.append(BlobBlock(block_id=block_id))
+            await target.commit_block_list(block_list)
 
     async def walk(self, path: str, maxdepth=None, **kwargs):
         """
@@ -312,16 +318,16 @@ async def walk(self, path: str, maxdepth=None, **kwargs):
         Note that the "files" outputted will include anything that is not
         a directory, such as links.
 
-        Parameters
-        ----------
+        Args:
         path: str
             Root to recurse into
 
         maxdepth: int
             Maximum recursion depth. None means limitless, but not recommended
             on link-based file-systems.
 
-        **kwargs are passed to ``ls``
+        kwargs:
+            dict of args passed to ``ls``
         """
         this_fs = fsspec.filesystem(
             "abfss", connection_string=self.connection_string, asyncronous=True
@@ -359,8 +365,7 @@ async def details(
         AzureBlobFileSystem instance
         Return a list of dictionaries of specifying details about the contents
 
-        Parameters
-        ----------
+        Args:
         contents
 
         delimiter: str
@@ -442,8 +447,7 @@ async def ls(
         versions: bool = False, **kwargs) method of adlfs.spec.AzureBlobFileSystem instance
         Create a list of blob names from a blob container
 
-        Parameters
-        ----------
+        Args:
         path: str
             Path to an Azure Blob with its container name
 
@@ -486,8 +490,7 @@ async def rm(
         """
         Delete files.
 
-        Parameters
-        ----------
+        Args:
         path: str or list of str
             File(s) to delete.
         recursive: bool
@@ -538,3 +541,10 @@ def make_sas_link(self, filepath, expiry=None, write=False):
             expiry=expiry,
         )
         return f"https://{account_dict['AccountName']}.blob.core.windows.net/{filepath}?{sas}"
+
+    async def stream(self, path: str) -> StorageStreamDownloader[bytes]:
+        blob = BlobClient.from_connection_string(
+            self.connection_string, *(path.split("/", maxsplit=1))
+        )
+        stream = await blob.download_blob()
+        return stream
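
Taken together, the az_utils changes let async_abfs push a local file to blob storage with fully asynchronous reads and pull a blob back as a streaming download. A rough usage sketch, assuming the Synblob and AzureWebJobsStorage connection strings are set; the container and file names are placeholders:

import asyncio

from dean_utils import async_abfs


async def main() -> None:
    fs = async_abfs()  # connection string defaults to os.environ["Synblob"]

    # Upload: local_path may now be a str, pathlib.Path, or aiopath.AsyncPath;
    # the file is read asynchronously and staged to the blob in 16 KiB blocks.
    await fs.stream_up("report.parquet", "mycontainer/reports/report.parquet")

    # Download: the new stream() returns an azure StorageStreamDownloader,
    # so the blob can be consumed chunk by chunk rather than read all at once.
    downloader = await fs.stream("mycontainer/reports/report.parquet")
    async for chunk in downloader.chunks():
        ...  # process each bytes chunk


asyncio.run(main())  # requires the environment variables noted above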
