diff --git a/pyproject.toml b/pyproject.toml
index 8c5ea56a76..3263262137 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -164,9 +164,12 @@ select = [
     "D",  # pydocstyle
     "F",  # Pyflakes
     "UP",  # pyupgrade
+    "DJ",  # flake8-django
     "S",  # flake8-bandit
     "I",  # isort
     "C9",  # McCabe complexity
+    "FIX",  # flake8-fixme
+    "FURB",  # refurb
 ]
 
 ignore = ["D1", "D203", "D205", "D212", "D400", "D415"]
@@ -188,6 +191,5 @@ max-complexity = 10
 [tool.ruff.lint.per-file-ignores]
 # Allow the usage of assert in the test_spdx file.
 "**/test_spdx.py*" = ["S101"]
-"scanpipe/pipes/spdx.py" = ["UP006", "UP035"]
 # Allow complexity in management commands
 "scanpipe/management/commands/*" = ["C901"]
diff --git a/scanpipe/models.py b/scanpipe/models.py
index fe50d57b22..4158eee97e 100644
--- a/scanpipe/models.py
+++ b/scanpipe/models.py
@@ -431,6 +431,9 @@ class ExtraDataFieldMixin(models.Model):
         help_text=_("Optional mapping of extra data key/values."),
     )
 
+    class Meta:
+        abstract = True
+
     def update_extra_data(self, data):
         """Update the `extra_data` field with the provided `data` dict."""
         if not isinstance(data, dict):
@@ -439,9 +442,6 @@ def update_extra_data(self, data):
         self.extra_data.update(data)
         self.save(update_fields=["extra_data"])
 
-    class Meta:
-        abstract = True
-
 
 class UpdateMixin:
     """
@@ -635,6 +635,10 @@ def save(self, *args, **kwargs):
         if global_webhook and is_new and not is_clone and not skip_global_webhook:
             self.setup_global_webhook()
 
+    def get_absolute_url(self):
+        """Return this project's details URL."""
+        return reverse("project_detail", args=[self.slug])
+
     def setup_global_webhook(self):
         """
         Create a global webhook subscription instance from values defined in the
@@ -1180,8 +1184,7 @@ def write_input_file(self, file_object):
         file_path = Path(self.input_path / filename)
 
         with open(file_path, "wb+") as f:
-            for chunk in file_object.chunks():
-                f.write(chunk)
+            f.writelines(file_object.chunks())
 
     def copy_input_from(self, input_location):
         """
@@ -1428,10 +1431,6 @@ def add_error(
             object_instance,
         )
 
-    def get_absolute_url(self):
-        """Return this project's details URL."""
-        return reverse("project_detail", args=[self.slug])
-
     @cached_property
     def resource_count(self):
         """Return the number of resources related to this project."""
@@ -2533,21 +2532,6 @@ class Compliance(models.TextChoices):
     class Meta:
         abstract = True
 
