Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
from vulnerabilities.pipelines.v2_importers import gitlab_live_importer as gitlab_live_importer_v2
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2
Expand Down Expand Up @@ -117,3 +118,9 @@
oss_fuzz.OSSFuzzImporter,
]
)

# Registry of the "live" importer pipelines, built with the same
# create_registry() helper as the importers registry above but kept as a
# separate collection. Add each new live importer pipeline class to this list.
LIVE_IMPORTERS_REGISTRY = create_registry(
    [
        gitlab_live_importer_v2.GitLabLiveImporterPipeline,
    ]
)
242 changes: 242 additions & 0 deletions vulnerabilities/pipelines/v2_importers/gitlab_live_importer.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this should be inside vulnerabilities/pipelines/v2_importers/gitlab_importer.py

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image image

Based on the proposed architecture, we agreed that we'll have another, parallel registry for live importers. Each new live importer pipeline class will be added to this registry, which the API endpoint uses to pick the relevant live importers to run.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it would be better to merge #1969 first; it adds the live evaluation API, which directly uses the LIVE_IMPORTERS_REGISTRY.

Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
import traceback
from typing import Iterable
from urllib.parse import urljoin

import pytz
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_range import RANGE_CLASS_BY_SCHEMES
from univers.version_range import VersionRange
from univers.version_range import from_gitlab_native

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.pipelines.v2_importers.gitlab_importer import get_purl
from vulnerabilities.utils import build_description
from vulnerabilities.utils import get_cwe_id
from vulntotal.datasources.gitlab import get_casesensitive_slug
from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl
from vulntotal.datasources.gitlab_api import get_estimated_advisories_count


class GitLabLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    GitLab Live Importer Pipeline

    Collect advisories from the GitLab Advisory Database (Open Source Edition)
    for a single versioned PURL and keep only the advisories whose affected
    version range contains the version of that PURL.
    """

    pipeline_id = "gitlab_live_importer_v2"
    spdx_license_expression = "MIT"
    license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"
    supported_types = ["pypi", "npm", "maven", "nuget", "composer", "conan", "gem"]

    @classmethod
    def steps(cls):
        """Return the pipeline steps in execution order."""
        return (
            cls.get_purl_inputs,
            cls.collect_and_store_advisories,
        )

    def get_purl_inputs(self):
        """
        Validate the ``purl`` entry of ``self.inputs`` and store it as ``self.purl``.

        A string value is parsed with ``PackageURL.from_string``.

        Raise a ValueError when the PURL is empty, is not a PackageURL, has a
        type that is not listed in ``supported_types`` or has no version.
        """
        purl = self.inputs["purl"]
        if not purl:
            raise ValueError("PURL is required for GitLabLiveImporterPipeline")

        if isinstance(purl, str):
            purl = PackageURL.from_string(purl)

        if not isinstance(purl, PackageURL):
            raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

        if purl.type not in self.supported_types:
            raise ValueError(
                f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
            )

        if not purl.version:
            raise ValueError(f"PURL: {purl!s} is expected to have a version")

        self.purl = purl

    # GitLab advisory "package_slug" scheme -> PURL type.
    purl_type_by_gitlab_scheme = {
        "conan": "conan",
        "gem": "gem",
        # Entering issue to parse go package names https://github.com/nexB/vulnerablecode/issues/742
        # "go": "golang",
        "maven": "maven",
        "npm": "npm",
        "nuget": "nuget",
        "packagist": "composer",
        "pypi": "pypi",
    }

    # Reverse mapping: PURL type -> GitLab scheme.
    gitlab_scheme_by_purl_type = {v: k for k, v in purl_type_by_gitlab_scheme.items()}

    def advisories_count(self):
        """Return the value of ``get_estimated_advisories_count`` for ``self.purl``."""
        return get_estimated_advisories_count(
            self.purl, self.gitlab_scheme_by_purl_type, get_casesensitive_slug
        )

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield an AdvisoryData for each fetched advisory that has an affected
        version range containing the version of ``self.purl``.
        """
        advisories = fetch_gitlab_advisories_for_purl(
            self.purl, self.gitlab_scheme_by_purl_type, get_casesensitive_slug
        )

        input_version = self.purl.version
        vrc = RANGE_CLASS_BY_SCHEMES[self.purl.type]
        version_obj = vrc.version_class(input_version) if input_version else None

        for advisory in advisories:
            advisory_data = self._advisory_dict_to_advisory_data(advisory)
            # The converter returns None for an advisory that has neither an
            # affected range nor fixed versions: there is nothing to match.
            if not advisory_data:
                continue

            is_affected = any(
                package.affected_version_range
                and version_obj in package.affected_version_range
                for package in advisory_data.affected_packages
            )
            if is_affected:
                yield advisory_data

    def _advisory_dict_to_advisory_data(self, advisory):
        """
        Convert the ``advisory`` mapping with ``advisory_dict_to_advisory_data``
        using the scheme mappings, logger and PURL of this pipeline.
        """
        return advisory_dict_to_advisory_data(
            advisory=advisory,
            purl_type_by_gitlab_scheme=self.purl_type_by_gitlab_scheme,
            gitlab_scheme_by_purl_type=self.gitlab_scheme_by_purl_type,
            logger=self.log,
            purl=self.purl,
        )


