|
4 | 4 | from azure.storage.queue.aio import QueueServiceClient as QSC |
5 | 5 | from azure.storage.queue import TextBase64EncodePolicy |
6 | 6 | import azure.storage.blob as asb |
| 7 | +from azure.storage.blob.aio import BlobClient |
| 8 | +from azure.storage.blob import BlobBlock |
| 9 | +from azure.core.exceptions import HttpResponseError |
7 | 10 | import asyncio |
8 | 11 | from typing import List, Optional |
9 | 12 | import fsspec |
| 13 | +import httpx |
10 | 14 | from datetime import datetime, timedelta, timezone |
| 15 | +from typing import TypeAlias, Literal |
| 16 | +from uuid import uuid4 |
| 17 | + |
| 18 | +HTTPX_METHODS: TypeAlias = Literal["GET", "POST"] |
11 | 19 |
|
12 | 20 |
|
13 | 21 | def def_cos(db_name, client_name): |
@@ -69,6 +77,66 @@ def __init__(self, connection_string=os.environ["Synblob"]): |
69 | 77 | } |
70 | 78 | self.stor = stor |
71 | 79 |
|
| 80 | + async def stream_dl( |
| 81 | + self, |
| 82 | + client: httpx.AsyncClient, |
| 83 | + method: HTTPX_METHODS, |
| 84 | + url: str, |
| 85 | + path: str, |
| 86 | + /, |
| 87 | + recurs=False, |
| 88 | + **httpx_extras, |
| 89 | + ) -> None: |
| 90 | + """ |
| 91 | + Help on method stream_dl |
| 92 | +
|
| 93 | + async stream_dl(client, method, url, **httpx_extras) |
| 94 | + Download file streaming in chunks in async as downloader and to a Blob |
| 95 | +
|
| 96 | + Parameters |
| 97 | + ---------- |
| 98 | + client: httpx.AsyncClient |
| 99 | + The httpx Async Client object to use |
| 100 | + method: |
| 101 | + The HTTP method whether GET or POST |
| 102 | + url: |
| 103 | + The URL to download |
| 104 | + path: |
| 105 | + The full path to Azure file being saved |
| 106 | + **httpx_extras |
| 107 | + Any extra arguments to be sent to client.stream |
| 108 | + """ |
| 109 | + async with BlobClient.from_connection_string( |
| 110 | + self.connection_string, *(path.split("/", maxsplit=1)) |
| 111 | + ) as target, client.stream(method, url, **httpx_extras) as resp: |
| 112 | + resp.raise_for_status() |
| 113 | + block_list = [] |
| 114 | + async for chunk in resp.aiter_bytes(): |
| 115 | + block_id = uuid4().hex |
| 116 | + try: |
| 117 | + await target.stage_block(block_id=block_id, data=chunk) |
| 118 | + except HttpResponseError as err: |
| 119 | + if "The specified blob or block content is invalid." not in str( |
| 120 | + err |
| 121 | + ): |
| 122 | + raise err |
| 123 | + await asyncio.sleep(1) |
| 124 | + await target.commit_block_list([]) |
| 125 | + await target.delete_blob() |
| 126 | + if recurs is False: |
| 127 | + await self.stream_dl( |
| 128 | + client, |
| 129 | + method, |
| 130 | + url, |
| 131 | + path, |
| 132 | + recurs=True, |
| 133 | + httpx_extras=httpx_extras, |
| 134 | + ) |
| 135 | + else: |
| 136 | + raise err |
| 137 | + block_list.append(BlobBlock(block_id=block_id)) |
| 138 | + await target.commit_block_list(block_list) |
| 139 | + |
72 | 140 | async def exists(self, path: str): |
73 | 141 | """ |
74 | 142 | Help on method _exists in module adlfs.spec: |
|
0 commit comments