-    @classmethod
-    def from_db(cls, db, field_names, values):
-        """
-        Store the ``license_expression_field`` on loading this instance from the
-        database value.
-        The cached value is then used to detect changes on `save()`.
-        """
-        new = super().from_db(db, field_names, values)
-
-        if cls.license_expression_field in field_names:
-            field_index = field_names.index(cls.license_expression_field)
-            new._loaded_license_expression = values[field_index]
-
-        return new
-
     def save(self, codebase=None, *args, **kwargs):
         """
         Injects policies, if the feature is enabled, when the
@@ -2566,6 +2550,21 @@ def save(self, codebase=None, *args, **kwargs):
 
         super().save(*args, **kwargs)
 
+    @classmethod
+    def from_db(cls, db, field_names, values):
+        """
+        Store the ``license_expression_field`` on loading this instance from the
+        database value.
+        The cached value is then used to detect changes on `save()`.
+        """
+        new = super().from_db(db, field_names, values)
+
+        if cls.license_expression_field in field_names:
+            field_index = field_names.index(cls.license_expression_field)
+            new._loaded_license_expression = values[field_index]
+
+        return new
+
     @property
     def policy_index(self):
         return self.project.policy_index
@@ -2790,6 +2789,9 @@ class Meta:
     def __str__(self):
         return self.path
 
+    def get_absolute_url(self):
+        return reverse("resource_detail", args=[self.project.slug, self.path])
+
     @property
     def location_path(self):
         """Return the location of the resource as a Path instance."""
@@ -2949,9 +2951,6 @@ def extracted_from(self, codebase=None):
         archive_path, _, _ = self.path.rpartition("-extract")
         return self.project.get_resource(archive_path)
 
-    def get_absolute_url(self):
-        return reverse("resource_detail", args=[self.project.slug, self.path])
-
     def get_raw_url(self):
         """Return the URL to access the RAW content of the resource."""
         return reverse("resource_raw", args=[self.project.slug, self.path])
@@ -3143,14 +3142,14 @@ class VulnerabilityMixin(models.Model):
 
     affected_by_vulnerabilities = models.JSONField(blank=True, default=list)
 
+    class Meta:
+        abstract = True
+
     @property
     def is_vulnerable(self):
         """Returns True if this instance is affected by vulnerabilities."""
         return bool(self.affected_by_vulnerabilities)
 
-    class Meta:
-        abstract = True
-
 
 class VulnerabilityQuerySetMixin:
     def vulnerable(self):
diff --git a/scanpipe/pipelines/analyze_root_filesystem.py b/scanpipe/pipelines/analyze_root_filesystem.py
index 76478ce6dc..22e58481da 100644
--- a/scanpipe/pipelines/analyze_root_filesystem.py
+++ b/scanpipe/pipelines/analyze_root_filesystem.py
@@ -111,7 +111,6 @@ def match_not_analyzed_to_application_packages(self):
         Match files with "not-yet-analyzed" status to files already belonging to
         application packages.
         """
