Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions vulnerabilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ class Meta:
"weaknesses",
"exploits",
"severity_range_score",
"exploitability",
"weighted_severity",
"risk_score",
]


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 4.2.16 on 2024-11-08 14:07

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0076_alter_packagechangelog_software_version_and_more"),
]

operations = [
migrations.AddField(
model_name="vulnerability",
name="exploitability",
field=models.DecimalField(
decimal_places=2,
help_text="Exploitability refers to the potential or probability of a software package vulnerability being \n exploited by malicious actors to compromise systems, applications, or networks. \n It is determined automatically by the discovery of exploits.",
max_digits=4,
null=True,
),
),
migrations.AddField(
model_name="vulnerability",
name="weighted_severity",
field=models.DecimalField(
decimal_places=2,
help_text="Weighted Severity is the maximum value obtained when each Severity is multiplied by its associated Weight/10.",
max_digits=4,
null=True,
),
),
]
29 changes: 29 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,35 @@ class Vulnerability(models.Model):
choices=VulnerabilityStatusType.choices, default=VulnerabilityStatusType.PUBLISHED
)

exploitability = models.DecimalField(
null=True,
max_digits=4,
decimal_places=2,
help_text="""Exploitability refers to the potential or probability of a software package vulnerability being
exploited by malicious actors to compromise systems, applications, or networks.
It is determined automatically by the discovery of exploits.""",
)

weighted_severity = models.DecimalField(
null=True,
max_digits=4,
decimal_places=2,
help_text="Weighted Severity is the maximum value obtained when each Severity is multiplied by its associated Weight/10.",
)

@property
def risk_score(self):
"""
Risk expressed as a number ranging from 0 to 10.
Risk is calculated from weighted severity and exploitability values.
It is the maximum value of (the weighted severity multiplied by its exploitability) or 10

Risk = min(weighted severity * exploitability, 10)
"""
if self.exploitability is not None and self.weighted_severity is not None:
return f"{min(float(self.exploitability) * float(self.weighted_severity), 10.0):.2f}"
return None

objects = VulnerabilityQuerySet.as_manager()

class Meta:
Expand Down
56 changes: 55 additions & 1 deletion vulnerabilities/pipelines/compute_package_risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

from aboutcode.pipeline import LoopProgress

from vulnerabilities.models import AffectedByPackageRelatedVulnerability
from vulnerabilities.models import Package
from vulnerabilities.models import Vulnerability
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.risk import compute_package_risk
from vulnerabilities.risk import compute_vulnerability_risk


class ComputePackageRiskPipeline(VulnerableCodePipeline):
Expand All @@ -26,7 +29,44 @@ class ComputePackageRiskPipeline(VulnerableCodePipeline):

@classmethod
def steps(cls):
return (cls.add_package_risk_score,)
return (cls.add_vulnerability_risk_score, cls.add_package_risk_score)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add is not the best verb to use here.