def advisory_dict_to_advisory_data(
    advisory: dict,
    purl_type_by_gitlab_scheme,
    gitlab_scheme_by_purl_type,
    logger,
    purl=None,
    advisory_url=None,
):
    """
    Convert a GitLab ``advisory`` mapping to an AdvisoryData.

    Return None when the advisory has neither a parsable affected range nor
    any parsable fixed version. When no valid purl can be determined, log an
    error and return an AdvisoryData without affected packages.

    Arguments:
    - ``advisory``: mapping read with the keys "identifier", "identifiers",
      "package_slug", "title", "description", "urls", "cwe_ids", "pubdate",
      "affected_range" and "fixed_versions".
    - ``purl_type_by_gitlab_scheme``: mapping passed to ``get_purl`` to build
      a purl from the advisory "package_slug".
    - ``gitlab_scheme_by_purl_type``: mapping of purl type to GitLab scheme,
      looked up with the type of the purl.
    - ``logger``: callable invoked as ``logger(message, level=...)``.
    - ``purl``: optional purl of the package; when not provided it is built
      from the advisory "package_slug".
    - ``advisory_url``: optional URL of the advisory; when not provided it is
      built from the "package_slug" and "identifier" of the advisory.
    """
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has a lot of duplication with

def parse_gitlab_advisory(
    file, base_path, gitlab_scheme_by_purl_type, purl_type_by_gitlab_scheme, logger
):
    """
    Parse a Gitlab advisory file and return an AdvisoryData or None.
    These files are YAML. There is a JSON schema documented at
    https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json

    ``file`` and ``base_path`` are passed to ``get_advisory_url`` to build the
    advisory URL. ``logger`` is called as ``logger(message, level=...)``.
    None is returned when the file content is not a mapping, or when the
    advisory has neither an affected range nor any fixed version.

    Sample YAML file:
    ---
    identifier: "GMS-2018-26"
    package_slug: "packagist/amphp/http"
    title: "Incorrect header injection check"
    description: "amphp/http isn't properly protected against HTTP header injection."
    pubdate: "2018-03-15"
    affected_range: "<1.0.1"
    fixed_versions:
      - "v1.0.1"
    urls:
      - "https://github.com/amphp/http/pull/4"
    cwe_ids:
      - "CWE-1035"
      - "CWE-937"
    identifiers:
      - "GMS-2018-26"
    """
    with open(file) as f:
        gitlab_advisory = saneyaml.load(f)
    if not isinstance(gitlab_advisory, dict):
        logger(
            f"parse_gitlab_advisory: unknown gitlab advisory format in {file!r} with data: {gitlab_advisory!r}",
            level=logging.ERROR,
        )
        return
    # refer to schema here https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json
    aliases = gitlab_advisory.get("identifiers")
    advisory_id = gitlab_advisory.get("identifier")
    package_slug = gitlab_advisory.get("package_slug")
    # The advisory id is prefixed with the package slug when there is one.
    advisory_id = f"{package_slug}/{advisory_id}" if package_slug else advisory_id
    if advisory_id in aliases:
        aliases.remove(advisory_id)
    summary = build_description(gitlab_advisory.get("title"), gitlab_advisory.get("description"))
    urls = gitlab_advisory.get("urls")
    references = [ReferenceV2.from_url(u) for u in urls]
    cwe_ids = gitlab_advisory.get("cwe_ids") or []
    cwe_list = list(map(get_cwe_id, cwe_ids))
    date_published = dateparser.parse(gitlab_advisory.get("pubdate"))
    # Replace (not convert) the timezone: the parsed date is stamped as UTC.
    date_published = date_published.replace(tzinfo=pytz.UTC)
    advisory_url = get_advisory_url(
        file=file,
        base_path=base_path,
        url="https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
    )
    purl: PackageURL = get_purl(
        package_slug=package_slug,
        purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
        logger=logger,
    )
    if not purl:
        # Without a usable purl the advisory is still returned, but with no
        # affected packages and no weaknesses.
        logger(
            f"parse_yaml_file: purl is not valid: {file!r} {package_slug!r}", level=logging.ERROR
        )
        return AdvisoryData(
            advisory_id=advisory_id,
            aliases=aliases,
            summary=summary,
            references_v2=references,
            date_published=date_published,
            url=advisory_url,
            original_advisory_text=json.dumps(gitlab_advisory, indent=2, ensure_ascii=False),
        )
    affected_version_range = None
    fixed_versions = gitlab_advisory.get("fixed_versions") or []
    affected_range = gitlab_advisory.get("affected_range")
    # Schemes whose ranges are parsed with from_gitlab_native(); the others
    # are parsed with the from_native() of the range class of the purl type.
    gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"])
    vrc = RANGE_CLASS_BY_SCHEMES[purl.type]
    gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
    try:
        if affected_range:
            if gitlab_scheme in gitlab_native_schemes:
                affected_version_range = from_gitlab_native(
                    gitlab_scheme=gitlab_scheme, string=affected_range
                )
            else:
                affected_version_range = vrc.from_native(affected_range)
    except Exception as e:
        # An unparsable range is logged and treated as "no affected range".
        logger(
            f"parse_yaml_file: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
            level=logging.ERROR,
        )
    parsed_fixed_versions = []
    for fixed_version in fixed_versions:
        try:
            fixed_version = vrc.version_class(fixed_version)
            parsed_fixed_versions.append(fixed_version.string)
        except Exception as e:
            # An unparsable fixed version is logged and skipped.
            logger(
                f"parse_yaml_file: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
                level=logging.ERROR,
            )
    if affected_version_range:
        # Build the fixed range with the class of the parsed affected range.
        vrc = affected_version_range.__class__
    fixed_version_range = vrc.from_versions(parsed_fixed_versions)
    if not fixed_version_range and not affected_version_range:
        # No version information at all: return None.
        return
    affected_package = AffectedPackageV2(
        package=purl,
        affected_version_range=affected_version_range,
        fixed_version_range=fixed_version_range,
    )
    return AdvisoryData(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=summary,
        references_v2=references,
        date_published=date_published,
        affected_packages=[affected_package],
        weaknesses=cwe_list,
        url=advisory_url,
        original_advisory_text=json.dumps(gitlab_advisory, indent=2, ensure_ascii=False),
    )
