33import logging
44import os
55import re
6- from typing import List , Optional , Union
6+ from typing import List , Optional , Union , NamedTuple , Tuple
77
88import fitz # type: ignore
99from azure .core .credentials_async import AsyncTokenCredential
1010from azure .storage .blob import (
1111 BlobSasPermissions ,
1212 UserDelegationKey ,
13- generate_blob_sas ,
13+ generate_blob_sas ,
14+ BlobClient
1415)
1516from azure .storage .blob .aio import BlobServiceClient , ContainerClient
1617from PIL import Image , ImageDraw , ImageFont
2021
2122logger = logging .getLogger ("scripts" )
2223
23-
2424class BlobManager :
2525 """
2626 Class to manage uploading and deleting blobs containing citation information from a blob storage account
@@ -45,29 +45,63 @@ def __init__(
4545 self .subscriptionId = subscriptionId
4646 self .user_delegation_key : Optional [UserDelegationKey ] = None
4747
48+ #async def upload_blob(self, file: File, container_client:ContainerClient) -> Optional[List[str]]:
49+
50+ async def _create_new_blob (self , file : File , container_client :ContainerClient ) -> BlobClient :
51+ with open (file .content .name , "rb" ) as reopened_file :
52+ blob_name = BlobManager .blob_name_from_file_name (file .content .name )
53+ logger .info ("Uploading blob for whole file -> %s" , blob_name )
54+ return await container_client .upload_blob (blob_name , reopened_file , overwrite = True , metadata = file .metadata )
55+
56+ async def _file_blob_update_needed (self , blob_client : BlobClient , file : File ) -> bool :
57+ md5_check : int = 0 # 0= not done, 1, positive,. 2 negative
58+ # Get existing blob properties
59+ blob_properties = await blob_client .get_blob_properties ()
60+ blob_metadata = blob_properties .metadata
61+
62+ # Check if the md5 values are the same
63+ file_md5 = file .metadata .get ('md5' )
64+ blob_md5 = blob_metadata .get ('md5' )
65+
66+ # Remove md5 from file metadata if it matches the blob metadata
67+ if file_md5 and file_md5 != blob_md5 :
68+ return True
69+ else :
70+ return False
71+
4872 async def upload_blob (self , file : File ) -> Optional [List [str ]]:
4973 async with BlobServiceClient (
5074 account_url = self .endpoint , credential = self .credential , max_single_put_size = 4 * 1024 * 1024
5175 ) as service_client , service_client .get_container_client (self .container ) as container_client :
5276 if not await container_client .exists ():
5377 await container_client .create_container ()
54-
55- # Re-open and upload the original file
78+
79+ # Re-open and upload the original file
80+ md5_check : int = 0 # 0= not done, 1, positive,. 2 negative
81+
82+ # upload the file local storage zu azure storage
83+ # file.url is only None if files are not uploaded yet, for datalake it is set
5684 if file .url is None :
57- with open (file .content .name , "rb" ) as reopened_file :
58- blob_name = BlobManager .blob_name_from_file_name (file .content .name )
59- logger .info ("Uploading blob for whole file -> %s" , blob_name )
60- blob_client = await container_client .upload_blob (blob_name , reopened_file , overwrite = True )
61- file .url = blob_client .url
85+ blob_client = container_client .get_blob_client (file .url )
6286
63- if self .store_page_images :
87+ if not await blob_client .exists ():
88+ blob_client = await self ._create_new_blob (file , container_client )
89+ else :
90+ if self ._blob_update_needed (blob_client , file ):
91+ md5_check = 2
92+ # Upload the file with the updated metadata
93+ with open (file .content .name , "rb" ) as data :
94+ await blob_client .upload_blob (data , overwrite = True , metadata = file .metadata )
95+ else :
96+ md5_check = 1
97+ file .url = blob_client .url
98+
99+ if md5_check != 1 and self .store_page_images :
64100 if os .path .splitext (file .content .name )[1 ].lower () == ".pdf" :
65101 return await self .upload_pdf_blob_images (service_client , container_client , file )
66102 else :
67103 logger .info ("File %s is not a PDF, skipping image upload" , file .content .name )
68104
69- return None
70-
71105 def get_managedidentity_connectionstring (self ):
72106 return f"ResourceId=/subscriptions/{ self .subscriptionId } /resourceGroups/{ self .resourceGroup } /providers/Microsoft.Storage/storageAccounts/{ self .account } ;"
73107
@@ -93,7 +127,21 @@ async def upload_pdf_blob_images(
93127
94128 for i in range (page_count ):
95129 blob_name = BlobManager .blob_image_name_from_file_page (file .content .name , i )
96- logger .info ("Converting page %s to image and uploading -> %s" , i , blob_name )
130+
131+ blob_client = container_client .get_blob_client (blob_name )
132+ do_upload : bool = True
133+ if await blob_client .exists ():
134+ # Get existing blob properties
135+ blob_properties = await blob_client .get_blob_properties ()
136+ blob_metadata = blob_properties .metadata
137+
138+ # Check if the md5 values are the same
139+ file_md5 = file .metadata .get ('md5' )
140+ blob_md5 = blob_metadata .get ('md5' )
141+ if file_md5 == blob_md5 :
142+ continue # documemt already uploaded
143+
144+ logger .debug ("Converting page %s to image and uploading -> %s" , i , blob_name )
97145
98146 doc = fitz .open (file .content .name )
99147 page = doc .load_page (i )
@@ -119,21 +167,21 @@ async def upload_pdf_blob_images(
119167 output = io .BytesIO ()
120168 new_img .save (output , format = "PNG" )
121169 output .seek (0 )
122-
123- blob_client = await container_client .upload_blob (blob_name , output , overwrite = True )
170+
171+ await blob_client .upload_blob (data = output , overwrite = True , metadata = file . metadata )
124172 if not self .user_delegation_key :
125173 self .user_delegation_key = await service_client .get_user_delegation_key (start_time , expiry_time )
126174
127- if blob_client .account_name is not None :
175+ if container_client .account_name is not None :
128176 sas_token = generate_blob_sas (
129- account_name = blob_client .account_name ,
130- container_name = blob_client .container_name ,
131- blob_name = blob_client . blob_name ,
177+ account_name = container_client .account_name ,
178+ container_name = container_client .container_name ,
179+ blob_name = blob_name ,
132180 user_delegation_key = self .user_delegation_key ,
133181 permission = BlobSasPermissions (read = True ),
134182 expiry = expiry_time ,
135183 start = start_time ,
136- )
184+ )
137185 sas_uris .append (f"{ blob_client .url } ?{ sas_token } " )
138186
139187 return sas_uris
0 commit comments