def add_vulnerability_risk_score(self):
affected_vulnerabilities = Vulnerability.objects.filter(
affectedbypackagerelatedvulnerability__isnull=False
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the only method of the queryset to only fetch the data that is needed for the computation.


self.log(
f"Calculating risk for {affected_vulnerabilities.count():,d} vulnerability with a affected packages records"
)

progress = LoopProgress(total_iterations=affected_vulnerabilities.count(), logger=self.log)

updatables = []
updated_vulnerability_count = 0
batch_size = 5000

for vulnerability in progress.iter(affected_vulnerabilities.paginated()):

vulnerability = compute_vulnerability_risk(vulnerability)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping the save logic outside the compute_vulnerability_risk, and having the function doing only the computation was a better design.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if not vulnerability:
continue

updatables.append(vulnerability)

if len(updatables) >= batch_size:
updated_vulnerability_count += bulk_update_vulnerability_risk_score(
vulnerabilities=updatables,
logger=self.log,
)
updated_vulnerability_count += bulk_update_vulnerability_risk_score(
vulnerabilities=updatables,
logger=self.log,
)
self.log(
f"Successfully added risk score for {updated_vulnerability_count:,d} vulnerability"
)

def add_package_risk_score(self):
affected_packages = Package.objects.filter(
Expand Down Expand Up @@ -72,3 +112,17 @@ def bulk_update_package_risk_score(packages, logger):
logger(f"Error updating packages: {e}")
packages.clear()
return package_count


def bulk_update_vulnerability_risk_score(vulnerabilities, logger):
vulnerabilities_count = 0
if vulnerabilities:
try:
Vulnerability.objects.bulk_update(
objs=vulnerabilities, fields=["weighted_severity", "exploitability"]
)
vulnerabilities_count += len(vulnerabilities)
except Exception as e:
logger(f"Error updating vulnerability: {e}")
vulnerabilities.clear()
return vulnerabilities_count
30 changes: 21 additions & 9 deletions vulnerabilities/risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,31 @@ def get_exploitability_level(exploits, references, severities):

def compute_vulnerability_risk(vulnerability: Vulnerability):
"""
Risk may be expressed as a number ranging from 0 to 10.
Risk is calculated from weighted severity and exploitability values.
It is the maximum value of (the weighted severity multiplied by its exploitability) or 10
Computes the risk score for a given vulnerability.

Risk = min(weighted severity * exploitability, 10)
Risk is expressed as a number ranging from 0 to 10 and is calculated based on:
- Weighted severity: a value derived from the associated severities of the vulnerability.
- Exploitability: a measure of how easily the vulnerability can be exploited.

The risk score is computed as:
Risk = min(weighted_severity * exploitability, 10)

Args:
vulnerability (Vulnerability): The vulnerability object to compute the risk for.

Returns:
Vulnerability: The updated vulnerability object with computed risk-related attributes.

Notes:
- If there are no associated references, severities, or exploits, the computation is skipped.
"""
references = vulnerability.references
severities = vulnerability.severities.select_related("reference")
exploits = Exploit.objects.filter(vulnerability=vulnerability)
if references.exists() or severities.exists() or exploits.exists():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This likely triggers too many query and should be optimized. For example, using .exists() is not ideal here since we then triggers the QS to get the objects. It would be better to directly check the objects QS for content.

weighted_severity = get_weighted_severity(severities)
exploitability = get_exploitability_level(exploits, references, severities)
return min(weighted_severity * exploitability, 10)
vulnerability.weighted_severity = get_weighted_severity(severities)
vulnerability.exploitability = get_exploitability_level(exploits, references, severities)
return vulnerability


def compute_package_risk(package: Package):
Expand All @@ -117,8 +129,8 @@ def compute_package_risk(package: Package):
for pkg_related_vul in AffectedByPackageRelatedVulnerability.objects.filter(
package=package
).prefetch_related("vulnerability"):
if risk := compute_vulnerability_risk(pkg_related_vul.vulnerability):
result.append(risk)
if risk := pkg_related_vul.vulnerability.risk_score:
result.append(float(risk))

if not result:
return
Expand Down
32 changes: 32 additions & 0 deletions vulnerabilities/templates/vulnerability_details.html
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,38 @@
<td class="two-col-left">Status</td>
<td class="two-col-right">{{ status }}</td>
</tr>

<tr>
<td class="two-col-left"
data-tooltip="Exploitability refers to the potential or probability of a software package vulnerability being
exploited by malicious actors to compromise systems, applications, or networks.
It is determined automatically by the discovery of exploits.">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be taken from the model help instead of duplicated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right, but I think we should handle this separately because we do this for a lot of fields. Based on my understanding, there is no direct way to display the help_text of the model without using a view, form, or a template tag.

Exploitability</td>
<td class="two-col-right wrap-strings">
{{ vulnerability.exploitability }}
</td>
</tr>

<tr>
<td class="two-col-left"
data-tooltip="Weighted Severity is the maximum value obtained when each Severity is multiplied by its associated Weight/10."
>Weighted Severity</td>
<td class="two-col-right wrap-strings">
{{ vulnerability.weighted_severity }}
</td>
</tr>

<tr>
<td class="two-col-left"
data-tooltip="Risk expressed as a number ranging from 0 to 10. It is calculated by multiplying
the weighted severity and exploitability values, capped at a maximum of 10.
"
>Risk</td>
<td class="two-col-right wrap-strings">
{{ vulnerability.risk_score }}
</td>
</tr>

</tbody>
</table>
</div>
Expand Down
3 changes: 2 additions & 1 deletion vulnerabilities/tests/pipelines/test_compute_package_risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
from decimal import Decimal

import pytest

Expand All @@ -30,4 +31,4 @@ def test_simple_risk_pipeline(vulnerability):
improver.execute()

pkg = Package.objects.get(type="pypi", name="foo", version="2.3.0")
assert str(pkg.risk_score) == str(3.11)
assert f"{pkg.risk_score:.2f}" == "3.10"
6 changes: 6 additions & 0 deletions vulnerabilities/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ def test_api_with_single_vulnerability(self):
},
],
"exploits": [],
"risk_score": None,
"exploitability": None,
"weighted_severity": None,
}

def test_api_with_single_vulnerability_with_filters(self):
Expand Down Expand Up @@ -346,6 +349,9 @@ def test_api_with_single_vulnerability_with_filters(self):
},
],
"exploits": [],
"risk_score": None,
"exploitability": None,
"weighted_severity": None,
}


Expand Down
3 changes: 2 additions & 1 deletion vulnerabilities/tests/test_risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,5 @@ def test_get_weighted_severity(vulnerability):

@pytest.mark.django_db
def test_compute_vulnerability_risk(vulnerability):
assert compute_vulnerability_risk(vulnerability) == 3.1050000000000004
vulnerability = compute_vulnerability_risk(vulnerability)
assert vulnerability.risk_score == str(3.11)