Skip to content

Commit bac2274

Browse files
authored
Merge pull request #1651 from aboutcode-org/1639-implement-on-failure
Add `on_failure` to handle cleanup during pipeline failure
2 parents 297ab51 + 8f47de3 commit bac2274

File tree

8 files changed

+104
-4
lines changed

8 files changed

+104
-4
lines changed

docs/source/contributing.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ Helpful Resources
8989

9090
- Review our `comprehensive guide <https://scancode-toolkit.readthedocs.io/en/latest/contribute/index.html>`_
9191
for more details on how to add quality contributions to our codebase and documentation
92-
- Check this free resource on `how to contribute to an open source project on github <https://egghead.io/courses/how-to-contribute-to-an-open-source-project-on-github>`_
92+
- Check this free resource on `How to contribute to an open source project on github <https://egghead.io/lessons/javascript-identifying-how-to-contribute-to-an-open-source-project-on-github>`_
9393
- Follow `this wiki page <https://aboutcode.readthedocs.io/en/latest/contributing/writing_good_commit_messages.html>`_
9494
on how to write good commit messages
9595
- `Pro Git book <https://git-scm.com/book/en/v2>`_

docs/source/tutorial_add_importer_pipeline.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,10 +298,14 @@ version management from `univers <https://github.com/aboutcode-org/univers>`_.
298298
**advisories_count** should never be directly added in steps.
299299

300300

301+
.. attention::
302+
303+
Implement ``on_failure`` to handle cleanup in case of pipeline failure.
304+
Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.
301305

302306
.. note::
303307

304-
| Use ``make valid`` to format your code using black and isort automatically.
308+
| Use ``make valid`` to format your new code using black and isort automatically.
305309
| Use ``make check`` to check for formatting errors.
306310
307311
Register the Importer Pipeline

docs/source/tutorial_add_improver_pipeline.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ methods.
187187
self.log(f"Successfully flagged {ghost_package_count:,d} ghost Packages")
188188
189189
190+
.. attention::
191+
192+
Implement ``on_failure`` to handle cleanup in case of pipeline failure.
193+
Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.
194+
190195
.. note::
191196

192197
| Use ``make valid`` to format your new code using black and isort automatically.

vulnerabilities/pipelines/__init__.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
import logging
1111
from datetime import datetime
1212
from datetime import timezone
13+
from timeit import default_timer as timer
1314
from traceback import format_exc as traceback_format_exc
1415
from typing import Iterable
1516

1617
from aboutcode.pipeline import BasePipeline
1718
from aboutcode.pipeline import LoopProgress
19+
from aboutcode.pipeline import humanize_time
1820

1921
from vulnerabilities.importer import AdvisoryData
2022
from vulnerabilities.improver import MAX_CONFIDENCE
@@ -29,6 +31,57 @@
2931
class VulnerableCodePipeline(BasePipeline):
3032
pipeline_id = None # Unique Pipeline ID
3133

