diff --git a/scripts/python/smoke_test/notifications_smoke_test.py b/scripts/python/smoke_test/notifications_smoke_test.py index 879e60682..433298610 100644 --- a/scripts/python/smoke_test/notifications_smoke_test.py +++ b/scripts/python/smoke_test/notifications_smoke_test.py @@ -8,46 +8,34 @@ from mesh_client import INT_ENDPOINT, MeshClient -CONTAINER_NAME = "notifications-reports" -REPORT_FILENAME = "SM0K3-reconciliation-report.csv" WORK_DIR = os.path.dirname(os.path.realpath(__file__)) STARTUP_SCRIPT_PATH = f"{WORK_DIR}/../../bash/run_container_app_job.sh" def test_notifications(): - try: - logging.info("Running notifications smoke test") + logging.info("Running notifications smoke test") - environment, storage_account, resource_group_name = configure() + environment, storage_account, resource_group_name = configure() - if environment == "prod": - return + if environment == "prod": + return - setup_mesh_inbox_test_data(environment, resource_group_name) + setup_mesh_inbox_test_data(environment, resource_group_name) - for job in ["smm", "cap", "smb", "smk"]: - job_result = run_subprocess( - f"Starting notifications container app job manbrs-{job}-{environment}", - [STARTUP_SCRIPT_PATH, environment, job], - ) - assert job_result.returncode == 0 - - download_result = run_subprocess( - "Downloading generated smoke test report from blob storage", - azure_storage_blob_download_reports_command(storage_account), + for job in ["smm", "cap", "smb", "smk"]: + logging.info( + "Starting notifications container app job manbrs-%s-%s", job, environment ) - assert download_result.returncode == 0 - assert REPORT_FILENAME in download_result.stdout - - report_contents = open(f"{WORK_DIR}/{REPORT_FILENAME}").read() - assert "SM0K3" in report_contents - logging.info("Finished notifications smoke test") - finally: - run_subprocess( - "Deleting generated smoke test report from blob storage", - azure_storage_blob_delete_reports_command(storage_account), + job_result = subprocess.run( + [STARTUP_SCRIPT_PATH, environment, job], + check=True, + capture_output=True, + text=True, ) + assert job_result.returncode == 0 + + logging.info("Finished notifications smoke test") def configure(): @@ -64,47 +52,6 @@ def configure(): return (environment, storage_account, resource_group_name) -def run_subprocess(description: str, command: list[str]): - logging.info(description) - return subprocess.run(command, capture_output=True, text=True) - - -def azure_storage_blob_download_reports_command(storage_account: str) -> list[str]: - return [ - "az", - "storage", - "blob", - "download-batch", - "--destination", - WORK_DIR, - "--source", - CONTAINER_NAME, - "--pattern", - REPORT_FILENAME, - "--account-name", - storage_account, - "--auth-mode", - "login", - ] - - -def azure_storage_blob_delete_reports_command(storage_account: str) -> list[str]: - return [ - "az", - "storage", - "blob", - "delete-batch", - "--source", - CONTAINER_NAME, - "--pattern", - REPORT_FILENAME, - "--account-name", - storage_account, - "--auth-mode", - "login", - ] - - def setup_mesh_inbox_test_data(environment: str, resource_group_name: str): populate_mesh_env_vars(environment, resource_group_name) mesh_client().send_message(