3
3
import os
4
4
import re
5
5
from pathlib import Path
6
- from typing import IO , Optional , Union
6
+ from typing import IO , Any , Optional , TypedDict , Union
7
7
from urllib .parse import unquote
8
8
9
9
from azure .core .credentials_async import AsyncTokenCredential
10
10
from azure .core .exceptions import ResourceNotFoundError
11
11
from azure .storage .blob .aio import BlobServiceClient
12
- from azure .storage .blob .aio import (
13
- StorageStreamDownloader as BlobStorageStreamDownloader ,
14
- )
15
- from azure .storage .filedatalake .aio import DataLakeDirectoryClient , FileSystemClient
16
12
from azure .storage .filedatalake .aio import (
17
- StorageStreamDownloader as AdlsBlobStorageStreamDownloader ,
13
+ DataLakeDirectoryClient ,
14
+ FileSystemClient ,
18
15
)
19
16
from PIL import Image , ImageDraw , ImageFont
20
17
23
20
logger = logging .getLogger ("scripts" )
24
21
25
22
23
+ class BlobProperties (TypedDict , total = False ):
24
+ """Properties of a blob, with optional fields for content settings"""
25
+
26
+ content_settings : dict [str , Any ]
27
+
28
+
26
29
class BaseBlobManager :
27
30
"""
28
31
Base class for Azure Storage operations, providing common file naming and path utilities
@@ -97,13 +100,13 @@ async def upload_document_image(
97
100
image_bytes : bytes ,
98
101
image_filename : str ,
99
102
image_page_num : int ,
100
- user_oid : str ,
103
+ user_oid : Optional [ str ] = None ,
101
104
) -> Optional [str ]:
102
105
raise NotImplementedError ("Subclasses must implement this method" )
103
106
104
107
async def download_blob (
105
- self , blob_path : str , user_oid : Optional [str ] = None , as_bytes : bool = False
106
- ) -> Optional [Union [ BlobStorageStreamDownloader , AdlsBlobStorageStreamDownloader , bytes ]]:
108
+ self , blob_path : str , user_oid : Optional [str ] = None
109
+ ) -> Optional [tuple [ bytes , BlobProperties ]]:
107
110
"""
108
111
Downloads a blob from Azure Storage.
109
112
If user_oid is provided, it checks if the blob belongs to the user.
@@ -113,7 +116,9 @@ async def download_blob(
113
116
user_oid: The user's object ID (optional)
114
117
115
118
Returns:
116
- bytes: The content of the blob, or None if not found or access denied
119
+ Optional[tuple[bytes, BlobProperties]]:
120
+ - A tuple containing the blob content as bytes and the blob properties
121
+ - None if blob not found or access denied
117
122
"""
118
123
raise NotImplementedError ("Subclasses must implement this method" )
119
124
@@ -225,7 +230,7 @@ async def upload_document_image(
225
230
image_bytes : bytes ,
226
231
image_filename : str ,
227
232
image_page_num : int ,
228
- user_oid : str ,
233
+ user_oid : Optional [ str ] = None ,
229
234
) -> Optional [str ]:
230
235
"""
231
236
Uploads an image from a document to ADLS in a directory structure:
@@ -242,6 +247,8 @@ async def upload_document_image(
242
247
Returns:
243
248
str: The URL of the uploaded file, with forward slashes (not URL-encoded)
244
249
"""
250
+ if user_oid is None :
251
+ raise ValueError ("user_oid must be provided for user-specific operations." )
245
252
await self ._ensure_directory (directory_path = user_oid , user_oid = user_oid )
246
253
image_directory_path = self ._get_image_directory_path (document_filename , user_oid , image_page_num )
247
254
image_directory_client = await self ._ensure_directory (directory_path = image_directory_path , user_oid = user_oid )
@@ -252,18 +259,19 @@ async def upload_document_image(
252
259
return unquote (file_client .url )
253
260
254
261
async def download_blob (
255
- self , blob_path : str , user_oid : Optional [str ] = None , as_bytes : bool = False
256
- ) -> Optional [Union [ AdlsBlobStorageStreamDownloader , bytes ]]:
262
+ self , blob_path : str , user_oid : Optional [str ] = None
263
+ ) -> Optional [tuple [ bytes , BlobProperties ]]:
257
264
"""
258
265
Downloads a blob from Azure Data Lake Storage.
259
266
260
267
Args:
261
268
blob_path: The path to the blob in the format {user_oid}/{document_name}/images/{image_name}
262
269
user_oid: The user's object ID
263
- as_bytes: If True, returns the blob as bytes, otherwise returns a stream downloader
264
270
265
271
Returns:
266
- Optional[Union[AdlsBlobStorageStreamDownloader, bytes]]: A stream downloader for the blob, or bytes if as_bytes=True, or None if not found
272
+ Optional[tuple[bytes, BlobProperties]]:
273
+ - A tuple containing the blob content as bytes and the blob properties as a dictionary
274
+ - None if blob not found or access denied
267
275
"""
268
276
if user_oid is None :
269
277
logger .warning ("user_oid must be provided for Data Lake Storage operations." )
@@ -289,11 +297,17 @@ async def download_blob(
289
297
try :
290
298
user_directory_client = await self ._ensure_directory (directory_path = directory_path , user_oid = user_oid )
291
299
file_client = user_directory_client .get_file_client (filename )
292
- blob = await file_client .download_file ()
293
- if as_bytes :
294
- return await blob .readall ()
295
- else :
296
- return blob
300
+ download_response = await file_client .download_file ()
301
+ content = await download_response .readall ()
302
+
303
+ # Convert FileProperties to our BlobProperties format
304
+ properties : BlobProperties = {
305
+ "content_settings" : {
306
+ "content_type" : download_response .properties .get ("content_type" , "application/octet-stream" )
307
+ }
308
+ }
309
+
310
+ return content , properties
297
311
except ResourceNotFoundError :
298
312
logger .warning (f"Directory or file not found: { directory_path } /{ filename } " )
299
313
return None
@@ -449,8 +463,23 @@ async def upload_document_image(
449
463
return blob_client .url
450
464
451
465
async def download_blob (
452
- self , blob_path : str , user_oid : Optional [str ] = None , as_bytes : bool = False
453
- ) -> Optional [Union [BlobStorageStreamDownloader , AdlsBlobStorageStreamDownloader , bytes ]]:
466
+ self , blob_path : str , user_oid : Optional [str ] = None
467
+ ) -> Optional [tuple [bytes , BlobProperties ]]:
468
+ """
469
+ Downloads a blob from Azure Blob Storage.
470
+
471
+ Args:
472
+ blob_path: The path to the blob in the storage
473
+ user_oid: Not used in BlobManager, but included for API compatibility
474
+
475
+ Returns:
476
+ Optional[tuple[bytes, BlobProperties]]:
477
+ - A tuple containing the blob content as bytes and the blob properties
478
+ - None if blob not found
479
+
480
+ Raises:
481
+ ValueError: If user_oid is provided (not supported for BlobManager)
482
+ """
454
483
if user_oid is not None :
455
484
raise ValueError (
456
485
"user_oid is not supported for BlobManager. Use AdlsBlobManager for user-specific operations."
@@ -461,16 +490,33 @@ async def download_blob(
461
490
if len (blob_path ) == 0 :
462
491
logger .warning ("Blob path is empty" )
463
492
return None
493
+
464
494
blob_client = container_client .get_blob_client (blob_path )
465
495
try :
466
- blob = await blob_client .download_blob ()
467
- if not blob .properties :
496
+ download_response = await blob_client .download_blob ()
497
+ if not download_response .properties :
468
498
logger .warning (f"No blob exists for { blob_path } " )
469
499
return None
470
- if as_bytes :
471
- return await blob .readall ()
472
- else :
473
- return blob
500
+
501
+ # Get the content as bytes
502
+ content = await download_response .readall ()
503
+
504
+ # Convert BlobProperties to our internal BlobProperties format
505
+ properties : BlobProperties = {
506
+ "content_settings" : {
507
+ "content_type" : (
508
+ download_response .properties .content_settings .content_type
509
+ if (
510
+ hasattr (download_response .properties , "content_settings" )
511
+ and download_response .properties .content_settings
512
+ and hasattr (download_response .properties .content_settings , "content_type" )
513
+ )
514
+ else "application/octet-stream"
515
+ )
516
+ }
517
+ }
518
+
519
+ return content , properties
474
520
except ResourceNotFoundError :
475
521
logger .warning ("Blob not found: %s" , blob_path )
476
522
return None
0 commit comments