Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ google-cloud-storage = "^2.12.0"
snakemake-interface-common = "^1.14.0"
snakemake-interface-executor-plugins = "^9.0.0"
jinja2 = "^3.1.2"
google-cloud-logging = "^3.8.0"
google-cloud-logging = "^3.11.4"

[tool.poetry.group.dev.dependencies]
black = "^24.4.0"
flake8 = "^6.1.0"
coverage = "^7.3.1"
pytest = "^7.4.2"
snakemake = "^8.18.0"
snakemake-storage-plugin-s3 = "^0.2.10"
snakemake = "^8.30.0"
snakemake-storage-plugin-s3 = "^0.3.1"

[tool.coverage.run]
omit = [".*", "*/site-packages/*", "Snakefile"]
Expand Down
62 changes: 59 additions & 3 deletions snakemake_executor_plugin_googlebatch/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time
import uuid

from typing import List
Expand All @@ -11,8 +12,8 @@
import snakemake_executor_plugin_googlebatch.utils as utils
import snakemake_executor_plugin_googlebatch.command as cmdutil

from google.api_core.exceptions import DeadlineExceeded
from google.cloud import batch_v1
from google.api_core.exceptions import DeadlineExceeded, ResourceExhausted
from google.cloud import batch_v1, logging


class GoogleBatchExecutor(RemoteExecutor):
Expand Down Expand Up @@ -508,6 +509,9 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]):
# SUCCEEDED
# FAILED
# DELETION_IN_PROGRESS
if response.status.state.name in ["FAILED", "SUCCEEDED"]:
self.save_finished_job_logs(j)

if response.status.state.name == "FAILED":
msg = f"Google Batch job '{j.external_jobid}' failed. "
self.report_job_error(j, msg=msg, aux_logs=aux_logs)
Expand All @@ -519,9 +523,61 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]):
else:
yield j

def save_finished_job_logs(
    self,
    job_info: SubmittedJobInfo,
    sleeps=60,
    page_size=1000,
):
    """
    Download the logs of a finished Batch job using the Google Cloud
    Logging API and save them locally. Since tail logging does not work,
    this function is run only at the end of the job.

    Saving logs is best effort: every failure is reported as a warning
    and never raised to the caller, so a logging problem cannot break
    the status check of the job itself.

    Args:
        job_info: Submitted job; ``job_info.aux["batch_job"].uid`` is used
            to filter the log entries and ``job_info.aux["logfile"]`` is
            the local path the entries are written to.
        sleeps: Seconds to wait before the single retry that follows a
            ``ResourceExhausted`` error.
        page_size: Page size passed to ``list_entries``.
    """
    job_uid = job_info.aux["batch_job"].uid
    filter_query = f"labels.job_uid={job_uid}"
    logfname = job_info.aux["logfile"]

    log_client = logging.Client(project=self.executor_settings.project)
    logger = log_client.logger("batch_task_logs")

    def attempt_log_save(fname, logger, query, page_size):
        # Mode "w" truncates the file, so a retry starts from a clean file
        # instead of appending to a partially written one.
        with open(fname, "w", encoding="utf-8") as logfile:
            for log_entry in logger.list_entries(
                filter_=query,
                page_size=page_size,
            ):
                logfile.write(str(log_entry.payload) + "\n")

    self.logger.info(f"Saving logs for Batch job {job_uid} to {logfname}.")

    # The outer try covers BOTH attempts. An exception raised inside an
    # ``except`` handler is not handled by sibling ``except`` clauses of the
    # same ``try``, so without this wrapper a non-quota error during the
    # retry would propagate to the caller.
    try:
        try:
            attempt_log_save(logfname, logger, filter_query, page_size)
        except ResourceExhausted:
            self.logger.warning(
                "Too many requests to Google Logging API.\n"
                + f"Skipping logs for job {job_uid} and sleeping for {sleeps}s."
            )
            # NOTE(review): this is a blocking sleep in a synchronous method
            # that is called from the async ``check_active_jobs``; confirm
            # that stalling the caller for ``sleeps`` seconds is acceptable.
            time.sleep(sleeps)

            self.logger.warning(
                f"Trying to retrieve logs for Batch job {job_uid} once more."
            )
            try:
                attempt_log_save(logfname, logger, filter_query, page_size)
            except ResourceExhausted:
                self.logger.warning(
                    "Retry to retrieve logs failed, "
                    + f"the log file {logfname} might be incomplete."
                )
    except Exception as e:
        self.logger.warning(
            f"Failed to retrieve logs for Batch job {job_uid}: {str(e)}"
        )

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
"""
cancel all active jobs. This method is called when snakemake is interrupted.
Cancel all active jobs. This method is called when snakemake is interrupted.
"""
for job in active_jobs:
jobid = job.external_jobid
Expand Down
9 changes: 8 additions & 1 deletion tests/tests_mocked_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class TestWorkflowsMockedApi(TestWorkflowsBase):
@patch(
"google.cloud.batch_v1.BatchServiceClient.create_job",
new=MagicMock(
return_value=Job(name="foo"),
return_value=Job(name="foo", uid="bar"),
autospec=True,
),
)
Expand All @@ -22,6 +22,13 @@ class TestWorkflowsMockedApi(TestWorkflowsBase):
autospec=True,
),
)
@patch(
"google.cloud.logging.Client.logger",
new=MagicMock(
return_value=MagicMock(list_entries=lambda filter_, page_size: []),
autospec=True,
),
)
@patch(
"snakemake.dag.DAG.check_and_touch_output",
new=AsyncMock(autospec=True),
Expand Down