Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions vulnerabilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ class Meta:
"weaknesses",
"exploits",
"severity_range_score",
"exploitability",
"weighted_severity",
"risk_score",
]


Expand Down
6 changes: 6 additions & 0 deletions vulnerabilities/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class VulnerabilityV2Serializer(serializers.ModelSerializer):
weaknesses = WeaknessV2Serializer(many=True)
references = VulnerabilityReferenceV2Serializer(many=True, source="vulnerabilityreference_set")
severities = VulnerabilitySeverityV2Serializer(many=True)
exploitability = serializers.FloatField(read_only=True)
weighted_severity = serializers.FloatField(read_only=True)
risk_score = serializers.FloatField(read_only=True)

class Meta:
model = Vulnerability
Expand All @@ -77,6 +80,9 @@ class Meta:
"severities",
"weaknesses",
"references",
"exploitability",
"weighted_severity",
"risk_score",
]

def get_aliases(self, obj):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Generated by Django 4.2.16 on 2024-11-16 20:41

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0081_alter_packagechangelog_software_version_and_more"),
]

operations = [
migrations.AddField(
model_name="vulnerability",
name="exploitability",
field=models.DecimalField(
decimal_places=1,
help_text='"Exploitability indicates the likelihood that a vulnerability in a software package could be used \n by malicious actors to compromise systems, applications, or networks. \n This metric is determined automatically based on the discovery of known exploits.',
max_digits=2,
null=True,
),
),
migrations.AddField(
model_name="vulnerability",
name="weighted_severity",
field=models.DecimalField(
decimal_places=1,
help_text="Weighted severity is the highest value calculated by multiplying each severity by its corresponding weight, divided by 10",
max_digits=3,
null=True,
),
),
migrations.AlterField(
model_name="package",
name="risk_score",
field=models.DecimalField(
decimal_places=1,
help_text="Risk score between 0.00 and 10.00, where higher values indicate greater vulnerability risk for the package.",
max_digits=3,
null=True,
),
),
]
32 changes: 30 additions & 2 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,34 @@ class Vulnerability(models.Model):
related_name="vulnerabilities",
)

exploitability = models.DecimalField(
null=True,
max_digits=2,
decimal_places=1,
help_text=""""Exploitability indicates the likelihood that a vulnerability in a software package could be used
by malicious actors to compromise systems, applications, or networks.
This metric is determined automatically based on the discovery of known exploits.""",
)

weighted_severity = models.DecimalField(
null=True,
max_digits=3,
decimal_places=1,
help_text="Weighted severity is the highest value calculated by multiplying each severity by its corresponding weight, divided by 10",
)

@property
def risk_score(self):
"""
Risk expressed as a number ranging from 0 to 10.
Risk is calculated from weighted severity and exploitability values.
It is the maximum value of (the weighted severity multiplied by its exploitability) or 10
Risk = min(weighted severity * exploitability, 10)
"""
if self.exploitability and self.weighted_severity:
risk_score = min(float(self.exploitability * self.weighted_severity), 10.0)
return round(risk_score, 1)

objects = VulnerabilityQuerySet.as_manager()

class Meta:
Expand Down Expand Up @@ -672,8 +700,8 @@ class Package(PackageURLMixin):

risk_score = models.DecimalField(
null=True,
max_digits=4,
decimal_places=2,
max_digits=3,
decimal_places=1,
help_text="Risk score between 0.00 and 10.00, where higher values "
"indicate greater vulnerability risk for the package.",
)
Expand Down
74 changes: 65 additions & 9 deletions vulnerabilities/pipelines/compute_package_risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from aboutcode.pipeline import LoopProgress

from vulnerabilities.models import Package
from vulnerabilities.models import Vulnerability
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.risk import compute_package_risk
from vulnerabilities.risk import compute_vulnerability_risk_factors


class ComputePackageRiskPipeline(VulnerableCodePipeline):
Expand All @@ -26,17 +28,57 @@ class ComputePackageRiskPipeline(VulnerableCodePipeline):