34+
def on_failure(self):
35+
"""
36+
Tasks to run in the event that pipeline execution fails.
37+
38+
Implement cleanup or other tasks that need to be performed
39+
on pipeline failure, such as:
40+
- Removing cloned repositories.
41+
- Deleting downloaded archives.
42+
"""
43+
pass
44+
45+
def execute(self):
46+
"""Execute each steps in the order defined on this pipeline class."""
47+
self.log(f"Pipeline [{self.pipeline_name}] starting")
48+
49+
steps = self.pipeline_class.get_steps(groups=self.selected_groups)
50+
steps_count = len(steps)
51+
pipeline_start_time = timer()
52+
53+
for current_index, step in enumerate(steps, start=1):
54+
step_name = step.__name__
55+
56+
if self.selected_steps and step_name not in self.selected_steps:
57+
self.log(f"Step [{step_name}] skipped")
58+
continue
59+
60+
self.set_current_step(f"{current_index}/{steps_count} {step_name}")
61+
self.log(f"Step [{step_name}] starting")
62+
step_start_time = timer()
63+
64+
try:
65+
step(self)
66+
except Exception as exception:
67+
self.log("Pipeline failed")
68+
on_failure_start_time = timer()
69+
self.log(f"Running [on_failure] tasks")
70+
self.on_failure()
71+
on_failure_run_time = timer() - on_failure_start_time
72+
self.log(f"Completed [on_failure] tasks in {humanize_time(on_failure_run_time)}")
73+
74+
return 1, self.output_from_exception(exception)
75+
76+
step_run_time = timer() - step_start_time
77+
self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")
78+
79+
self.set_current_step("") # Reset the `current_step` field on completion
80+
pipeline_run_time = timer() - pipeline_start_time
81+
self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")
82+
83+
return 0, ""
84+
3285
def log(self, message, level=logging.INFO):
3386
"""Log the given `message` to the current module logger and execution_log."""
3487
now_local = datetime.now(timezone.utc).astimezone()
@@ -51,8 +104,8 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline):
51104
Base importer pipeline for importing advisories.
52105
53106
Uses:
54-
Subclass this Pipeline and implement ``advisories_count`` and ``collect_advisories`` method.
55-
Also override the ``steps`` and ``advisory_confidence`` as needed.
107+
Subclass this Pipeline and implement ``advisories_count`` and ``collect_advisories``
108+
method. Also override the ``steps`` and ``advisory_confidence`` as needed.
56109
"""
57110

58111
pipeline_id = None # Unique Pipeline ID, this should be the name of pipeline module.

vulnerabilities/pipelines/gitlab_importer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ def clean_downloads(self):
106106
self.log(f"Removing cloned repository")
107107
self.vcs_response.delete()
108108

109+
def on_failure(self):
110+
self.clean_downloads()
111+
109112

110113
def parse_advisory_path(base_path: Path, file_path: Path) -> Tuple[str, str, str]:
111114
"""

vulnerabilities/pipelines/npm_importer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,6 @@ def clean_downloads(self):
163163
if self.vcs_response:
164164
self.log(f"Removing cloned repository")
165165
self.vcs_response.delete()
166+
167+
def on_failure(self):
168+
self.clean_downloads()

vulnerabilities/pipelines/pypa_importer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,6 @@ def clean_downloads(self):
6868
if self.vcs_response:
6969
self.log(f"Removing cloned repository")
7070
self.vcs_response.delete()
71+
72+
def on_failure(self):
73+
self.clean_downloads()

vulnerabilities/tests/pipelines/test_base_pipeline.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
from vulnerabilities.importer import AffectedPackage
2020
from vulnerabilities.importer import Reference
2121
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
22+
from vulnerabilities.pipelines import VulnerableCodePipeline
23+
from vulnerabilities.tests.pipelines import TestLogger
2224

2325
advisory_data1 = AdvisoryData(
2426
aliases=["CVE-2020-13371337"],
@@ -47,6 +49,33 @@ def get_advisory1(created_by="test_pipeline"):
4749
)
4850

4951

52+
class TestVulnerableCodePipeline(TestCase):
53+
def test_on_failure(self):
54+
class TestPipeline(VulnerableCodePipeline):
55+
def __init__(self, test_logger):
56+
super().__init__()
57+
self.log = test_logger.write
58+
59+
@classmethod
60+
def steps(cls):
61+
return (cls.step1,)
62+
63+
def step1(self):
64+
raise Exception("Something went wrong!")
65+
66+
def on_failure(self):
67+
self.log("Doing cleanup.")
68+
69+
logger = TestLogger()
70+
pipeline = TestPipeline(test_logger=logger)
71+
72+
pipeline.execute()
73+
log_result = logger.getvalue()
74+
75+
self.assertIn("Pipeline failed", log_result)
76+
self.assertIn("Running [on_failure] tasks", log_result)
77+
78+
5079
class TestVulnerableCodeBaseImporterPipeline(TestCase):
5180
@patch.object(
5281
VulnerableCodeBaseImporterPipeline,

0 commit comments

Comments
 (0)