Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/twyn/dependency_managers/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

9 changes: 9 additions & 0 deletions src/twyn/dependency_managers/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
UV_LOCK,
YARN_LOCK,
)
from twyn.trusted_packages.managers.base import TrustedPackagesProtocol
from twyn.trusted_packages.managers.trusted_npm_packages_manager import TrustedNpmPackageManager
from twyn.trusted_packages.managers.trusted_pypi_packages_manager import TrustedPackages
from twyn.trusted_packages.references.base import AbstractPackageReference
from twyn.trusted_packages.references.top_npm_reference import TopNpmReference
from twyn.trusted_packages.references.top_pypi_reference import TopPyPiReference
Expand All @@ -30,6 +33,9 @@ class DependencyManager:
dependency_files: set[str]
"""Set of supported dependency file names."""

trusted_packages_manager: type[TrustedPackagesProtocol]
"""TrustedPackages class that will determine if there's a typo or not."""

def matches_dependency_file(self, dependency_file: str) -> bool:
"""Check if this manager can handle the given dependency file."""
return Path(dependency_file).name in self.dependency_files
Expand All @@ -43,13 +49,16 @@ def get_alternative_source(self, sources: dict[str, str]) -> str | None:
name="npm",
trusted_packages_source=TopNpmReference,
dependency_files={PACKAGE_LOCK_JSON, YARN_LOCK},
trusted_packages_manager=TrustedNpmPackageManager,
)
pypi_dependency_manager = DependencyManager(
name="pypi",
trusted_packages_source=TopPyPiReference,
dependency_files={UV_LOCK, POETRY_LOCK, REQUIREMENTS_TXT},
trusted_packages_manager=TrustedPackages,
)


DEPENDENCY_MANAGERS: list[DependencyManager] = [pypi_dependency_manager, npm_dependency_manager]
"""List of available dependency manager classes."""

Expand Down
8 changes: 4 additions & 4 deletions src/twyn/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
from twyn.similarity.algorithm import EditDistance, SimilarityThreshold
from twyn.trusted_packages.cache_handler import CacheHandler
from twyn.trusted_packages.exceptions import InvalidArgumentsError
from twyn.trusted_packages.managers.base import TrustedPackagesProtocol
from twyn.trusted_packages.models import (
TyposquatCheckResultEntry,
TyposquatCheckResultFromSource,
TyposquatCheckResults,
)
from twyn.trusted_packages.references.base import AbstractPackageReference
from twyn.trusted_packages.trusted_packages import TrustedPackages

