18 | 18 | from load_azd_env import load_azd_env |
19 | 19 |
20 | 20 | logger = logging.getLogger("scripts") |
21 | | - |
| 21 | +# Set the logging level for the azure package to DEBUG |
| 22 | +logging.getLogger("azure").setLevel(logging.DEBUG) |
| 23 | +logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.DEBUG) |
22 | 24 |
23 | 25 | class AdlsGen2Setup: |
24 | 26 | """ |
@@ -162,64 +164,65 @@ async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirec |
162 | 164 | file_paths = await self.walk_files(directory_path) |
163 | 165 |
164 | 166 | # Upload each file collected |
| 167 | + count = 0 |
| 168 | + num = len(file_paths) |
165 | 169 | for file_path in file_paths: |
166 | 170 | await self.upload_file(directory_client, file_path, directory) |
167 | | - logger.info(f"Uploaded {file_path} to {directory}") |
| 171 | + count += 1 |
| 172 | + logger.info(f"Uploaded [{count}/{num}] {directory}/{file_path}") |
168 | 173 |
169 | 174 | def create_service_client(self): |
170 | 175 | return DataLakeServiceClient( |
171 | 176 | account_url=f"https://{self.storage_account_name}.dfs.core.windows.net", credential=self.credentials |
172 | 177 | ) |
173 | 178 |
174 | 179 | async def calc_md5(self, path: str) -> str: |
| 180 | + hash_md5 = hashlib.md5() |
175 | 181 | with open(path, "rb") as file: |
176 | | - return hashlib.md5(file.read()).hexdigest() |
177 | | - |
178 | | - async def check_md5(self, path: str, md5_hash: str) -> bool: |
179 | | - # if filename ends in .md5 skip |
180 | | - if path.endswith(".md5"): |
181 | | - return True |
182 | | - |
183 | | - # if there is a file called .md5 in this directory, see if its updated |
184 | | - stored_hash = None |
185 | | - hash_path = f"{path}.md5" |
186 | | - if os.path.exists(hash_path): |
187 | | - with open(hash_path, encoding="utf-8") as md5_f: |
188 | | - stored_hash = md5_f.read() |
189 | | - |
190 | | - if stored_hash and stored_hash.strip() == md5_hash.strip(): |
191 | | - logger.info("Skipping %s, no changes detected.", path) |
192 | | - return True |
193 | | - |
194 | | - # Write the hash |
195 | | - with open(hash_path, "w", encoding="utf-8") as md5_f: |
196 | | - md5_f.write(md5_hash) |
197 | | - |
198 | | - return False |
| 182 | + for chunk in iter(lambda: file.read(4096), b""): |
| 183 | + hash_md5.update(chunk) |
| 184 | + return hash_md5.hexdigest() |
199 | 185 |
| 186 | + async def get_blob_md5(self, directory_client: DataLakeDirectoryClient, filename: str) -> Optional[str]: |
| 187 | + """ |
| 188 | + Retrieves the MD5 checksum from the metadata of the specified blob. |
| 189 | + """ |
| 190 | + file_client = directory_client.get_file_client(filename) |
| 191 | + try: |
| 192 | + properties = await file_client.get_file_properties() |
| 193 | + return properties.metadata.get('md5') |
| 194 | + except Exception as e: |
| 195 | + logger.error(f"Error getting blob properties for {filename}: {e}") |
| 196 | + return None |
| 197 | + |
200 | 198 | async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str = ""): |
201 | 199 | # Calculate MD5 hash once |
202 | 200 | md5_hash = await self.calc_md5(file_path) |
203 | 201 |
204 | | - # Check if the file has been uploaded or if it has changed |
205 | | - if await self.check_md5(file_path, md5_hash): |
206 | | - logger.info("File %s has already been uploaded, skipping upload.", file_path) |
207 | | - return # Skip uploading if the MD5 check indicates no changes |
208 | | - |
209 | | - # Proceed with the upload since the file has changed |
210 | | - with open(file=file_path, mode="rb") as f: |
211 | | - file_client = directory_client.get_file_client(file=os.path.basename(file_path)) |
212 | | - tmtime = os.path.getmtime(file_path) |
213 | | - last_modified = datetime.fromtimestamp(tmtime).isoformat() |
214 | | - title = os.path.splitext(os.path.basename(file_path))[0] |
215 | | - metadata = { |
216 | | - "md5": md5_hash, |
217 | | - "category": category, |
218 | | - "updated": last_modified, |
219 | | - "title": title |
220 | | - } |
221 | | - await file_client.upload_data(f, overwrite=True, metadata=metadata) |
222 | | - logger.info("File %s uploaded with metadata %s.", file_path, metadata) |
| 202 | + # Get the filename |
| 203 | + filename = os.path.basename(file_path) |
| 204 | + |
| 205 | + # Get the MD5 checksum from the blob metadata |
| 206 | + blob_md5 = await self.get_blob_md5(directory_client, filename) |
| 207 | + |
| 208 | + # Upload the file if it does not exist or the checksum differs |
| 209 | + if blob_md5 is None or md5_hash != blob_md5: |
| 210 | + with open(file_path, "rb") as f: |
| 211 | + file_client = directory_client.get_file_client(filename) |
| 212 | + tmtime = os.path.getmtime(file_path) |
| 213 | + last_modified = datetime.fromtimestamp(tmtime).isoformat() |
| 214 | + title = os.path.splitext(filename)[0] |
| 215 | + metadata = { |
| 216 | + "md5": md5_hash, |
| 217 | + "category": category, |
| 218 | + "updated": last_modified, |
| 219 | + "title": title |
| 220 | + } |
| 221 | + await file_client.upload_data(f, overwrite=True) |
| 222 | + await file_client.set_metadata(metadata) |
| 223 | + logger.info(f"Uploaded and updated metadata for {filename}") |
| 224 | + else: |
| 225 | + logger.info(f"No upload needed for {filename}, checksums match") |
223 | 226 |
224 | 227 | async def create_or_get_group(self, group_name: str): |
225 | 228 | group_id = None |
@@ -296,6 +299,6 @@ async def main(args: Any): |
296 | 299 | args = parser.parse_args() |
297 | 300 | if args.verbose: |
298 | 301 | logging.basicConfig() |
299 | | - logging.getLogger().setLevel(logging.INFO) |
| 302 | + logging.getLogger().setLevel(logging.INFO) |
300 | 303 |
301 | 304 | asyncio.run(main(args)) |