diff --git a/pyproject.toml b/pyproject.toml index 1def308..e7080a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,15 +22,15 @@ google-cloud-storage = "^2.12.0" snakemake-interface-common = "^1.14.0" snakemake-interface-executor-plugins = "^9.0.0" jinja2 = "^3.1.2" -google-cloud-logging = "^3.8.0" +google-cloud-logging = "^3.11.4" [tool.poetry.group.dev.dependencies] black = "^24.4.0" flake8 = "^6.1.0" coverage = "^7.3.1" pytest = "^7.4.2" -snakemake = "^8.18.0" -snakemake-storage-plugin-s3 = "^0.2.10" +snakemake = "^8.30.0" +snakemake-storage-plugin-s3 = "^0.3.1" [tool.coverage.run] omit = [".*", "*/site-packages/*", "Snakefile"] diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index 5d3dad7..13b5a4a 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -1,4 +1,5 @@ import os +import time import uuid from typing import List @@ -11,8 +12,8 @@ import snakemake_executor_plugin_googlebatch.utils as utils import snakemake_executor_plugin_googlebatch.command as cmdutil -from google.api_core.exceptions import DeadlineExceeded -from google.cloud import batch_v1 +from google.api_core.exceptions import DeadlineExceeded, ResourceExhausted +from google.cloud import batch_v1, logging class GoogleBatchExecutor(RemoteExecutor): @@ -508,6 +509,9 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]): # SUCCEEDED # FAILED # DELETION_IN_PROGRESS + if response.status.state.name in ["FAILED", "SUCCEEDED"]: + self.save_finished_job_logs(j) + if response.status.state.name == "FAILED": msg = f"Google Batch job '{j.external_jobid}' failed. " self.report_job_error(j, msg=msg, aux_logs=aux_logs) @@ -519,9 +523,61 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]): else: yield j + def save_finished_job_logs( + self, + job_info: SubmittedJobInfo, + sleeps=60, + page_size=1000, + ): + """ + Download logs using Google Cloud Logging API and save + them locally. Since tail logging does not work, this function + is run only at the end of the job. + """ + job_uid = job_info.aux["batch_job"].uid + filter_query = f"labels.job_uid={job_uid}" + logfname = job_info.aux["logfile"] + + log_client = logging.Client(project=self.executor_settings.project) + logger = log_client.logger("batch_task_logs") + + def attempt_log_save(fname, logger, query, page_size): + with open(fname, "w", encoding="utf-8") as logfile: + for log_entry in logger.list_entries( + filter_=query, + page_size=page_size, + ): + logfile.write(str(log_entry.payload) + "\n") + + self.logger.info(f"Saving logs for Batch job {job_uid} to {logfname}.") + + try: + attempt_log_save(logfname, logger, filter_query, page_size) + except ResourceExhausted: + self.logger.warning( + "Too many requests to Google Logging API.\n" + + f"Skipping logs for job {job_uid} and sleeping for {sleeps}s." + ) + time.sleep(sleeps) + + self.logger.warning( + f"Trying to retrieve logs for Batch job {job_uid} once more." + ) + try: + attempt_log_save(logfname, logger, filter_query, page_size) + except ResourceExhausted: + self.logger.warning( + "Retry to retrieve logs failed, " + + f"the log file {logfname} might be incomplete." + ) + except Exception as e: + self.logger.warning( + f"Failed to retrieve logs for Batch job {job_uid}: {str(e)}" + ) + def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]): """ - cancel all active jobs. This method is called when snakemake is interrupted. + Cancel all active jobs. This method is called when snakemake is interrupted. """ for job in active_jobs: jobid = job.external_jobid diff --git a/tests/tests_mocked_api.py b/tests/tests_mocked_api.py index fe8d7fd..ebbdb33 100644 --- a/tests/tests_mocked_api.py +++ b/tests/tests_mocked_api.py @@ -11,7 +11,7 @@ class TestWorkflowsMockedApi(TestWorkflowsBase): @patch( "google.cloud.batch_v1.BatchServiceClient.create_job", new=MagicMock( - return_value=Job(name="foo"), + return_value=Job(name="foo", uid="bar"), autospec=True, ), ) @@ -22,6 +22,13 @@ class TestWorkflowsMockedApi(TestWorkflowsBase): autospec=True, ), ) + @patch( + "google.cloud.logging.Client.logger", + new=MagicMock( + return_value=MagicMock(list_entries=lambda filter_, page_size: []), + autospec=True, + ), + ) @patch( "snakemake.dag.DAG.check_and_touch_output", new=AsyncMock(autospec=True),