logger = logging.getLogger("twyn")
logger.addHandler(logging.NullHandler())
Expand Down Expand Up @@ -134,7 +134,7 @@ def _analyze_dependencies_from_input(
dependency_manager = get_dependency_manager_from_name(package_ecosystem)
source = dependency_manager.get_alternative_source({"pypi": pypi_source, "npm": npm_source})
top_package_reference = dependency_manager.trusted_packages_source(source, maybe_cache_handler)
trusted_packages = TrustedPackages(
trusted_packages = dependency_manager.trusted_packages_manager(
names=top_package_reference.get_packages(),
algorithm=EditDistance(),
selector=selector_method,
Expand Down Expand Up @@ -177,7 +177,7 @@ def _analyze_packages_from_source(
top_package_reference = manager.trusted_packages_source(source, maybe_cache_handler)

packages_from_source = top_package_reference.get_packages()
trusted_packages = TrustedPackages(
trusted_packages = manager.trusted_packages_manager(
names=packages_from_source,
algorithm=EditDistance(),
selector=selector_method,
Expand All @@ -200,7 +200,7 @@ def _analyze_packages_from_source(

def _analyze_dependencies(
top_package_reference: AbstractPackageReference,
trusted_packages: TrustedPackages,
trusted_packages: TrustedPackagesProtocol,
packages: set[str],
allowlist: set[str],
show_progress_bar: bool,
Expand Down
5 changes: 3 additions & 2 deletions src/twyn/trusted_packages/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from twyn.trusted_packages.managers.trusted_npm_packages_manager import TrustedNpmPackageManager
from twyn.trusted_packages.managers.trusted_pypi_packages_manager import TrustedPackages
from twyn.trusted_packages.references.top_npm_reference import TopNpmReference
from twyn.trusted_packages.references.top_pypi_reference import TopPyPiReference
from twyn.trusted_packages.trusted_packages import TrustedPackages

__all__ = ["TopPyPiReference", "TrustedPackages", "TopNpmReference"]
__all__ = ["TopPyPiReference", "TopNpmReference", "TrustedPackages", "TrustedNpmPackageManager"]
Empty file.
13 changes: 13 additions & 0 deletions src/twyn/trusted_packages/managers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from collections import defaultdict
from typing import Any, Protocol

from twyn.trusted_packages.models import TyposquatCheckResultEntry

OrderedPackages = defaultdict[str, set[str]]
"""Type alias for mapping package names by ecosystem."""


class TrustedPackagesProtocol(Protocol):
    """Structural interface shared by the trusted-packages managers.

    Any class that supports the ``in`` operator and exposes
    ``get_typosquat`` satisfies this protocol; no inheritance is required.
    """

    def __contains__(self, obj: Any) -> bool:
        """Return whether ``obj`` is considered a trusted package."""

    def get_typosquat(self, package_name: str) -> TyposquatCheckResultEntry:
        """Return the typosquat check result computed for ``package_name``."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from collections import defaultdict
from typing import Any

from twyn.similarity.algorithm import (
AbstractSimilarityAlgorithm,
SimilarityThreshold,
)
from twyn.trusted_packages.managers.base import OrderedPackages
from twyn.trusted_packages.models import TyposquatCheckResultEntry
from twyn.trusted_packages.selectors import AbstractSelector


class TrustedNpmPackageManager:
    """Trusted npm packages, split into plain packages and scoped ones.

    Plain names (``react``) are grouped by their first character in
    ``self.packages``. Scoped names (``@scope/pkg``) are grouped by their
    namespace (``@scope``) in ``self.namespaces``.
    """

    def __init__(
        self,
        names: set[str],
        algorithm: AbstractSimilarityAlgorithm,
        selector: AbstractSelector,
        threshold_class: type[SimilarityThreshold],
    ) -> None:
        """Index ``names`` and store the collaborators used by ``get_typosquat``.

        Args:
            names: Trusted package names; anything iterable over ``str`` works.
            algorithm: Provides ``get_distance`` between two names.
            selector: Provides ``select_similar_names`` to pick candidates.
            threshold_class: Provides ``from_name`` to build the threshold
                applied to the computed distances.
        """
        self.packages, self.namespaces = self._create_names_dictionary(names)

        self.threshold_class = threshold_class
        self.selector = selector
        self.algorithm = algorithm

    def __contains__(self, obj: Any) -> bool:
        """Check whether ``obj`` is a trusted package or a trusted namespace.

        Accepted forms are a plain package name (``react``), a full scoped
        name (``@scope/pkg``) or a bare namespace (``@scope``). Non-string
        objects and the empty string are never contained.
        """
        if not isinstance(obj, str) or not obj:
            return False

        if obj.startswith("@"):
            namespace, _, dependency = obj.partition("/")
            if dependency:
                # Full scoped name: the namespace must be trusted AND must
                # contain this exact dependency.
                return dependency in self.namespaces.get(namespace, ())
            # Bare namespace lookup, kept for backward compatibility.
            return obj in self.namespaces

        # ``.get`` avoids inserting empty buckets into the defaultdict on
        # every failed lookup.
        return obj in self.packages.get(obj[0], ())

    def _create_names_dictionary(self, names: set[str]) -> tuple[OrderedPackages, OrderedPackages]:
        """Build the two lookup tables used by this manager.

        Returns:
            A ``(packages, namespaces)`` tuple. ``packages`` maps a first
            character to the plain names starting with it; ``namespaces`` maps
            a namespace (``@scope``) to the dependency names found under it.

        Raises:
            ValueError: If a scoped name does not contain exactly one ``/``.
                Names are expected to be normalized before reaching this class.
        """
        first_letter_names: OrderedPackages = defaultdict(set)
        namespaces: OrderedPackages = defaultdict(set)
        for name in names:
            if name.startswith("@"):
                namespace, dependency = name.split("/")
                namespaces[namespace].add(dependency)
            else:
                first_letter_names[name[0]].add(name)
        return first_letter_names, namespaces

    def _get_typosquats_from_namespace_dependency(self, package_name: str) -> TyposquatCheckResultEntry:
        """Look for trusted namespaces similar to the one in ``package_name``.

        A candidate is reported only when its namespace is within the
        threshold AND it contains a dependency with the same name as the one
        being checked. A scoped name without a ``/`` has no dependency part
        and therefore produces no matches instead of raising.
        """
        namespace, _, dependency = package_name.partition("/")
        threshold = self.threshold_class.from_name(namespace)
        typosquat_result = TyposquatCheckResultEntry(dependency=package_name)
        # All namespaces start with "@", so they are exposed to the selector
        # under that single key.
        for trusted_namespace_name in self.selector.select_similar_names(
            names={"@": self.namespaces.keys()}, name=namespace
        ):
            distance = self.algorithm.get_distance(namespace, trusted_namespace_name)
            if threshold.is_inside_threshold(distance) and dependency in self.namespaces.get(
                trusted_namespace_name, ()
            ):
                typosquat_result.add(f"{trusted_namespace_name}/{dependency}")
        return typosquat_result

    def _get_typosquats_from_dependency(self, package_name: str) -> TyposquatCheckResultEntry:
        """Look for trusted plain packages similar to ``package_name``."""
        threshold = self.threshold_class.from_name(package_name)
        typosquat_result = TyposquatCheckResultEntry(dependency=package_name)
        for trusted_package_name in self.selector.select_similar_names(names=self.packages, name=package_name):
            distance = self.algorithm.get_distance(package_name, trusted_package_name)
            if threshold.is_inside_threshold(distance):
                typosquat_result.add(trusted_package_name)
        return typosquat_result

    def get_typosquat(self, package_name: str) -> TyposquatCheckResultEntry:
        """Check if a given package name is similar to any trusted package and return the matches.

        Scoped names (starting with ``@``) are compared namespace against
        namespace; any other name is compared against the plain packages.
        Candidates are chosen by the configured selector, and the configured
        algorithm and threshold decide whether a candidate is similar enough.
        """
        if package_name.startswith("@"):
            return self._get_typosquats_from_namespace_dependency(package_name)
        return self._get_typosquats_from_dependency(package_name)
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
AbstractSimilarityAlgorithm,
SimilarityThreshold,
)
from twyn.trusted_packages.managers.base import OrderedPackages
from twyn.trusted_packages.models import TyposquatCheckResultEntry
from twyn.trusted_packages.selectors import AbstractSelector

_PackageNames = defaultdict[str, set[str]]
"""Type alias for mapping package names by ecosystem."""


class TrustedPackages:
"""Representation of packages that can be trusted."""
Expand All @@ -22,7 +20,7 @@ def __init__(
selector: AbstractSelector,
threshold_class: type[SimilarityThreshold],
) -> None:
self.names: _PackageNames = self._create_names_dictionary(names)
self.names = self._create_names_dictionary(names)
self.threshold_class = threshold_class
self.selector = selector
self.algorithm = algorithm
Expand All @@ -34,17 +32,14 @@ def __contains__(self, obj: Any) -> bool:
return False

@staticmethod
def _create_names_dictionary(names: set[str]) -> _PackageNames:
def _create_names_dictionary(names: set[str]) -> OrderedPackages:
"""Create a dictionary which will group all packages that start with the same letter under the same key."""
first_letter_names = defaultdict(set)
for name in names:
first_letter_names[name[0]].add(name)
return first_letter_names

def get_typosquat(
self,
package_name: str,
) -> TyposquatCheckResultEntry:
def get_typosquat(self, package_name: str) -> TyposquatCheckResultEntry:
"""Check if a given package name is similar to any trusted package and returns it.

Only if there is a match on the first letter can a package name be
Expand Down
38 changes: 34 additions & 4 deletions src/twyn/trusted_packages/references/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from abc import abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

Expand All @@ -14,6 +16,35 @@
logger = logging.getLogger("twyn")


@dataclass
class NormalizedPackages:
packages: set[str]
namespaces: dict[str, set[str]] | None = None
_raw_namespaces: set[str] = field(default_factory=set)

def __post__init__(self) -> None:
if self.namespaces:
for namespace in self.namespaces:
for package_name in self.namespaces[namespace]:
self._raw_namespaces.add(f"{namespace}/{package_name}")

def __iter__(self) -> Iterator[str]:
yield from self.packages

if not self.namespaces:
return

for namespace in self.namespaces:
for package_name in self.namespaces[namespace]:
yield f"{namespace}/{package_name}"

def __contains__(self, value: str) -> bool:
if not isinstance(value, str):
return False

return value in self.packages or value in self._raw_namespaces


class AbstractPackageReference:
"""Represents a reference from where to retrieve trusted packages.

Expand All @@ -32,7 +63,7 @@ def __init__(self, source: str | None = None, cache_handler: CacheHandler | None

@staticmethod
@abstractmethod
def normalize_packages(packages: set[str]) -> set[str]:
def normalize_packages(packages: set[str]) -> NormalizedPackages:
"""Normalize package names to make sure they're valid within the package manager context."""

def _download(self) -> dict[str, Any]:
Expand Down Expand Up @@ -64,7 +95,7 @@ def _get_packages_from_cache_if_enabled(self) -> set[str]:

return cache_entry.packages

def get_packages(self) -> set[str]:
def get_packages(self) -> NormalizedPackages:
"""Download and parse online source of top packages from the package ecosystem."""
packages = self._get_packages_from_cache_if_enabled()
# we don't save the cache here, we keep it as it is so the date remains the original one.
Expand All @@ -84,5 +115,4 @@ def get_packages(self) -> set[str]:
# New packages were downloaded, we create a new entry updating all values.
self._save_trusted_packages_to_cache_if_enabled(packages)

normalized_packages = self.normalize_packages(packages)
return normalized_packages
return self.normalize_packages(packages)
25 changes: 19 additions & 6 deletions src/twyn/trusted_packages/references/top_npm_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from twyn.trusted_packages.exceptions import (
PackageNormalizingError,
)
from twyn.trusted_packages.references.base import AbstractPackageReference
from twyn.trusted_packages.references.base import AbstractPackageReference, NormalizedPackages

logger = logging.getLogger("twyn")

Expand All @@ -21,15 +21,28 @@ class TopNpmReference(AbstractPackageReference):

@override
@staticmethod
def normalize_packages(packages: set[str]) -> set[str]:
def normalize_packages(packages: set[str]) -> NormalizedPackages:
"""Normalize dependency names according to npm."""
if not packages:
logger.debug("Tried to normalize packages, but none were provided")
return set()
return NormalizedPackages(packages=set())

# Extract namespaces from package names
package_pattern = re.compile(r"^[a-z0-9-~][a-z0-9-._~]*$") # noqa: F821
namespace_pattern = re.compile(r"^(?:@[a-z0-9-~][a-z0-9-._~]*)\/[a-z0-9-~][a-z0-9-._~]*$") # noqa: F821

extracted_namespaces: dict[str, set[str]] = {}
regular_packages = set()

pattern = re.compile(r"^(?:@[a-z0-9-~][a-z0-9-._~]*\/)?[a-z0-9-~][a-z0-9-._~]*$") # noqa: F821
for package in packages:
if not pattern.match(package.lower()):
if namespace_pattern.match(package.lower()):
namespace, namespace_package = package.split("/")
if namespace not in extracted_namespaces:
extracted_namespaces[namespace] = set()
extracted_namespaces[namespace].add(namespace_package)
elif package_pattern.match(package.lower()):
regular_packages.add(package)
else:
raise PackageNormalizingError(f"Package name '{package}' does not match required pattern")

return packages
return NormalizedPackages(packages=regular_packages, namespaces=extracted_namespaces)
8 changes: 4 additions & 4 deletions src/twyn/trusted_packages/references/top_pypi_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from twyn.trusted_packages.exceptions import (
PackageNormalizingError,
)
from twyn.trusted_packages.references.base import AbstractPackageReference
from twyn.trusted_packages.references.base import AbstractPackageReference, NormalizedPackages

logger = logging.getLogger("twyn")

Expand All @@ -21,16 +21,16 @@ class TopPyPiReference(AbstractPackageReference):

@override
@staticmethod
def normalize_packages(packages: set[str]) -> set[str]:
def normalize_packages(packages: set[str]) -> NormalizedPackages:
"""Normalize dependency names according to PyPi https://packaging.python.org/en/latest/specifications/name-normalization/."""
if not packages:
logger.debug("Tried to normalize packages, but none were provided")
return set()
return NormalizedPackages(packages=set())
renamed_packages = {re.sub(r"[-_.]+", "-", name).lower() for name in packages}

pattern = re.compile(r"^([a-z0-9]|[a-z0-9][a-z0-9._-]*[a-z0-9])\Z") # noqa: F821
for package in renamed_packages:
if not pattern.match(package):
raise PackageNormalizingError(f"Package name '{package}' does not match required pattern")

return renamed_packages
return NormalizedPackages(packages=renamed_packages)
Loading
Loading