@@ -138,6 +138,14 @@ def __init__(self, endpoint: str, container: str, credential: AsyncTokenCredenti
138
138
self .endpoint = endpoint
139
139
self .container = container
140
140
self .credential = credential
141
+ self .file_system_client = FileSystemClient (
142
+ account_url = self .endpoint ,
143
+ file_system_name = self .container ,
144
+ credential = self .credential ,
145
+ )
146
+
147
async def close_clients(self):
    """
    Close the long-lived file system client owned by this manager.

    The client is stored on ``self.file_system_client`` and reused across
    calls, so it has to be closed explicitly once the manager is no longer needed.
    """
    client = self.file_system_client
    await client.close()
141
149
142
150
async def _ensure_directory (self , directory_path : str , user_oid : str ) -> DataLakeDirectoryClient :
143
151
"""
@@ -148,23 +156,18 @@ async def _ensure_directory(self, directory_path: str, user_oid: str) -> DataLak
148
156
directory_path: Full path of directory to create (e.g., 'user123/images/mydoc')
149
157
user_oid: The owner to set for all created directories
150
158
"""
151
- async with FileSystemClient (
152
- account_url = self .endpoint ,
153
- file_system_name = self .container ,
154
- credential = self .credential ,
155
- ) as filesystem_client :
156
- directory_client = filesystem_client .get_directory_client (directory_path )
157
- try :
158
- await directory_client .get_directory_properties ()
159
- # Check directory properties to ensure it has the correct owner
160
- props = await directory_client .get_access_control ()
161
- if props .get ("owner" ) != user_oid :
162
- raise PermissionError (f"User { user_oid } does not have permission to access { directory_path } " )
163
- except ResourceNotFoundError :
164
- logger .info ("Creating directory path %s" , directory_path )
165
- await directory_client .create_directory ()
166
- await directory_client .set_access_control (owner = user_oid )
167
- return directory_client
159
+ directory_client = self .file_system_client .get_directory_client (directory_path )
160
+ try :
161
+ await directory_client .get_directory_properties ()
162
+ # Check directory properties to ensure it has the correct owner
163
+ props = await directory_client .get_access_control ()
164
+ if props .get ("owner" ) != user_oid :
165
+ raise PermissionError (f"User { user_oid } does not have permission to access { directory_path } " )
166
+ except ResourceNotFoundError :
167
+ logger .info ("Creating directory path %s" , directory_path )
168
+ await directory_client .create_directory ()
169
+ await directory_client .set_access_control (owner = user_oid )
170
+ return directory_client
168
171
169
172
async def upload_blob (self , file : Union [File , IO ], filename : str , user_oid : str ) -> str :
170
173
"""
@@ -390,20 +393,20 @@ def __init__(
390
393
self .resource_group = resource_group
391
394
self .subscription_id = subscription_id
392
395
self .image_container = image_container
396
+ self .blob_service_client = BlobServiceClient (
397
+ account_url = self .endpoint , credential = self .credential , max_single_put_size = 4 * 1024 * 1024
398
+ )
399
+
400
async def close_clients(self):
    """
    Close the long-lived blob service client owned by this manager.

    The client is stored on ``self.blob_service_client`` and shared by the
    upload/download/remove operations, so it has to be closed explicitly
    once the manager is no longer needed.
    """
    client = self.blob_service_client
    await client.close()
393
402
394
403
def get_managedidentity_connectionstring(self):
    """
    Build the resource-ID style connection string for this storage account.

    Returns:
        A string of the form
        ``ResourceId=/subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.Storage/storageAccounts/<account>;``

    Raises:
        ValueError: If the account, resource group, or subscription ID is unset or empty.
    """
    if not (self.account and self.resource_group and self.subscription_id):
        raise ValueError("Account, resource group, and subscription ID must be set to generate connection string.")
    # The path segments must be joined without any whitespace; a stray space
    # after an interpolated value yields a malformed resource ID.
    return (
        f"ResourceId=/subscriptions/{self.subscription_id}"
        f"/resourceGroups/{self.resource_group}"
        f"/providers/Microsoft.Storage/storageAccounts/{self.account};"
    )
398
407
399
- async def _get_service_client (self ) -> BlobServiceClient :
400
- return BlobServiceClient (
401
- account_url = self .endpoint , credential = self .credential , max_single_put_size = 4 * 1024 * 1024
402
- )
403
-
404
408
async def upload_blob (self , file : File ) -> Optional [list [str ]]:
405
- blob_service_client = await self ._get_service_client ()
406
- container_client = blob_service_client .get_container_client (self .container )
409
+ container_client = self .blob_service_client .get_container_client (self .container )
407
410
if not await container_client .exists ():
408
411
await container_client .create_container ()
409
412
@@ -414,7 +417,6 @@ async def upload_blob(self, file: File) -> Optional[list[str]]:
414
417
logger .info ("Uploading blob for document '%s'" , blob_name )
415
418
blob_client = await container_client .upload_blob (blob_name , reopened_file , overwrite = True )
416
419
file .url = blob_client .url
417
- blob_service_client .close ()
418
420
419
421
async def upload_document_image (
420
422
self ,
@@ -432,15 +434,13 @@ async def upload_document_image(
432
434
raise ValueError (
433
435
"user_oid is not supported for BlobManager. Use AdlsBlobManager for user-specific operations."
434
436
)
435
- blob_service_client = await self ._get_service_client ()
436
- container_client = blob_service_client .get_container_client (self .container )
437
+ container_client = self .blob_service_client .get_container_client (self .container )
437
438
if not await container_client .exists ():
438
439
await container_client .create_container ()
439
440
image_bytes = self .add_image_citation (image_bytes , document_filename , image_filename , image_page_num )
440
441
blob_name = f"{ self .blob_name_from_file_name (document_filename )} /page{ image_page_num } /{ image_filename } "
441
442
logger .info ("Uploading blob for document image '%s'" , blob_name )
442
443
blob_client = await container_client .upload_blob (blob_name , image_bytes , overwrite = True )
443
- blob_service_client .close ()
444
444
return blob_client .url
445
445
446
446
async def download_blob (
@@ -450,8 +450,7 @@ async def download_blob(
450
450
raise ValueError (
451
451
"user_oid is not supported for BlobManager. Use AdlsBlobManager for user-specific operations."
452
452
)
453
- blob_service_client = await self ._get_service_client ()
454
- container_client = blob_service_client .get_container_client (self .container )
453
+ container_client = self .blob_service_client .get_container_client (self .container )
455
454
if not await container_client .exists ():
456
455
return None
457
456
if len (blob_path ) == 0 :
@@ -467,11 +466,9 @@ async def download_blob(
467
466
except ResourceNotFoundError :
468
467
logger .warning ("Blob not found: %s" , blob_path )
469
468
return None
470
- # TODO: close client properly
471
469
472
470
async def remove_blob (self , path : Optional [str ] = None ):
473
- blob_service_client = await self ._get_service_client ()
474
- container_client = blob_service_client .get_container_client (self .container )
471
+ container_client = self .blob_service_client .get_container_client (self .container )
475
472
if not await container_client .exists ():
476
473
return
477
474
if path is None :
@@ -489,4 +486,3 @@ async def remove_blob(self, path: Optional[str] = None):
489
486
continue
490
487
logger .info ("Removing blob %s" , blob_path )
491
488
await container_client .delete_blob (blob_path )
492
- blob_service_client .close ()
0 commit comments