@classmethod
def steps(cls):
return (cls.add_package_risk_score,)
return (
cls.compute_and_store_vulnerability_risk_score,
cls.compute_and_store_package_risk_score,
)

def compute_and_store_vulnerability_risk_score(self):
affected_vulnerabilities = Vulnerability.objects.filter(
affectedbypackagerelatedvulnerability__isnull=False
).prefetch_related(
"references",
"exploits",
)

def add_package_risk_score(self):
self.log(
f"Calculating risk for {affected_vulnerabilities.count():,d} vulnerability with a affected packages records"
)

progress = LoopProgress(total_iterations=affected_vulnerabilities.count(), logger=self.log)

updatables = []
updated_vulnerability_count = 0
batch_size = 5000

for vulnerability in progress.iter(affected_vulnerabilities.paginated()):
severities = vulnerability.severities.all()
references = vulnerability.references.all()

(
vulnerability.weighted_severity,
vulnerability.exploitability,
) = compute_vulnerability_risk_factors(references, severities, vulnerability.exploits)

updatables.append(vulnerability)

if len(updatables) >= batch_size:
updated_vulnerability_count += bulk_update_vulnerability_risk_score(
vulnerabilities=updatables,
logger=self.log,
)
updated_vulnerability_count += bulk_update_vulnerability_risk_score(
vulnerabilities=updatables,
logger=self.log,
)
self.log(
f"Successfully added risk score for {updated_vulnerability_count:,d} vulnerability"
)

def compute_and_store_package_risk_score(self):
affected_packages = (
Package.objects.filter(affected_by_vulnerabilities__isnull=False).prefetch_related(
"affectedbypackagerelatedvulnerability_set__vulnerability",
"affectedbypackagerelatedvulnerability_set__vulnerability__references",
"affectedbypackagerelatedvulnerability_set__vulnerability__severities",
"affectedbypackagerelatedvulnerability_set__vulnerability__exploits",
)
).distinct()
Package.objects.filter(affected_by_vulnerabilities__isnull=False).only("id").distinct()
)

self.log(f"Calculating risk for {affected_packages.count():,d} affected package records")

Expand Down Expand Up @@ -81,3 +123,17 @@ def bulk_update_package_risk_score(packages, logger):
logger(f"Error updating packages: {e}")
packages.clear()
return package_count


def bulk_update_vulnerability_risk_score(vulnerabilities, logger):
vulnerabilities_count = 0
if vulnerabilities:
try:
Vulnerability.objects.bulk_update(
objs=vulnerabilities, fields=["weighted_severity", "exploitability"]
)
vulnerabilities_count += len(vulnerabilities)
except Exception as e:
logger(f"Error updating vulnerability: {e}")
vulnerabilities.clear()
return vulnerabilities_count
44 changes: 28 additions & 16 deletions vulnerabilities/risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


from typing import List
from urllib.parse import urlparse

from django.db.models import Prefetch

from vulnerabilities.models import AffectedByPackageRelatedVulnerability
from vulnerabilities.models import Vulnerability
from vulnerabilities.models import VulnerabilityReference
from vulnerabilities.models import VulnerabilitySeverity
from vulnerabilities.severity_systems import EPSS
from vulnerabilities.weight_config import WEIGHT_CONFIG

DEFAULT_WEIGHT = 5


def get_weighted_severity(severities):
def get_weighted_severity(severities: List[VulnerabilitySeverity]):
"""
Weighted Severity is the maximum value obtained when each Severity is multiplied
by its associated Weight/10.
Example of Weighted Severity: max(7*(10/10), 8*(3/10), 6*(8/10)) = 7
"""
if not severities:
return 0

