From a608e572ab9cac58f56ad16378a876e6a0d1f2b3 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:17:36 +0100 Subject: [PATCH 1/9] add scan option --- scripts/adlsgen2setup.py | 126 +++++++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 13 deletions(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 1deccdf199..7f9aee55d5 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -1,8 +1,10 @@ import argparse import asyncio +import datetime import json import logging import os +import hashlib from typing import Any, Optional import aiohttp @@ -56,7 +58,7 @@ def __init__( self.data_access_control_format = data_access_control_format self.graph_headers: Optional[dict[str, str]] = None - async def run(self): + async def run(self, scandirs: bool = False): async with self.create_service_client() as service_client: logger.info(f"Ensuring {self.filesystem_name} exists...") async with service_client.get_file_system_client(self.filesystem_name) as filesystem_client: @@ -80,15 +82,17 @@ async def run(self): ) directories[directory] = directory_client + logger.info("Uploading scanned files...") + if scandirs and directory != "/": + await self.scan_and_upload_directories(directories, filesystem_client) + logger.info("Uploading files...") for file, file_info in self.data_access_control_format["files"].items(): directory = file_info["directory"] if directory not in directories: logger.error(f"File {file} has unknown directory {directory}, exiting...") return - await self.upload_file( - directory_client=directories[directory], file_path=os.path.join(self.data_directory, file) - ) + await self.upload_file(directory_client=directories[directory], file_path=os.path.join(self.data_directory, file)) logger.info("Setting access control...") for directory, access_control in self.data_access_control_format["directories"].items(): @@ -100,8 +104,7 @@ async def run(self): f"Directory {directory} has unknown group {group_name} in access control list, exiting" ) return - await directory_client.update_access_control_recursive( - acl=f"group:{groups[group_name]}:r-x" + await directory_client.update_access_control_recursive(acl=f"group:{groups[group_name]}:r-x" ) if "oids" in access_control: for oid in access_control["oids"]: @@ -110,15 +113,110 @@ async def run(self): for directory_client in directories.values(): await directory_client.close() + async def walk_files(self, src_filepath = "."): + filepath_list = [] + + #This for loop uses the os.walk() function to walk through the files and directories + #and records the filepaths of the files to a list + for root, dirs, files in os.walk(src_filepath): + + #iterate through the files currently obtained by os.walk() and + #create the filepath string for that file and add it to the filepath_list list + for file in files: + #Checks to see if the root is '.' and changes it to the correct current + #working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. 
+ if root == '.': + root_path = os.getcwd() + "/" + else: + root_path = root + + filepath = os.path.join(root_path, file) + + #Appends filepath to filepath_list if filepath does not currently exist in filepath_list + if filepath not in filepath_list: + filepath_list.append(filepath) + + #Return filepath_list + return filepath_list + + async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client): + logger.info("Scanning and uploading files from directories recursively...") + for directory, directory_client in directories.items(): + directory_path = os.path.join(self.data_directory, directory) + + # Überprüfen, ob 'scandir' existiert und auf False gesetzt ist + if not self.data_access_control_format["directories"][directory].get("scandir", True): + logger.info(f"Skipping directory {directory} as 'scandir' is set to False") + continue + + groups = self.data_access_control_format["directories"][directory].get("groups", []) + + # Check if the directory exists before walking it + if not os.path.exists(directory_path): + logger.warning(f"Directory does not exist: {directory_path}") + continue + + # Get all file paths using the walk_files function + file_paths = await self.walk_files(directory_path) + + # Upload each file collected + for file_path in file_paths: + await self.upload_file(directory_client, file_path) + logger.info(f"Uploaded {file_path} to {directory}") + def create_service_client(self): return DataLakeServiceClient( account_url=f"https://{self.storage_account_name}.dfs.core.windows.net", credential=self.credentials ) - async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str): + async def calc_md5(self, path: str) -> str: + with open(path, "rb") as file: + return hashlib.md5(file.read()).hexdigest() + + async def check_md5(self, path: str, md5_hash: str) -> bool: + # if filename ends in .md5 skip + if path.endswith(".md5"): + return True + + # if there is a file called .md5 in this directory, see if its updated + stored_hash = None + hash_path = f"{path}.md5" + os.path.exists(hash_path): + with open(hash_path, encoding="utf-8") as md5_f: + stored_hash = md5_f.read() + + if stored_hash and stored_hash.strip() == md5_hash.strip(): + logger.info("Skipping %s, no changes detected.", path) + return True + + # Write the hash + with open(hash_path, "w", encoding="utf-8") as md5_f: + md5_f.write(md5_hash) + + return False + + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str): + # Calculate MD5 hash once + md5_hash = await self.calc_md5(file_path) + + # Check if the file has been uploaded or if it has changed + if await self.check_md5(file_path, md5_hash): + logger.info("File %s has already been uploaded, skipping upload.", file_path) + return # Skip uploading if the MD5 check indicates no changes + + # Proceed with the upload since the file has changed with open(file=file_path, mode="rb") as f: file_client = directory_client.get_file_client(file=os.path.basename(file_path)) - await file_client.upload_data(f, overwrite=True) + last_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() + title = os.path.splitext(os.path.basename(file_path))[0] + metadata = { + "md5": md5_hash, + "category": category, + "updated": last_modified, + "title": title + } + await file_client.upload_data(f, overwrite=True, metadata=metadata) + logger.info("File %s uploaded with metadata %s.", file_path, metadata) async def create_or_get_group(self, group_name: str): 
group_id = None @@ -144,6 +242,7 @@ async def create_or_get_group(self, group_name: str): # If Unified does not work for you, then you may need the following settings instead: # "mailEnabled": False, # "mailNickname": group_name, + } async with session.post("https://graph.microsoft.com/v1.0/groups", json=group) as response: content = await response.json() @@ -165,19 +264,19 @@ async def main(args: Any): data_access_control_format = json.load(f) command = AdlsGen2Setup( data_directory=args.data_directory, - storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"], - filesystem_name="gptkbcontainer", + storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"], + filesystem_name=os.environ["AZURE_ADLS_GEN2_FILESYSTEM"], security_enabled_groups=args.create_security_enabled_groups, credentials=credentials, data_access_control_format=data_access_control_format, ) - await command.run() + await command.run(args.scandirs) if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Upload sample data to a Data Lake Storage Gen2 account and associate sample access control lists with it using sample groups", - epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control ./scripts/sampleacls.json --create-security-enabled-groups ", + description="Upload data to a Data Lake Storage Gen2 account and associate access control lists with it using sample groups", + epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control .azure/${AZURE_ENV_NAME}/docs_acls.json --create-security-enabled-groups --scandirs", ) parser.add_argument("data_directory", help="Data directory that contains sample PDFs") parser.add_argument( @@ -190,6 +289,7 @@ async def main(args: Any): "--data-access-control", required=True, help="JSON file describing access control for the sample data" ) parser.add_argument("--verbose", "-v", required=False, action="store_true", help="Verbose output") + parser.add_argument("--scandirs", required=False, action="store_true", help="Scan and upload all files from directories recursively") args = parser.parse_args() if args.verbose: logging.basicConfig() From 3206092d7f44bf03a56b6001c166fb5c32acacca Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:22:59 +0100 Subject: [PATCH 2/9] add scandir option an example folder --- scripts/sampleacls.json | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/scripts/sampleacls.json b/scripts/sampleacls.json index dd2d4888fa..b83ca3708c 100644 --- a/scripts/sampleacls.json +++ b/scripts/sampleacls.json @@ -21,10 +21,16 @@ }, "directories": { "employeeinfo": { - "groups": ["GPTKB_HRTest"] + "groups": ["GPTKB_HRTest"], + "scandir": false }, "benefitinfo": { - "groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"] + "groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"], + "scandir": false + }, + "GPT4V_Examples": { + "groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"], + "scandir": true }, "/": { "groups": ["GPTKB_AdminTest"] From a482213ddfdd5411ea0a2870eaa4d62d20321c35 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:32:17 +0100 Subject: [PATCH 3/9] update docs --- docs/login_and_acl.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/login_and_acl.md b/docs/login_and_acl.md index 1af26dd249..333b571b1c 100644 --- a/docs/login_and_acl.md +++ b/docs/login_and_acl.md @@ -233,7 +233,9 @@ The script performs the following steps: - Creates example [groups](https://learn.microsoft.com/entra/fundamentals/how-to-manage-groups) listed in 
the [sampleacls.json](/scripts/sampleacls.json) file. - Creates a filesystem / container `gptkbcontainer` in the storage account. - Creates the directories listed in the [sampleacls.json](/scripts/sampleacls.json) file. -- Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file into the appropriate directories. +- Scans the directories for files recursively if you add the option '--scandirs' (default false) cto the argument list (default off) and you don't have '"scandir": false' (default true) below the directory element in the sampleacls.json file. +- Caluclates md5 checksuk of each file refrenced anc compares with existing 'filename.ext.md5' file. Skip upload if same else upload and storenew md5 value in 'filename.ext.md5' +- Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file or files found in the folders with scandir option set to true into the appropriate directories. - [Recursively sets Access Control Lists (ACLs)](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-acl-cli) using the information from the [sampleacls.json](/scripts/sampleacls.json) file. In order to use the sample access control, you need to join these groups in your Microsoft Entra tenant. From ce4bffee1da7972d7743fd83625370e30027b6c0 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:49:26 +0100 Subject: [PATCH 4/9] fix if issue --- scripts/adlsgen2setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 7f9aee55d5..719538ff8b 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -181,7 +181,7 @@ async def check_md5(self, path: str, md5_hash: str) -> bool: # if there is a file called .md5 in this directory, see if its updated stored_hash = None hash_path = f"{path}.md5" - os.path.exists(hash_path): + if os.path.exists(hash_path): with open(hash_path, encoding="utf-8") as md5_f: stored_hash = md5_f.read() From 285ae7dee1d88a398cc2d00da2b36fea5af6bce0 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 19:11:50 +0100 Subject: [PATCH 5/9] fix findings, add metadata like md5 --- scripts/adlsgen2setup.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 719538ff8b..c9f2729f8c 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -1,6 +1,6 @@ import argparse import asyncio -import datetime +from datetime import datetime import json import logging import os @@ -83,7 +83,7 @@ async def run(self, scandirs: bool = False): directories[directory] = directory_client logger.info("Uploading scanned files...") - if scandirs and directory != "/": + if scandirs: await self.scan_and_upload_directories(directories, filesystem_client) logger.info("Uploading files...") @@ -122,35 +122,37 @@ async def walk_files(self, src_filepath = "."): #iterate through the files currently obtained by os.walk() and #create the filepath string for that file and add it to the filepath_list list + root_found: bool = False for file in files: #Checks to see if the root is '.' and changes it to the correct current - #working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. - if root == '.': - root_path = os.getcwd() + "/" - else: - root_path = root - - filepath = os.path.join(root_path, file) + #working directory by calling os.getcwd(). 
Otherwise root_path will just be the root variable value. + if not root_found and root == '.': + filepath =os.path.join(os.getcwd() + "/", file) + root_found = True + else: + filepath = os.path.join(root, file) + #Appends filepath to filepath_list if filepath does not currently exist in filepath_list if filepath not in filepath_list: - filepath_list.append(filepath) + filepath_list.append(filepath) #Return filepath_list return filepath_list async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client): logger.info("Scanning and uploading files from directories recursively...") + for directory, directory_client in directories.items(): directory_path = os.path.join(self.data_directory, directory) + if directory == "/": + continue - # Überprüfen, ob 'scandir' existiert und auf False gesetzt ist + # Check if 'scandir' exists and is set to False if not self.data_access_control_format["directories"][directory].get("scandir", True): logger.info(f"Skipping directory {directory} as 'scandir' is set to False") continue - groups = self.data_access_control_format["directories"][directory].get("groups", []) - # Check if the directory exists before walking it if not os.path.exists(directory_path): logger.warning(f"Directory does not exist: {directory_path}") @@ -161,7 +163,7 @@ async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirec # Upload each file collected for file_path in file_paths: - await self.upload_file(directory_client, file_path) + await self.upload_file(directory_client, file_path, directory) logger.info(f"Uploaded {file_path} to {directory}") def create_service_client(self): @@ -195,7 +197,7 @@ async def check_md5(self, path: str, md5_hash: str) -> bool: return False - async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str): + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str = ""): # Calculate MD5 hash once md5_hash = await self.calc_md5(file_path) @@ -207,7 +209,8 @@ async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path # Proceed with the upload since the file has changed with open(file=file_path, mode="rb") as f: file_client = directory_client.get_file_client(file=os.path.basename(file_path)) - last_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() + tmtime = os.path.getmtime(file_path) + last_modified = datetime.fromtimestamp(tmtime).isoformat() title = os.path.splitext(os.path.basename(file_path))[0] metadata = { "md5": md5_hash, From bfb95abe775390694cd209332622c2958bd2ec64 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 19:15:36 +0100 Subject: [PATCH 6/9] Update requirements --- app/backend/requirements.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/backend/requirements.txt b/app/backend/requirements.txt index 51df00e14b..b1af1878b9 100644 --- a/app/backend/requirements.txt +++ b/app/backend/requirements.txt @@ -433,5 +433,8 @@ yarl==1.9.4 zipp==3.20.0 # via importlib-metadata +# used for adlsgen2setup.py +datetime==4.3.0 + # via -r requirements.in # The following packages are considered to be unsafe in a requirements file: # setuptools From d07c9eb26c389e39c5677905c0a36a906f1db146 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 19:15:54 +0100 Subject: [PATCH 7/9] Update requirements --- app/backend/requirements.in | 1 + 1 file changed, 1 insertion(+) diff --git a/app/backend/requirements.in 
b/app/backend/requirements.in index 99cb44e678..0a1d35a257 100644 --- a/app/backend/requirements.in +++ b/app/backend/requirements.in @@ -30,3 +30,4 @@ types-beautifulsoup4 msgraph-sdk==1.1.0 openai-messages-token-helper python-dotenv +datetime From 4769133b1e2032816fb0269d45d10b3347f1cad6 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 21:15:50 +0100 Subject: [PATCH 8/9] conditional upload - check blob md5 before upload --- scripts/adlsgen2setup.py | 93 +++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index c9f2729f8c..90a12eca86 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -18,7 +18,9 @@ from load_azd_env import load_azd_env logger = logging.getLogger("scripts") - +# Set the logging level for the azure package to DEBUG +logging.getLogger("azure").setLevel(logging.DEBUG) +logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.DEBUG) class AdlsGen2Setup: """ @@ -162,9 +164,12 @@ async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirec file_paths = await self.walk_files(directory_path) # Upload each file collected + count =0 + num = len(file_paths) for file_path in file_paths: await self.upload_file(directory_client, file_path, directory) - logger.info(f"Uploaded {file_path} to {directory}") + count=+1 + logger.info(f"Uploaded [{count}/{num}] {directory}/{file_path}") def create_service_client(self): return DataLakeServiceClient( @@ -172,54 +177,52 @@ def create_service_client(self): ) async def calc_md5(self, path: str) -> str: + hash_md5 = hashlib.md5() with open(path, "rb") as file: - return hashlib.md5(file.read()).hexdigest() - - async def check_md5(self, path: str, md5_hash: str) -> bool: - # if filename ends in .md5 skip - if path.endswith(".md5"): - return True - - # if there is a file called .md5 in this directory, see if its updated - stored_hash = None - hash_path = f"{path}.md5" - if os.path.exists(hash_path): - with open(hash_path, encoding="utf-8") as md5_f: - stored_hash = md5_f.read() - - if stored_hash and stored_hash.strip() == md5_hash.strip(): - logger.info("Skipping %s, no changes detected.", path) - return True - - # Write the hash - with open(hash_path, "w", encoding="utf-8") as md5_f: - md5_f.write(md5_hash) - - return False + for chunk in iter(lambda: file.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + async def get_blob_md5(self, directory_client: DataLakeDirectoryClient, filename: str) -> Optional[str]: + """ + Retrieves the MD5 checksum from the metadata of the specified blob. 
+ """ + file_client = directory_client.get_file_client(filename) + try: + properties = await file_client.get_file_properties() + return properties.metadata.get('md5') + except Exception as e: + logger.error(f"Error getting blob properties for {filename}: {e}") + return None + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str = ""): # Calculate MD5 hash once md5_hash = await self.calc_md5(file_path) - # Check if the file has been uploaded or if it has changed - if await self.check_md5(file_path, md5_hash): - logger.info("File %s has already been uploaded, skipping upload.", file_path) - return # Skip uploading if the MD5 check indicates no changes - - # Proceed with the upload since the file has changed - with open(file=file_path, mode="rb") as f: - file_client = directory_client.get_file_client(file=os.path.basename(file_path)) - tmtime = os.path.getmtime(file_path) - last_modified = datetime.fromtimestamp(tmtime).isoformat() - title = os.path.splitext(os.path.basename(file_path))[0] - metadata = { - "md5": md5_hash, - "category": category, - "updated": last_modified, - "title": title - } - await file_client.upload_data(f, overwrite=True, metadata=metadata) - logger.info("File %s uploaded with metadata %s.", file_path, metadata) + # Get the filename + filename = os.path.basename(file_path) + + # Get the MD5 checksum from the blob metadata + blob_md5 = await self.get_blob_md5(directory_client, filename) + + # Upload the file if it does not exist or the checksum differs + if blob_md5 is None or md5_hash != blob_md5: + with open(file_path, "rb") as f: + file_client = directory_client.get_file_client(filename) + tmtime = os.path.getmtime(file_path) + last_modified = datetime.fromtimestamp(tmtime).isoformat() + title = os.path.splitext(filename)[0] + metadata = { + "md5": md5_hash, + "category": category, + "updated": last_modified, + "title": title + } + await file_client.upload_data(f, overwrite=True) + await file_client.set_metadata(metadata) + logger.info(f"Uploaded and updated metadata for {filename}") + else: + logger.info(f"No upload needed for {filename}, checksums match") async def create_or_get_group(self, group_name: str): group_id = None @@ -296,6 +299,6 @@ async def main(args: Any): args = parser.parse_args() if args.verbose: logging.basicConfig() - logging.getLogger().setLevel(logging.INFO) + logging.getLogger().setLevel(logging.INFO) asyncio.run(main(args)) From d10586fa0977a80157a6aa64c14f521220a78046 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Tue, 8 Jul 2025 07:52:40 +0200 Subject: [PATCH 9/9] Update login_and_acl.md fix typos --- docs/login_and_acl.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/login_and_acl.md b/docs/login_and_acl.md index b6f39ce3a8..85c1e62e69 100644 --- a/docs/login_and_acl.md +++ b/docs/login_and_acl.md @@ -317,7 +317,7 @@ The script performs the following steps: - Creates a filesystem / container `gptkbcontainer` in the storage account. - Creates the directories listed in the [sampleacls.json](/scripts/sampleacls.json) file. - Scans the directories for files recursively if you add the option '--scandirs' (default false) cto the argument list (default off) and you don't have '"scandir": false' (default true) below the directory element in the sampleacls.json file. -- Caluclates md5 checksuk of each file refrenced anc compares with existing 'filename.ext.md5' file. 
Skip upload if same else upload and storenew md5 value in 'filename.ext.md5'
+- Calculates the md5 checksum of each referenced file and compares it with the md5 stored in the file's blob metadata. Skips the upload if they match; otherwise uploads the file and stores the new md5 value in the metadata.
 - Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file or files found in the folders with scandir option set to true into the appropriate directories.
 - [Recursively sets Access Control Lists (ACLs)](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-acl-cli) using the information from the [sampleacls.json](/scripts/sampleacls.json) file.
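For reference, below is a minimal standalone sketch of the conditional-upload technique the series converges on in patches 5-8: hash the local file in chunks, read the previously stored md5 from the blob's metadata, and upload only when the two differ. It uses the same `azure-storage-file-datalake` aio client and `md5` metadata key as `scripts/adlsgen2setup.py`; the helper names (`file_md5`, `stored_md5`, `upload_if_changed`) are illustrative and not part of the patches.

```python
import hashlib
import os
from typing import Optional

from azure.storage.filedatalake.aio import DataLakeDirectoryClient


def file_md5(path: str, chunk_size: int = 4096) -> str:
    """Hash the file in chunks so large files never have to fit in memory at once."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


async def stored_md5(directory_client: DataLakeDirectoryClient, filename: str) -> Optional[str]:
    """Return the 'md5' value kept in the blob's metadata, or None if the blob is missing."""
    file_client = directory_client.get_file_client(filename)
    try:
        properties = await file_client.get_file_properties()
    except Exception:
        # Treat "not found" (or any lookup failure) as "never uploaded".
        return None
    return properties.metadata.get("md5")


async def upload_if_changed(directory_client: DataLakeDirectoryClient, file_path: str) -> bool:
    """Upload file_path only when its checksum differs from the stored one.

    Returns True when an upload happened, False when it was skipped.
    """
    filename = os.path.basename(file_path)
    local_hash = file_md5(file_path)
    if await stored_md5(directory_client, filename) == local_hash:
        return False  # unchanged since the last run, skip the upload
    file_client = directory_client.get_file_client(filename)
    with open(file_path, "rb") as data:
        await file_client.upload_data(data, overwrite=True)
    await file_client.set_metadata({"md5": local_hash})  # record the new checksum for next time
    return True
```

With the new flag, the setup script applies the same check to every file it discovers, e.g. `./scripts/adlsgen2setup.py ./data --data-access-control ./scripts/sampleacls.json --create-security-enabled-groups --scandirs --verbose`; note that after patch 1 the script reads the container name from `AZURE_ADLS_GEN2_FILESYSTEM` in addition to `AZURE_ADLS_GEN2_STORAGE_ACCOUNT`.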