-        # TODO: do it one rootfs at a time e.g. for rfs in self.root_filesystems:
         rootfs.match_not_analyzed(
             self.project,
             reference_status=flag.APPLICATION_PACKAGE,
diff --git a/scanpipe/pipes/__init__.py b/scanpipe/pipes/__init__.py
index b6511c6b9a..a0aad7f0cb 100644
--- a/scanpipe/pipes/__init__.py
+++ b/scanpipe/pipes/__init__.py
@@ -166,8 +166,7 @@ def _clean_package_data(package_data):
     package_data = package_data.copy()
     if release_date := package_data.get("release_date"):
         if type(release_date) is str:
-            if release_date.endswith("Z"):
-                release_date = release_date[:-1]
+            release_date = release_date.removesuffix("Z")
             package_data["release_date"] = datetime.fromisoformat(release_date).date()
 
     # Strip leading "codebase/" to make path compatible with
diff --git a/scanpipe/pipes/federatedcode.py b/scanpipe/pipes/federatedcode.py
index 420d7bad38..c996582ec2 100644
--- a/scanpipe/pipes/federatedcode.py
+++ b/scanpipe/pipes/federatedcode.py
@@ -156,8 +156,7 @@ def add_scan_result(project, repo, package_scan_file, logger=None):
     write_to.parent.mkdir(parents=True, exist_ok=True)
     results_generator = JSONResultsGenerator(project)
     with open(write_to, encoding="utf-8", mode="w") as file:
-        for chunk in results_generator:
-            file.write(chunk)
+        file.writelines(results_generator)
 
     return relative_scan_file_path
 
diff --git a/scanpipe/pipes/js.py b/scanpipe/pipes/js.py
index 90c5f4c37a..1899df43f4 100644
--- a/scanpipe/pipes/js.py
+++ b/scanpipe/pipes/js.py
@@ -69,7 +69,7 @@ def is_source_mapping_in_minified(resource, map_file_name):
     lines = resource.file_content.split("\n")
     total_lines = len(lines)
     # Get the last 5 lines.
-    tail = 5 if total_lines > 5 else total_lines
+    tail = min(total_lines, 5)
     return any(source_mapping in line for line in reversed(lines[-tail:]))
 
 
diff --git a/scanpipe/pipes/spdx.py b/scanpipe/pipes/spdx.py
index ce94d782ea..9cd2285e56 100644
--- a/scanpipe/pipes/spdx.py
+++ b/scanpipe/pipes/spdx.py
@@ -28,7 +28,6 @@
 from datetime import datetime
 from datetime import timezone
 from pathlib import Path
-from typing import List  # Python 3.8 compatibility
 
 SPDX_SPEC_VERSION = "2.3"
 SPDX_LICENSE_LIST_VERSION = "3.20"
@@ -272,7 +271,7 @@ class ExtractedLicensingInfo:
     name: str = ""
     comment: str = ""
 
-    see_alsos: List[str] = field(default_factory=list)
+    see_alsos: list[str] = field(default_factory=list)
 
     def as_dict(self):
         """Return the data as a serializable dict."""
@@ -332,9 +331,9 @@ class Package:
     comment: str = ""
     license_comments: str = ""
 
-    checksums: List[Checksum] = field(default_factory=list)
-    external_refs: List[ExternalRef] = field(default_factory=list)
-    attribution_texts: List[str] = field(default_factory=list)
+    checksums: list[Checksum] = field(default_factory=list)
+    external_refs: list[ExternalRef] = field(default_factory=list)
+    attribution_texts: list[str] = field(default_factory=list)
 
     def as_dict(self):
         """Return the data as a serializable dict."""
@@ -380,9 +379,7 @@ def date_to_iso(date_str):
     if not date_str:
         return
 
-    if date_str.endswith("Z"):
-        date_str = date_str[:-1]
-
+    date_str = date_str.removesuffix("Z")
     as_datetime = datetime.fromisoformat(date_str)
     return as_datetime.isoformat(timespec="seconds") + "Z"
 
@@ -427,18 +424,18 @@ class File:
     spdx_id: str
     name: str
 
-    checksums: List[Checksum] = field(default_factory=list)
+    checksums: list[Checksum] = field(default_factory=list)
     license_concluded: str = "NOASSERTION"
     copyright_text: str = "NOASSERTION"
-    license_in_files: List[str] = field(default_factory=list)
-    contributors: List[str] = field(default_factory=list)
+    license_in_files: list[str] = field(default_factory=list)
+    contributors: list[str] = field(default_factory=list)
     notice_text: str = ""
 
     # Supported values:
     # SOURCE | BINARY | ARCHIVE | APPLICATION | AUDIO | IMAGE | TEXT | VIDEO |
     # DOCUMENTATION | SPDX | OTHER
-    types: List[str] = field(default_factory=list)
-    attribution_texts: List[str] = field(default_factory=list)
+    types: list[str] = field(default_factory=list)
+    attribution_texts: list[str] = field(default_factory=list)
     comment: str = ""
     license_comments: str = ""
 
@@ -534,16 +531,16 @@ class Document:
     name: str
     namespace: str
     creation_info: CreationInfo
-    packages: List[Package]
+    packages: list[Package]
 
     spdx_id: str = "SPDXRef-DOCUMENT"
     version: str = SPDX_SPEC_VERSION
     data_license: str = "CC0-1.0"
     comment: str = ""
 
-    files: List[File] = field(default_factory=list)
-    extracted_licenses: List[ExtractedLicensingInfo] = field(default_factory=list)
-    relationships: List[Relationship] = field(default_factory=list)
+    files: list[File] = field(default_factory=list)
+    extracted_licenses: list[ExtractedLicensingInfo] = field(default_factory=list)
+    relationships: list[Relationship] = field(default_factory=list)
 
     def as_dict(self):
         """Return the SPDX document as a serializable dict."""
diff --git a/scanpipe/tests/pipes/test_output.py b/scanpipe/tests/pipes/test_output.py
index 5cf3538659..5b7b217e6f 100644
--- a/scanpipe/tests/pipes/test_output.py
+++ b/scanpipe/tests/pipes/test_output.py
@@ -234,7 +234,7 @@ def test_scanpipe_pipes_outputs_to_xlsx(self):
             "MESSAGES",
             "TODOS",
         ]