score_map = {
"low": 3,
Expand All @@ -49,7 +55,9 @@ def get_weighted_severity(severities):
vul_score_value = score_map.get(vul_score, 0) * max_weight

score_list.append(vul_score_value)
return max(score_list) if score_list else 0

max_score = max(score_list) if score_list else 0
return round(max_score, 1)


def get_exploitability_level(exploits, references, severities):
Expand Down Expand Up @@ -83,21 +91,17 @@ def get_exploitability_level(exploits, references, severities):
return exploit_level


def compute_vulnerability_risk(vulnerability):
def compute_vulnerability_risk_factors(references, severities, exploits):
"""
Risk may be expressed as a number ranging from 0 to 10.
Risk is calculated from weighted severity and exploitability values.
It is the maximum value of (the weighted severity multiplied by its exploitability) or 10

Risk = min(weighted severity * exploitability, 10)
"""
severities = vulnerability.severities.all()
exploits = vulnerability.exploits.all()
reference = vulnerability.references.all()
if reference.exists() or severities.exists() or exploits.exists():
weighted_severity = get_weighted_severity(severities)
exploitability = get_exploitability_level(exploits, reference, severities)
return min(weighted_severity * exploitability, 10)
weighted_severity = get_weighted_severity(severities)
exploitability = get_exploitability_level(exploits, references, severities)
return weighted_severity, exploitability


def compute_package_risk(package):
Expand All @@ -107,11 +111,19 @@ def compute_package_risk(package):
"""

result = []
for package_vulnerability in package.affectedbypackagerelatedvulnerability_set.all():
if risk := compute_vulnerability_risk(package_vulnerability.vulnerability):
result.append(risk)
affected_pkg_related_vul = AffectedByPackageRelatedVulnerability.objects.filter(
package=package
).prefetch_related(
Prefetch(
"vulnerability",
queryset=Vulnerability.objects.only("weighted_severity", "exploitability"),
)
)
for pkg_related_vul in affected_pkg_related_vul:
if risk := pkg_related_vul.vulnerability.risk_score:
result.append(float(risk))

if not result:
return

return f"{max(result):.2f}"
return round(max(result), 1)
32 changes: 32 additions & 0 deletions vulnerabilities/templates/vulnerability_details.html
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,38 @@
<td class="two-col-left">Status</td>
<td class="two-col-right">{{ status }}</td>
</tr>

<tr>
<td class="two-col-left"
data-tooltip="Exploitability indicates the likelihood that a vulnerability in a software package could be used
by malicious actors to compromise systems, applications, or networks.
This metric is determined automatically based on the discovery of known exploits.">
Exploitability</td>
<td class="two-col-right wrap-strings">
{{ vulnerability.exploitability }}
</td>
</tr>

<tr>
<td class="two-col-left"
data-tooltip="Weighted severity is the highest value calculated by multiplying each severity by its corresponding weight, divided by 10."
>Weighted Severity</td>
<td class="two-col-right wrap-strings">
{{ vulnerability.weighted_severity }}
</td>
</tr>

<tr>
<td class="two-col-left"
data-tooltip="Risk expressed as a number ranging from 0 to 10. It is calculated by multiplying
the weighted severity and exploitability values, capped at a maximum of 10.
"
>Risk</td>
<td class="two-col-right wrap-strings">
{{ vulnerability.risk_score }}
</td>
</tr>

</tbody>
</table>
</div>
Expand Down
3 changes: 2 additions & 1 deletion vulnerabilities/tests/pipelines/test_compute_package_risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
from decimal import Decimal

import pytest

Expand All @@ -30,4 +31,4 @@ def test_simple_risk_pipeline(vulnerability):
improver.execute()

pkg = Package.objects.get(type="pypi", name="foo", version="2.3.0")
assert str(pkg.risk_score) == str(3.11)
assert pkg.risk_score == Decimal("10")
6 changes: 6 additions & 0 deletions vulnerabilities/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ def test_api_with_single_vulnerability(self):
},
],
"exploits": [],
"risk_score": None,
"exploitability": None,
"weighted_severity": None,
}

def test_api_with_single_vulnerability_with_filters(self):
Expand Down Expand Up @@ -347,6 +350,9 @@ def test_api_with_single_vulnerability_with_filters(self):
},
],
"exploits": [],
"risk_score": None,
"exploitability": None,
"weighted_severity": None,
}


Expand Down
Loading