Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ MarkupSafe==2.0.1
more-itertools==8.13.0
normality==2.3.3
packagedcode-msitools==0.101.210706
packageurl-python==0.9.9
packaging==21.3
packageurl-python==0.17.4
packaging==24.0
parameter-expansion-patched==0.3.1
patch==1.16
pdfminer-six==20220506
Expand Down
59 changes: 55 additions & 4 deletions src/fetchcode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
from urllib.parse import urlparse

import requests
from packageurl.contrib import purl2url

from fetchcode.utils import _http_exists


class Response:
Expand Down Expand Up @@ -89,24 +92,72 @@ def fetch_ftp(url, location):
return resp


def resolve_purl(purl):
    """
    Return a download URL for the `purl` Package URL (PURL) string, or None
    when no resolver yields a usable URL.

    Resolvers are tried in order: purl2url's `get_download_url` first, then
    fetchcode's own `download_urls.download_url` as a fallback. A candidate
    URL is accepted only when it is truthy and passes the `_http_exists`
    check.
    """
    from fetchcode.download_urls import download_url as get_download_url_from_fetchcode

    resolvers = (purl2url.get_download_url, get_download_url_from_fetchcode)

    # Generators keep evaluation lazy: the fallback resolver is only invoked
    # when the first candidate is rejected.
    candidates = (resolve(purl) for resolve in resolvers)
    usable = (candidate for candidate in candidates if candidate and _http_exists(candidate))
    return next(usable, None)


def get_resolved_url(url, scheme):
    """
    Return a `(url, scheme)` tuple obtained by running the resolver
    registered for `scheme` on `url`.

    Only the "pkg" (Package URL) scheme has a resolver.
    Raise ValueError when `scheme` has no registered resolver.
    """
    # Built on every call so that the handler is looked up at call time.
    handlers = {"pkg": resolve_url_from_purl}

    handler = handlers.get(scheme)
    if not handler:
        raise ValueError(f"Not a supported/known scheme: {scheme}")

    resolved_url, resolved_scheme = handler(url)
    return resolved_url, resolved_scheme


def resolve_url_from_purl(url):
    """
    Return a `(resolved_url, scheme)` tuple for the `url` Package URL (PURL)
    string, where `scheme` is the scheme of the resolved URL.

    Raise ValueError if the PURL cannot be resolved.
    """
    resolved = resolve_purl(url)
    if resolved:
        return resolved, get_url_scheme(resolved)

    raise ValueError("Could not resolve PURL to a valid URL.")


def get_url_scheme(url):
    """
    Return the scheme component of the `url` string as parsed by
    `urllib.parse.urlparse` (an empty string when there is none).
    """
    return urlparse(url).scheme


def fetch(url):
    """
    Return a `Response` object built from fetching the content at the `url` URL string and
    store content at a temporary file.

    A `url` using the "pkg" scheme (a Package URL) is first resolved to a
    download URL, and the scheme of that resolved URL selects the fetcher.
    Supported schemes after resolution are "ftp", "http" and "https".

    Raise ValueError if a Package URL cannot be resolved.
    Raise Exception if the (resolved) scheme has no fetcher.
    """
    scheme = get_url_scheme(url)

    if scheme == "pkg":
        # The resolver returns both the download URL and its scheme, so the
        # URL does not need to be parsed a second time.
        url, scheme = get_resolved_url(url, scheme)

    # Built on every call so that the fetchers are looked up at call time.
    fetchers = {"ftp": fetch_ftp, "http": fetch_http, "https": fetch_http}

    fetcher = fetchers.get(scheme)
    if fetcher is None:
        # Validate the scheme before creating the temporary file so that an
        # unsupported URL does not leave an orphan empty file on disk.
        raise Exception(f"Not a supported/known scheme: {scheme}.")

    # Only the path is needed: close the handle right away and keep the file
    # (delete=False) for the fetcher to write to.
    with tempfile.NamedTemporaryFile(delete=False) as temp:
        location = temp.name

    return fetcher(url, location)


def fetch_json_response(url):
Expand Down
21 changes: 21 additions & 0 deletions tests/test_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,24 @@ def test_fetch_with_scheme_not_present():
url = "abc://speedtest/1KB.zip"
response = fetch(url=url)
assert "Not a supported/known scheme." == e_info


@mock.patch("fetchcode.resolve_url_from_purl")
@mock.patch("fetchcode.fetch_http")
def test_fetch_purl(mock_fetch_http, mock_resolve):
    """
    Check that fetching a "pkg" URL goes through the PURL resolver once and
    then through the HTTP fetcher once, returning the fetcher's result.
    """
    # NOTE(review): the PURL literal below looks mangled (e-mail obfuscation
    # placeholder) -- confirm the intended package@version string.
    purl = "pkg:pypi/[email protected]"
    mock_resolve.return_value = ("http://resolved.com/file.tar.gz", "http")
    mock_fetch_http.return_value = "mocked_purl_response"

    result = fetch(purl)

    mock_resolve.assert_called_once()
    mock_fetch_http.assert_called_once()
    assert result == "mocked_purl_response"


@mock.patch("fetchcode.get_url_scheme")
def test_fetch_unsupported_scheme(mock_get_scheme):
    """
    Check that `fetch` raises with the "Not a supported/known scheme" message
    when the detected scheme is "s3", which has no fetcher.
    """
    mock_get_scheme.return_value = "s3"
    expected_message = "Not a supported/known scheme"

    with pytest.raises(Exception, match=expected_message):
        fetch("s3://bucket/object")