-        self.assertEqual(expected_sheet_names, workbook.get_sheet_names())
+        self.assertEqual(expected_sheet_names, workbook.sheetnames)
 
     def test_scanpipe_pipes_outputs_get_xlsx_report(self):
         project_qs = None
@@ -259,7 +259,7 @@ def test_scanpipe_pipes_outputs_get_xlsx_report(self):
         expected_sheet_names = [
             "PACKAGES",
         ]
-        self.assertEqual(expected_sheet_names, workbook.get_sheet_names())
+        self.assertEqual(expected_sheet_names, workbook.sheetnames)
 
         model_short_name = "todo"
         output_file = output.get_xlsx_report(project_qs, model_short_name)
@@ -267,7 +267,7 @@ def test_scanpipe_pipes_outputs_get_xlsx_report(self):
         expected_sheet_names = [
             "TODOS",
         ]
-        self.assertEqual(expected_sheet_names, workbook.get_sheet_names())
+        self.assertEqual(expected_sheet_names, workbook.sheetnames)
 
     def test_scanpipe_pipes_outputs_get_xlsx_fields_order(self):
         output_file = output.to_xlsx(project=make_project())
diff --git a/scanpipe/tests/test_api.py b/scanpipe/tests/test_api.py
index 9b642be374..71cbe1f8dc 100644
--- a/scanpipe/tests/test_api.py
+++ b/scanpipe/tests/test_api.py
@@ -665,6 +665,9 @@ def test_scanpipe_api_project_action_results_download_output_formats(self):
             "application/octet-stream",
         ]
         self.assertIn(response["Content-Type"], expected)
+        # Forces Django to finish the response and close the file
+        # to prevent a "ResourceWarning: unclosed file"
+        self.assertTrue(response.getvalue().startswith(b"PK"))
 
     def test_scanpipe_api_project_action_pipelines(self):
         url = reverse("project-pipelines")
@@ -704,9 +707,9 @@ def test_scanpipe_api_project_action_report(self):
 
         output_file = io.BytesIO(b"".join(response.streaming_content))
         workbook = openpyxl.load_workbook(output_file, read_only=True, data_only=True)
-        self.assertEqual(["PACKAGES"], workbook.get_sheet_names())
+        self.assertEqual(["PACKAGES"], workbook.sheetnames)
 
-        todos_sheet = workbook.get_sheet_by_name("PACKAGES")
+        todos_sheet = workbook["PACKAGES"]
         rows = list(todos_sheet.values)
         self.assertEqual(2, len(rows))
         self.assertEqual("project", rows[0][0])  # header row
diff --git a/scanpipe/tests/test_commands.py b/scanpipe/tests/test_commands.py
index 6dcbd5d09c..6474f1e59f 100644
--- a/scanpipe/tests/test_commands.py
+++ b/scanpipe/tests/test_commands.py
@@ -1280,8 +1280,8 @@ def test_scanpipe_management_command_report(self):
         self.assertIn(f"Report generated at {output_file}", out.getvalue())
 
         workbook = openpyxl.load_workbook(output_file, read_only=True, data_only=True)
-        self.assertEqual(["TODOS"], workbook.get_sheet_names())
-        todos_sheet = workbook.get_sheet_by_name("TODOS")
+        self.assertEqual(["TODOS"], workbook.sheetnames)
+        todos_sheet = workbook["TODOS"]
         header = list(todos_sheet.values)[0]
         self.assertNotIn("extra_data", header)
 
diff --git a/scanpipe/tests/test_views.py b/scanpipe/tests/test_views.py
index 19cdca19e7..b9868cfb22 100644
--- a/scanpipe/tests/test_views.py
+++ b/scanpipe/tests/test_views.py
@@ -237,7 +237,7 @@ def test_scanpipe_views_project_action_report_view(self):
 
         output_file = io.BytesIO(b"".join(response.streaming_content))
         workbook = openpyxl.load_workbook(output_file, read_only=True, data_only=True)
-        self.assertEqual(["TODOS"], workbook.get_sheet_names())
+        self.assertEqual(["TODOS"], workbook.sheetnames)
 
     def test_scanpipe_views_project_action_reset_view(self):
         url = reverse("project_action")