We should reuse the existing code and, if needed, break parse_gitlab_advisory down into smaller functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've addressed this in my latest commit by extracting the shared logic into a common file so we can avoid duplicate code. Thanks for pointing it out.

aliases = advisory.get("identifiers", [])
identifier = advisory.get("identifier", "")
package_slug = advisory.get("package_slug")

advisory_id = f"{package_slug}/{identifier}" if package_slug else identifier
if advisory_id in aliases:
aliases.remove(advisory_id)

summary = build_description(advisory.get("title"), advisory.get("description"))
urls = advisory.get("urls", [])
references = [ReferenceV2.from_url(u) for u in urls]

cwe_ids = advisory.get("cwe_ids") or []
cwe_list = list(map(get_cwe_id, cwe_ids))

date_published = dateparser.parse(advisory.get("pubdate"))
date_published = date_published.replace(tzinfo=pytz.UTC)

# Determine purl if not provided
if not purl:
purl = get_purl(
package_slug=package_slug,
purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
logger=logger,
)

if not purl:
logger(
f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}",
level=logging.ERROR,
)
return AdvisoryData(
advisory_id=advisory_id,
aliases=aliases,
summary=summary,
references_v2=references,
date_published=date_published,
url=advisory_url,
)

affected_version_range = None
fixed_versions = advisory.get("fixed_versions") or []
affected_range = advisory.get("affected_range")
gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"])
vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
try:
if affected_range:
if gitlab_scheme in gitlab_native_schemes:
affected_version_range = from_gitlab_native(
gitlab_scheme=gitlab_scheme, string=affected_range
)
else:
affected_version_range = vrc.from_native(affected_range)
except Exception as e:
logger(
f"advisory_dict_to_advisory_data: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
level=logging.ERROR,
)

