Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions .github/workflows/ci_mocked_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,22 @@ jobs:
- name: 'Use gcloud CLI'
run: 'gcloud info'

# - id: 'auth'
# uses: 'google-github-actions/auth@v1'
# with:
# credentials_json: '${{ secrets.GCP_SA_KEY }}'
- name: Setup minio
uses: comfuture/minio-action@v1
with:
access_key: minio
secret_key: minio123
port: 9000

- name: Install MinIO Client CLI
run: |
curl -O https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
sudo mv mc /usr/local/bin/

# - name: 'Set up GCloud SDK'
# uses: 'google-github-actions/setup-gcloud@v1'
- name: Configure MinIO client
run: |
mc alias set minio http://localhost:9000 minio minio123

- name: Run pytest
run: |
Expand Down
9 changes: 5 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ version = "0.5.0"
description = ""
authors = [
"Vanessa Sochat <sochat1@llnl.gov>",
"Johannes Koester <johannes.koester@uni-due.de>"
"Johannes Koester <johannes.koester@uni-due.de>",
"Tadas Bareikis <tadas@vugene.com>"
]
readme = "README.md"
license = "MIT"
Expand All @@ -21,15 +22,15 @@ google-cloud-storage = "^2.12.0"
snakemake-interface-common = "^1.14.0"
snakemake-interface-executor-plugins = "^9.0.0"
jinja2 = "^3.1.2"
google-cloud-logging = "^3.8.0"
google-cloud-logging = "^3.11.4"

[tool.poetry.group.dev.dependencies]
black = "^24.4.0"
flake8 = "^6.1.0"
coverage = "^7.3.1"
pytest = "^7.4.2"
snakemake = "^8.18.0"
snakemake-storage-plugin-s3 = "^0.2.10"
snakemake = "^8.30.0"
snakemake-storage-plugin-s3 = "^0.3.1"

[tool.coverage.run]
omit = [".*", "*/site-packages/*", "Snakefile"]
Expand Down
56 changes: 53 additions & 3 deletions snakemake_executor_plugin_googlebatch/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time
import uuid

from typing import List
Expand All @@ -11,8 +12,8 @@
import snakemake_executor_plugin_googlebatch.utils as utils
import snakemake_executor_plugin_googlebatch.command as cmdutil

from google.api_core.exceptions import DeadlineExceeded
from google.cloud import batch_v1
from google.api_core.exceptions import DeadlineExceeded, ResourceExhausted
from google.cloud import batch_v1, logging


class GoogleBatchExecutor(RemoteExecutor):
Expand Down Expand Up @@ -501,6 +502,9 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]):
# SUCCEEDED
# FAILED
# DELETION_IN_PROGRESS
if response.status.state.name in ["FAILED", "SUCCEEDED"]:
self.save_finished_job_logs(j)

if response.status.state.name == "FAILED":
msg = f"Google Batch job '{j.external_jobid}' failed. "
self.report_job_error(j, msg=msg, aux_logs=aux_logs)
Expand All @@ -512,9 +516,55 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]):
else:
yield j

def save_finished_job_logs(
    self,
    job_info: SubmittedJobInfo,
    sleeps=60,
    page_size=1000,
):
    """
    Download logs for a finished job via the Google Cloud Logging API
    and save them locally.

    Tail (streaming) logging is not available for Batch jobs, so this
    is invoked once, after the job reaches a terminal state
    (FAILED/SUCCEEDED). On quota exhaustion (ResourceExhausted) we back
    off for ``sleeps`` seconds and retry exactly once; if the retry also
    fails we only warn, so the log file may be incomplete.

    :param job_info: submitted job; its ``aux`` dict must hold the Batch
        job object under "batch_job" and the target log path under "logfile"
    :param sleeps: seconds to sleep before the single retry
    :param page_size: page size for the Logging API ``list_entries`` call
    """
    job_uid = job_info.aux["batch_job"].uid
    filter_query = f"labels.job_uid={job_uid}"
    logfname = job_info.aux["logfile"]

    log_client = logging.Client(project=self.executor_settings.project)
    logger = log_client.logger("batch_task_logs")

    def _write_logs():
        # Open in "w" (not append) so a retry starts from a clean file
        # rather than duplicating entries already written before the
        # ResourceExhausted was raised.
        with open(logfname, "w", encoding="utf-8") as logfile:
            for log_entry in logger.list_entries(
                filter_=filter_query,
                page_size=page_size,
            ):
                # payload may be a str or a dict (structured logs);
                # coerce with str() so both forms are writable.
                logfile.write(str(log_entry.payload) + "\n")

    try:
        _write_logs()
    except ResourceExhausted:
        self.logger.warning(
            "Too many requests to Google Logging API.\n"
            + f"Skipping logs for job {job_uid} and sleeping for {sleeps}s."
        )
        time.sleep(sleeps)

        self.logger.warning(f"Trying to retrieve logs for job {job_uid} once more.")
        try:
            _write_logs()
        except ResourceExhausted:
            self.logger.warning(
                "Retry to retrieve logs failed, "
                + f"the log file {logfname} might be incomplete."
            )

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
"""
cancel all active jobs. This method is called when snakemake is interrupted.
Cancel all active jobs. This method is called when snakemake is interrupted.
"""
for job in active_jobs:
jobid = job.external_jobid
Expand Down
9 changes: 8 additions & 1 deletion tests/tests_mocked_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class TestWorkflowsMockedApi(TestWorkflowsBase):
@patch(
"google.cloud.batch_v1.BatchServiceClient.create_job",
new=MagicMock(
return_value=Job(name="foo"),
return_value=Job(name="foo", uid="bar"),
autospec=True,
),
)
Expand All @@ -22,6 +22,13 @@ class TestWorkflowsMockedApi(TestWorkflowsBase):
autospec=True,
),
)
@patch(
"google.cloud.logging.Client.logger",
new=MagicMock(
return_value=MagicMock(list_entries=lambda filter_, page_size: []),
autospec=True,
),
)
@patch(
"snakemake.dag.DAG.check_and_touch_output",
new=AsyncMock(autospec=True),
Expand Down