parsed_fixed_versions = []
for fixed_version in fixed_versions:
try:
fixed_version = vrc.version_class(fixed_version)
parsed_fixed_versions.append(fixed_version.string)
except Exception as e:
logger(
f"advisory_dict_to_advisory_data: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
level=logging.ERROR,
)

if affected_version_range:
vrc = affected_version_range.__class__

fixed_version_range = vrc.from_versions(parsed_fixed_versions)
if not fixed_version_range and not affected_version_range:
return

purl_without_version = get_purl(
package_slug=package_slug,
purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
logger=logger,
)

affected_package = AffectedPackageV2(
package=purl_without_version,
affected_version_range=affected_version_range,
fixed_version_range=fixed_version_range,
)

if not advisory_url and package_slug and identifier:
advisory_url = urljoin(
"https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
package_slug + "/" + identifier + ".yml",
)

return AdvisoryData(
advisory_id=advisory_id,
aliases=aliases,
summary=summary,
references_v2=references,
date_published=date_published,
affected_packages=[affected_package],
weaknesses=cwe_list,
url=advisory_url,
original_advisory_text=json.dumps(advisory, indent=2, ensure_ascii=False),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
#

from pathlib import Path
from unittest import mock

import saneyaml
from packageurl import PackageURL

from vulnerabilities.pipelines.v2_importers.gitlab_live_importer import GitLabLiveImporterPipeline
from vulnerabilities.tests import util_tests

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "gitlab"


@mock.patch(
    "vulnerabilities.pipelines.v2_importers.gitlab_live_importer.fetch_gitlab_advisories_for_purl"
)
def test_gitlab_importer_package_first_mode_found_with_version(mock_fetch):
    """
    Check the first advisory collected for a versioned PURL against the
    expected JSON, with the advisory fetch mocked to return the test YAML.
    """
    pkg_type = "pypi"
    expected_file = TEST_DATA / f"{pkg_type}-live-importer-expected.json"

    # The mocked fetch returns the single advisory stored in the test data.
    with open(TEST_DATA / f"{pkg_type}.yaml") as response:
        mock_fetch.return_value = [saneyaml.load(response)]

    pipeline = GitLabLiveImporterPipeline(
        purl=PackageURL(type="pypi", name="flask", version="0.9"),
    )
    pipeline.get_purl_inputs()

    collected = list(pipeline.collect_advisories())
    first_advisory = collected[0]
    util_tests.check_results_against_json(first_advisory.to_dict(), expected_file)


@mock.patch(
    "vulnerabilities.pipelines.v2_importers.gitlab_live_importer.fetch_gitlab_advisories_for_purl"
)
def test_gitlab_importer_package_first_mode_none_found(mock_fetch):
    """Check that no advisory is collected when the mocked fetch returns none."""
    mock_fetch.return_value = []

    pipeline = GitLabLiveImporterPipeline(
        purl=PackageURL(type="pypi", name="flask", version="1.2"),
    )
    pipeline.get_purl_inputs()

    collected = list(pipeline.collect_advisories())
    assert collected == []
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"advisory_id": "pypi/Flask/CVE-2019-1010083",
"aliases": ["CVE-2019-1010083"],
"summary": "Denial of service\nDenial of Service due to unexpected memory usage in the Pallets Project Flask",
"affected_packages": [
{
"package": {
"type": "pypi",
"namespace": "",
"name": "flask",
"version": "",
"qualifiers": "",
"subpath": ""
},
"affected_version_range": "vers:pypi/<1.0",
"fixed_version_range": "vers:pypi/1.0"
}
],
"references_v2": [
{
"reference_id": "CVE-2019-1010083",
"reference_type": "",
"url": "https://nvd.nist.gov/vuln/detail/CVE-2019-1010083"
},
{
"reference_id": "",
"reference_type": "",
"url": "https://www.palletsprojects.com/blog/flask-1-0-released/"
}
],
"severities": [],
"date_published": "2019-07-17T00:00:00+00:00",
"weaknesses": [1035, 937],
"url": "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/pypi/Flask/CVE-2019-1010083.yml"
}
Loading