diff --git a/Pipfile b/Pipfile index b5b66216..7cf64280 100644 --- a/Pipfile +++ b/Pipfile @@ -5,6 +5,7 @@ verify_ssl = true [packages] pygitguardian = { editable = true, path = "." } +strenum = "*" [dev-packages] black = "==22.3.0" diff --git a/changelog.d/20231229_172424_james.sinclair_add_incidents.md b/changelog.d/20231229_172424_james.sinclair_add_incidents.md new file mode 100644 index 00000000..e2593c1a --- /dev/null +++ b/changelog.d/20231229_172424_james.sinclair_add_incidents.md @@ -0,0 +1,3 @@ +### Added + +- Added support for Secrets Incident API endpoints. diff --git a/pygitguardian/client.py b/pygitguardian/client.py index c86ff171..9ebe28f6 100644 --- a/pygitguardian/client.py +++ b/pygitguardian/client.py @@ -4,12 +4,13 @@ import tarfile import time import urllib.parse +from datetime import datetime from io import BytesIO from pathlib import Path -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, Generator, List, Optional, Union, cast import requests -from requests import Response, Session, codes +from requests import Response, Session from .config import DEFAULT_API_VERSION, DEFAULT_BASE_URI, DEFAULT_TIMEOUT from .iac_models import ( @@ -18,6 +19,21 @@ IaCScanParametersSchema, IaCScanResult, ) +from .incident_models import ( + Incident, + IncidentIdOrIncident, + ListIncidentResult, + SharedIncidentDetails, +) +from .incident_models.constants import ( + IncidentIgnoreReason, + IncidentOrdering, + IncidentPermission, + IncidentSeverity, + IncidentStatus, + IncidentTag, + IncidentValidity, +) from .models import ( Detail, Document, @@ -38,6 +54,14 @@ SCAScanDiffOutput, SCAScanParameters, ) +from .utils.response import ( + is_create_ok, + is_ok, + load_detail, + load_incident_response, + load_no_content_response, +) +from .utils.tools import dict_filter_none, ensure_mutually_exclusive logger = logging.getLogger(__name__) @@ -63,46 +87,6 @@ class Versions: VERSIONS = Versions() -def load_detail(resp: Response) -> Detail: - """ - load_detail loads a Detail from a response - be it JSON or html. - - :param resp: API response - :type resp: Response - :return: detail object of response - :rtype: Detail - """ - if resp.headers["content-type"] == "application/json": - data = resp.json() - else: - data = {"detail": resp.text} - - return Detail.from_dict(data) - - -def is_ok(resp: Response) -> bool: - """ - is_ok returns True is the API responded with 200 - and the content type is JSON. - """ - return ( - resp.headers["content-type"] == "application/json" - and resp.status_code == codes.ok - ) - - -def is_create_ok(resp: Response) -> bool: - """ - is_create_ok returns True if the API returns code 201 - and the content type is JSON. - """ - return ( - resp.headers["content-type"] == "application/json" - and resp.status_code == codes.created - ) - - def _create_tar(root_path: Path, filenames: List[str]) -> bytes: """ :param root_path: the root_path from which the tar is created @@ -195,12 +179,18 @@ def __init__( def request( self, method: str, - endpoint: str, + endpoint: Optional[str] = None, version: Optional[str] = DEFAULT_API_VERSION, + url: Optional[str] = None, extra_headers: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> Response: - url = self._url_from_endpoint(endpoint, version) + if endpoint is not None: + _url = self._url_from_endpoint(endpoint, version) + elif str is not None: + _url = cast(str, url) + else: + raise ValueError("Request error: 'ednpoint' and 'url' cannot both be None") headers = ( {**self.session.headers, **extra_headers} @@ -209,7 +199,7 @@ def request( ) start = time.time() response: Response = self.session.request( - method=method, url=url, timeout=self.timeout, headers=headers, **kwargs + method=method, url=_url, timeout=self.timeout, headers=headers, **kwargs ) duration = time.time() - start logger.debug( @@ -291,6 +281,23 @@ def post( **kwargs, ) + def patch( + self, + endpoint: str, + data: Optional[Dict[str, str]] = None, + version: str = DEFAULT_API_VERSION, + extra_headers: Optional[Dict[str, str]] = None, + **kwargs: Any, + ) -> Response: + return self.request( + "patch", + endpoint=endpoint, + json=data, + version=version, + extra_headers=extra_headers, + **kwargs, + ) + def health_check(self) -> HealthCheckResponse: """ health_check handles the /health endpoint of the API @@ -699,3 +706,359 @@ def scan_diff( result = load_detail(response) result.status_code = response.status_code return result + + # Incidents management + def list_secret_incidents( + self, + per_page: Optional[int] = None, + date_before: Optional[datetime] = None, + date_after: Optional[datetime] = None, + assignee_email: Optional[str] = None, + assignee_id: Optional[int] = None, + status: Optional[IncidentStatus] = None, + severity: Optional[IncidentSeverity] = None, + validity: Optional[IncidentValidity] = None, + tags: Optional[List[IncidentTag]] = None, + ordering: Optional[IncidentOrdering] = None, + detector_group_name: Optional[str] = None, + ignorer_id: Optional[int] = None, + ignorer_api_token_id: Optional[str] = None, + resolver_id: Optional[int] = None, + resolver_api_token_id: Optional[str] = None, + extra_headers: Optional[Dict[str, str]] = None, + _url: Optional[str] = None, + ) -> Union[Detail, ListIncidentResult]: + """ + List secret incidents detected by the GitGuardian dashboard. + Occurrences are not returned by this method. + """ + if _url is not None: + resp = self.request( + method="get", + url=_url, + extra_headers=extra_headers, + ) + else: + params = dict_filter_none( + { + "per_page": per_page, + "date_before": date_before, + "date_after": date_after, + "assignee_email": assignee_email, + "assignee_id": assignee_id, + "status": status, + "severity": severity, + "validity": validity, + "tags": tags, + "ordering": ordering, + "detector_group_name": detector_group_name, + "ignorer_id": ignorer_id, + "ignorer_api_token_id": ignorer_api_token_id, + "resolver_id": resolver_id, + "resolver_api_token_id": resolver_api_token_id, + } + ) + + resp = self.get( + endpoint="incidents/secrets", + extra_headers=extra_headers, + params=params, + ) + + obj: Union[Detail, ListIncidentResult] + if is_ok(resp): + obj = ListIncidentResult.from_dict( + { + "incidents": resp.json(), + "links": resp.links, + } + ) + else: + obj = load_detail(resp) # pragma: no cover + + obj.status_code = resp.status_code + + return obj + + def iter_incidents(self, **kwargs: Any) -> Generator[Incident, None, None]: + page = self.list_secret_incidents(**kwargs) + if "extra_headers" in kwargs: + extra_headers = {"extra_headers": kwargs["extra_headers"]} + else: + extra_headers = {} + while True: + if isinstance(page, Detail): + raise Exception( + "Received an error response before iteration completed." + ) + if len(page.incidents) == 0: + break + yield from page.incidents + if page.links is None: + break # pragma: no cover + if page.links is not None and page.links.next is None: + break + if page.links is not None and page.links.next is not None: + page = self.list_secret_incidents( + _url=page.links.next.url, + **extra_headers, + ) + + def get_secret_incident( + self, + incident_id: int, + with_occurrences: Optional[int] = None, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Retrieve secret incident detected by the GitGuardian dashboard with or + without its occurrences. + """ + params = dict_filter_none({"with_occurrences": with_occurrences}) + return load_incident_response( + self.get( + endpoint=f"incidents/secrets/{int(incident_id)}", + extra_headers=extra_headers, + params=params, + ) + ) + + def update_incident_severity( + self, + incident_id: IncidentIdOrIncident, + severity: IncidentSeverity, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Set a secret incident's severity. + """ + if isinstance(incident_id, Incident): + incident_id = incident_id.id + + return load_incident_response( + self.patch( + endpoint=f"incidents/secrets/{int(incident_id)}", + extra_headers=extra_headers, + data={"severity": severity}, + ) + ) + + def assign_incident( + self, + incident_id: IncidentIdOrIncident, + email: Optional[str] = None, + member_id: Optional[int] = None, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Assign s secret incident detected by the GitGuardian dashboard to a + workspace member by email or member ID. + """ + ensure_mutually_exclusive( + "You must supply 'email' or 'member_id', but not both.", + email, + member_id, + ) + + return load_incident_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/assign", + extra_headers=extra_headers, + params=dict_filter_none({"email": email, "member_id": member_id}), + ) + ) + + def unassign_incident( + self, + incident_id: IncidentIdOrIncident, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Unassign a secret incident previously assigned to a workspace member. + """ + return load_incident_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/unassign", + extra_headers=extra_headers, + ) + ) + + def resolve_incident( + self, + incident_id: IncidentIdOrIncident, + secret_revoked: bool, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Resolve a secret incident detected by the GitGuardian dashboard and + specicy whether or not the secret was revoked. + """ + return load_incident_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/resolve", + extra_headers=extra_headers, + params={"secret_revoked": secret_revoked}, + ) + ) + + def ignore_incident( + self, + incident_id: IncidentIdOrIncident, + ignore_reason: IncidentIgnoreReason, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Ignore a secret incident detected by the GitGuardian dashboard and + specicy whether it is a test credential, a false positive or a low risk + secret. + """ + return load_incident_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/ignore", + extra_headers=extra_headers, + params={"ignore_reason": ignore_reason}, + ) + ) + + def reopen_incident( + self, + incident_id: IncidentIdOrIncident, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, Incident]: + """ + Reopen a secret incident detected by the GitGuardian dashboard that was + previously resolved or ignored. + """ + return load_incident_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/reopen", + extra_headers=extra_headers, + ) + ) + + def share_incident( + self, + incident_id: IncidentIdOrIncident, + auto_healing: Optional[bool] = None, + feedback_collection: Optional[bool] = None, + lifespan: Optional[int] = None, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, SharedIncidentDetails]: + """ + Share a secret incident by creating a public link that expires. + Optionally, allow someone with the link to resolve/ignore the secret + and/or leave feedback about the secret. + """ + resp = self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/share", + extra_headers=extra_headers, + params=dict_filter_none( + { + "auto_healing": auto_healing, + "feedback_collection": feedback_collection, + "lifespan": lifespan, + } + ), + ) + + obj: Union[Detail, SharedIncidentDetails] + if is_ok(resp): + obj = SharedIncidentDetails.from_dict(resp.json()) + else: # pragma: no cover + obj = load_detail(resp) + + obj.status_code = resp.status_code + + return obj + + def unshare_incident( + self, + incident_id: IncidentIdOrIncident, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, bool]: + """ + Unshare a secret incident by revoking its public link before it + expires. + """ + return load_no_content_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/unshare", + extra_headers=extra_headers, + ) + ) + + def grant_access_to_incident( + self, + incident_id: IncidentIdOrIncident, + incident_permission: IncidentPermission, + email: Optional[str] = None, + member_id: Optional[int] = None, + invitation_id: Optional[int] = None, + team_id: Optional[int] = None, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, bool]: + """ + Grant a user, an existing invitee or a team access to a secret + incident. + """ + ensure_mutually_exclusive( + "'email', 'member_id', 'invitation_id' and 'team_id' " + "are mutually exclusive--you can only provide one.", + email, + member_id, + invitation_id, + team_id, + ) + + return load_no_content_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/grant_access", + extra_headers=extra_headers, + params=dict_filter_none( + { + "email": email, + "member_id": member_id, + "invitation_id": invitation_id, + "team_id": team_id, + "incident_permission": incident_permission, + } + ), + ) + ) + + def revoke_access_to_incident( + self, + incident_id: IncidentIdOrIncident, + email: Optional[str] = None, + member_id: Optional[int] = None, + invitation_id: Optional[int] = None, + team_id: Optional[int] = None, + extra_headers: Optional[Dict[str, str]] = None, + ) -> Union[Detail, bool]: + """ + Revoke access of a user, an existing invitee or a team to a secret + incident. + """ + ensure_mutually_exclusive( + "'email', 'member_id', 'invitation_id' and 'team_id' " + "are mutually exclusive--you can only provide one.", + email, + member_id, + invitation_id, + team_id, + ) + + return load_no_content_response( + self.post( + endpoint=f"incidents/secrets/{int(incident_id)}/revoke_access", + extra_headers=extra_headers, + params=dict_filter_none( + { + "email": email, + "member_id": member_id, + "invitation_id": invitation_id, + "team_id": team_id, + } + ), + ) + ) diff --git a/pygitguardian/incident_models/__init__.py b/pygitguardian/incident_models/__init__.py new file mode 100644 index 00000000..4fedb365 --- /dev/null +++ b/pygitguardian/incident_models/__init__.py @@ -0,0 +1,138 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Optional, Type, Union, cast + +from marshmallow_dataclass import class_schema + +from pygitguardian.incident_models.constants import ( + IncidentIgnoreReason, + IncidentSeverity, + IncidentStatus, + IncidentTag, + IncidentValidity, + OccurrenceKind, + OccurrencePresence, +) +from pygitguardian.models import Base, BaseSchema, FromDictMixin +from pygitguardian.source_models import Source + + +@dataclass +class Detector(Base, FromDictMixin): + name: str + display_name: str + nature: str + family: str + detector_group_name: str + detector_group_display_name: str + + +DetectorSchema = cast(Type[BaseSchema], class_schema(Detector, BaseSchema)) +Detector.SCHEMA = DetectorSchema() + + +@dataclass +class Match(Base, FromDictMixin): + name: str + indice_start: int + indice_end: int + pre_line_start: Optional[int] + pre_line_end: Optional[int] + post_line_start: Optional[int] + post_line_end: Optional[int] + + +MatchSchema = cast(Type[BaseSchema], class_schema(Match, BaseSchema)) +Match.SCHEMA = MatchSchema() + + +@dataclass +class Occurrence(Base, FromDictMixin): + id: int + incident_id: int + kind: OccurrenceKind = field(metadata={"by_value": True}) + sha: str + source: Source + author_name: str + author_info: str + date: datetime + presence: OccurrencePresence = field(metadata={"by_value": True}) + url: str + matches: List[Match] + filepath: str + + +OccurrenceSchema = cast(Type[BaseSchema], class_schema(Occurrence, BaseSchema)) +Occurrence.SCHEMA = OccurrenceSchema() + + +@dataclass +class Incident(Base, FromDictMixin): + id: int + date: datetime + detector: Detector + secret_hash: str + gitguardian_url: str + regression: bool + status: IncidentStatus = field(metadata={"by_value": True}) + assignee_email: Optional[str] + occurrences_count: int + occurrences: Optional[List[Occurrence]] + ignore_reason: Optional[IncidentIgnoreReason] = field(metadata={"by_value": True}) + ignored_at: Optional[datetime] + secret_revoked: bool + severity: IncidentSeverity = field(metadata={"by_value": True}) + validity: IncidentValidity = field(metadata={"by_value": True}) + resolved_at: Optional[datetime] + share_url: Optional[str] + tags: List[IncidentTag] = field(metadata={"by_value": True}) + + def __int__(self): + return self.id + + +IncidentSchema = cast(Type[BaseSchema], class_schema(Incident, BaseSchema)) +Incident.SCHEMA = IncidentSchema() + + +@dataclass +class Link: + url: str + rel: str + + +@dataclass +class Links: + next: Optional[Link] + prev: Optional[Link] + + +@dataclass +class ListIncidentResult(Base, FromDictMixin): + incidents: List[Incident] + links: Optional[Links] = None + + +ListIncidentResultSchema = cast( + Type[BaseSchema], class_schema(ListIncidentResult, BaseSchema) +) +ListIncidentResult.SCHEMA = ListIncidentResultSchema() + + +@dataclass +class SharedIncidentDetails(Base, FromDictMixin): + incident_id: int + share_url: str + feedback_collection: bool + auto_healing: bool + token: str + expire_at: Optional[datetime] = None + revoked_at: Optional[datetime] = None + + +SharedIncidentDetailsSchema = cast( + Type[BaseSchema], class_schema(SharedIncidentDetails, BaseSchema) +) +SharedIncidentDetails.SCHEMA = SharedIncidentDetailsSchema() + +IncidentIdOrIncident = Union[int, Incident] diff --git a/pygitguardian/incident_models/constants.py b/pygitguardian/incident_models/constants.py new file mode 100644 index 00000000..98655fb6 --- /dev/null +++ b/pygitguardian/incident_models/constants.py @@ -0,0 +1,70 @@ +from enum import auto + +from strenum import LowercaseStrEnum, MacroCaseStrEnum, SnakeCaseStrEnum, StrEnum + + +class IncidentIgnoreReason(SnakeCaseStrEnum): + TEST_CREDENTIAL = auto() + FALSE_POSITIVE = auto() + LOW_RISK = auto() + + +class IncidentOrdering(StrEnum): + DATE_ASC = "date" + DATE_DESC = "-date" + RESOLVED_AT_ASC = "resolved_at" + RESOLVED_AT_DESC = "-resolved_at" + IGNORED_AT_ASC = "ignored_at" + IGNORED_AT_DESC = "-ignored_at" + + +class IncidentPermission(LowercaseStrEnum): + CAN_VIEW = auto() + CAN_EDIT = auto() + FULL_ACCESS = auto() + + +class IncidentSeverity(SnakeCaseStrEnum): + CRITICAL = auto() + HIGH = auto() + MEDIUM = auto() + LOW = auto() + INFO = auto() + UNKNOWN = auto() + + +class IncidentStatus(MacroCaseStrEnum): + IGNORED = auto() + TRIGGERED = auto() + ASSIGNED = auto() + RESOLVED = auto() + + +class IncidentTag(MacroCaseStrEnum): + DEFAULT_BRANCH = auto() + FROM_HISTORICAL_SCAN = auto() + IGNORED_IN_CHECK_RUN = auto() + PUBLIC = auto() + PUBLICLY_EXPOSED = auto() + PUBLICLY_LEAKED = auto() + REGRESSION = auto() + SENSITIVE_FILE = auto() + TEST_FILE = auto() + + +class IncidentValidity(SnakeCaseStrEnum): + VALID = auto() + INVALID = auto() + FAILED_TO_CHECK = auto() + NO_CHECKER = auto() + UNKNOWN = auto() + + +class OccurrenceKind(LowercaseStrEnum): + REALTIME = auto() + HISTORICAL = auto() + + +class OccurrencePresence(SnakeCaseStrEnum): + present = auto() + removed = auto() diff --git a/pygitguardian/source_models.py b/pygitguardian/source_models.py new file mode 100644 index 00000000..59d313bd --- /dev/null +++ b/pygitguardian/source_models.py @@ -0,0 +1,50 @@ +from dataclasses import dataclass, field +from datetime import datetime +from enum import auto + +from marshmallow_dataclass import class_schema +from strenum import SnakeCaseStrEnum + +from pygitguardian.models import Base, BaseSchema + + +class ScanStatus(SnakeCaseStrEnum): + PENDING = auto() + RUNNING = auto() + CANCELED = auto() + FAILED = auto() + TOO_LARGE = auto() + TIMEOUT = auto() + FINISHED = auto() + + +@dataclass +class Scan(Base): + date: datetime + status: ScanStatus = field(metadata={"by_value": True}) + + +ScanSchema = class_schema(Scan, BaseSchema) + + +class SourceHealth(SnakeCaseStrEnum): + SAFE = auto() + UNKNOWN = auto() + AT_RISK = auto() + + +@dataclass +class Source(Base): + id: int + url: str + type: str # TODO: Reserved word + full_name: str + health: SourceHealth = field(metadata={"by_value": True}) + open_incidents_count: int # TODO: Type documented as "number" - what's the difference? + closed_incidents_count: int # TODO: Also "number" + visibility: str # TODO: Really? str + external_id: str + last_scan: Scan + + +SourceSchema = class_schema(Source, BaseSchema) diff --git a/pygitguardian/utils/__init__.py b/pygitguardian/utils/__init__.py new file mode 100644 index 00000000..de873ee2 --- /dev/null +++ b/pygitguardian/utils/__init__.py @@ -0,0 +1 @@ +"""Various utility functions""" diff --git a/pygitguardian/utils/response.py b/pygitguardian/utils/response.py new file mode 100644 index 00000000..47d1dccb --- /dev/null +++ b/pygitguardian/utils/response.py @@ -0,0 +1,72 @@ +from typing import Union + +from requests import Response, codes + +from ..incident_models import Incident +from ..models import Detail + + +def load_detail(resp: Response) -> Detail: + """ + load_detail loads a Detail from a response + be it JSON or html. + + :param resp: API response + :type resp: Response + :return: detail object of response + :rtype: Detail + """ + if resp.headers["content-type"] == "application/json": + data = resp.json() + else: + data = {"detail": resp.text} + + return Detail.from_dict(data) + + +def is_ok(resp: Response) -> bool: + """ + is_ok returns True is the API responded with 200 + and the content type is JSON. + """ + return ( + resp.headers["content-type"] == "application/json" + and resp.status_code == codes.ok + ) + + +def is_create_ok(resp: Response) -> bool: + """ + is_create_ok returns True if the API returns code 201 + and the content type is JSON. + """ + return ( + resp.headers["content-type"] == "application/json" + and resp.status_code == codes.created + ) + + +def load_incident_response( + incident_response: Response, +) -> Union[Detail, Incident]: + obj: Union[Detail, Incident] + if is_ok(incident_response): + obj = Incident.from_dict(incident_response.json()) + else: + obj = load_detail(incident_response) + + obj.status_code = incident_response.status_code + + return obj + + +def load_no_content_response( + response: Response, +) -> Union[Detail, bool]: + if response.status_code == codes.no_content: + return True + obj = load_detail(response) + + obj.status_code = response.status_code + + return obj diff --git a/pygitguardian/utils/tools.py b/pygitguardian/utils/tools.py new file mode 100644 index 00000000..dfeef5bb --- /dev/null +++ b/pygitguardian/utils/tools.py @@ -0,0 +1,20 @@ +from typing import Any, Mapping, Sequence + + +def dict_filter_none(dct: Mapping[Any, Any]) -> dict: + """Filter a dict to remove all items where the value is None""" + return {k: v for k, v in dct.items() if v is not None} + + +def count_not_none(seq: Sequence[Any]) -> int: + """Count the number of None values in a sequence""" + return sum(0 if v is None else 1 for v in seq) + + +def ensure_mutually_exclusive(msg: str, *seq: Any) -> None: + """ + Ensure only one value in a sequence is None. + Raises ValueError with a supplied error message if more than one None value is found. + """ + if count_not_none(seq) > 1: + raise ValueError(msg) diff --git a/setup.py b/setup.py index 22b9059f..06122cd8 100644 --- a/setup.py +++ b/setup.py @@ -34,8 +34,10 @@ def get_version() -> str: install_requires=[ "marshmallow>=3.5, <4", "requests>=2, <3", - "marshmallow-dataclass >=8.5.8, <8.6.0", + "marshmallow-dataclass[enum,union] >=8.5.8, <8.6.0", "typing-extensions", + "urllib3<2", + "strenum", ], include_package_data=True, zip_safe=True, diff --git a/tests/test_client_incidents.py b/tests/test_client_incidents.py new file mode 100644 index 00000000..d24b407b --- /dev/null +++ b/tests/test_client_incidents.py @@ -0,0 +1,520 @@ +import json +from collections import OrderedDict +from datetime import datetime +from uuid import uuid4 + +import pytest +import responses + +from pygitguardian import GGClient +from pygitguardian.incident_models import ( + Incident, + ListIncidentResult, + SharedIncidentDetails, +) +from pygitguardian.incident_models.constants import ( + IncidentPermission, + IncidentSeverity, + IncidentStatus, +) +from pygitguardian.models import Detail + + +def make_incident(idx): + return { + "id": idx, + "gitguardian_url": f"https://dashboard.gitguardian.com/workspace/0/incidents/{idx}", + "assignee_id": 0, + "assignee_email": "john.smith@example.com", + "date": "2020-10-29T13:19:59.005564Z", + "detector": { + "name": "aws_iam", + "display_name": "AWS Keys", + "nature": "specific", + "family": "Api", + "detector_group_name": "aws_iam", + "detector_group_display_name": "AWS Keys", + }, + "ignore_reason": None, + "ignored_at": None, + "ignorer_api_token_id": None, + "ignorer_id": None, + "occurrences": None, + "tags": ["PUBLIC"], + "occurrences_count": 2, + "regression": False, + "resolved_at": "2022-09-06T09:06:08.295495Z", + "resolver_api_token_id": None, + "resolver_id": None, + "secret_hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "secret_revoked": True, + "status": "RESOLVED", + "validity": "invalid", + "share_url": None, + "severity": "medium", + } + + +def make_occurrence(idx, incident_id): + return { + "author_info": "john.smith@example.com", + "author_name": "John Smith", + "date": "2020-10-20T01:19:28.788008Z", + "filepath": "app.py", + "id": idx, + "incident_id": incident_id, + "kind": "realtime", + "matches": [ + { + "name": "client_secret", + "indice_start": 150, + "indice_end": 169, + "pre_line_start": 4, + "pre_line_end": 4, + "post_line_start": None, + "post_line_end": None, + }, + { + "name": "client_id", + "indice_start": 49, + "indice_end": 134, + "pre_line_start": 3, + "pre_line_end": 3, + "post_line_start": None, + "post_line_end": None, + }, + ], + "presence": "removed", + "sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "location": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "source": { + "id": 12367, + "url": "https://git.example.com/group/repo", + "type": "github", + "full_name": "group/repo", + "health": "at_risk", + "open_incidents_count": 7, + "closed_incidents_count": 6, + "visibility": "public", + "last_scan": {"date": "2020-09-24T09:06:39.257426Z", "status": "finished"}, + "external_id": "83385", + }, + "url": "https://git.example.com/group/repo/commit/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "tags": ["PUBLIC"], + } + + +def make_share_response(idx, feedback_collection=False): + token = str(uuid4()) + return { + "share_url": f"https://dashboard.gitguardian.com/share/incidents/{token}", + "incident_id": idx, + "feedback_collection": feedback_collection, + "auto_healing": False, + "token": token, + "expire_at": "2023-07-01T14:47:42.939558Z", + "revoked_at": None, + } + + +def make_incident_with_occurrences(idx, occurrences_count): + incident = make_incident(idx) + incident["occurrences_count"] = int(occurrences_count) + incident["occurrences"] = [ + make_occurrence(i, idx) for i in range(int(occurrences_count)) + ] + return incident + + +def test_list_secret_incidents(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + body=json.dumps([make_incident(i) for i in range(20)]), + status=200, + content_type="application/json", + ) + response = client.list_secret_incidents() + assert response.status_code == 200 + assert isinstance(response, ListIncidentResult) + assert isinstance(response.incidents, list) + assert len(response.incidents) == 20 + assert isinstance(response.incidents[0], Incident) + assert isinstance(response.to_dict(), OrderedDict) + response_json = response.to_json() + assert isinstance(response_json, str) + loaded_response = ListIncidentResult.SCHEMA.load(json.loads(response_json)) + assert loaded_response == response + + +def test_list_secret_incidents_10_per_page(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[responses.matchers.query_param_matcher({"per_page": 10})], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + content_type="application/json", + ) + response = client.list_secret_incidents(per_page=10) + assert response.status_code == 200 + assert isinstance(response, ListIncidentResult) + assert len(response.incidents) == 10 + print(response.links) + + +def test_list_secret_incidents_date_before(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[ + responses.matchers.query_param_matcher( + {"date_before": "2022-12-31 00:00:00+00:00"} + ) + ], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + content_type="application/json", + ) + response = client.list_secret_incidents( + date_before=datetime.fromisoformat("2022-12-31T00:00:00+00:00") + ) + assert response.status_code == 200 + + +def test_list_secret_incidents_date_after(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[ + responses.matchers.query_param_matcher( + {"date_after": "2022-12-31 00:00:00+00:00"} + ) + ], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + content_type="application/json", + ) + response = client.list_secret_incidents( + date_after=datetime.fromisoformat("2022-12-31T00:00:00+00:00") + ) + assert response.status_code == 200 + + +def test_list_secret_incidents_assignee_email(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[ + responses.matchers.query_param_matcher( + {"assignee_email": "bruce-wayne-gg@protonmail.com"} + ) + ], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + content_type="application/json", + ) + response = client.list_secret_incidents( + assignee_email="bruce-wayne-gg@protonmail.com" + ) + assert response.status_code == 200 + + +def test_list_secret_incidents_assignee_id(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[responses.matchers.query_param_matcher({"assignee_id": "10"})], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + content_type="application/json", + ) + response = client.list_secret_incidents(assignee_id=10) + assert response.status_code == 200 + + +def test_iter_incidents_all(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[responses.matchers.query_param_matcher({"per_page": "10"})], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + content_type="application/json", + ) + for idx, incident in enumerate(client.iter_incidents(per_page=10)): + assert isinstance(incident, Incident) + if idx > 30: + break + + +def test_iter_incidents_ignored(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[ + responses.matchers.query_param_matcher( + {"per_page": "10", "status": "IGNORED"} + ) + ], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + headers={ + "link": "; rel="next"' + }, + content_type="application/json", + ) + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[ + responses.matchers.query_param_matcher( + {"per_page": "10", "status": "IGNORED", "cursor": "cD0yMTU0NQ=="} + ) + ], + body=json.dumps([make_incident(i) for i in range(10)]), + status=200, + headers={ + "link": "; rel="next"' + }, + content_type="application/json", + ) + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets", + match=[ + responses.matchers.query_param_matcher( + {"per_page": "10", "status": "IGNORED", "cursor": "cD0yMTk4Ng=="} + ) + ], + body=json.dumps([make_incident(i) for i in range(5)]), + status=200, + content_type="application/json", + ) + for idx, incident in enumerate( + client.iter_incidents(per_page=10, status=IncidentStatus.IGNORED) + ): + assert isinstance(incident, Incident) + assert idx == 24 + + +def test_get_secret_incident_without_occurrences(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets/1", + match=[responses.matchers.query_param_matcher({"with_occurrences": "0"})], + body=json.dumps(make_incident(0)), + status=200, + content_type="application/json", + ) + response = client.get_secret_incident(incident_id=1, with_occurrences=0) + assert response.status_code == 200 + assert isinstance(response, Incident) + assert isinstance(response.to_dict(), OrderedDict) + response_json = response.to_json() + assert isinstance(response_json, str) + loaded_response = Incident.SCHEMA.load(json.loads(response_json)) + assert loaded_response == response + + +def test_get_secret_incident_with_occurrences(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.gitguardian.com/v1/incidents/secrets/1", + match=[responses.matchers.query_param_matcher({"with_occurrences": "1"})], + body=json.dumps(make_incident_with_occurrences(1, 2)), + status=200, + content_type="application/json", + ) + response = client.get_secret_incident(incident_id=1, with_occurrences=1) + assert response.status_code == 200 + assert isinstance(response, Incident) + assert response.occurrences is not None + assert len(response.occurrences) > 0 + assert isinstance(response.to_dict(), OrderedDict) + response_json = response.to_json() + assert isinstance(response_json, str) + loaded_response = Incident.SCHEMA.load(json.loads(response_json)) + assert loaded_response == response + + +def test_update_incident_severity(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.PATCH, + "https://api.gitguardian.com/v1/incidents/secrets/1", + match=[responses.matchers.json_params_matcher({"severity": "medium"})], + body=json.dumps(make_incident(1)), + status=200, + content_type="application/json", + ) + incident = client.update_incident_severity( + incident_id=1, severity=IncidentSeverity.MEDIUM + ) + assert isinstance(incident, Incident) + assert incident.status_code == 200 + assert incident.occurrences is None + + +def test_share_incident(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/share", + body=json.dumps(make_share_response(1)), + status=200, + content_type="application/json", + ) + response = client.share_incident(incident_id=1) + assert response.status_code == 200 + assert isinstance(response, SharedIncidentDetails) + assert isinstance(response.to_dict(), OrderedDict) + loaded_response = SharedIncidentDetails.SCHEMA.load( + json.loads(response.to_json()) + ) + assert loaded_response == response + + +def test_share_incident_with_obj(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/share", + body=json.dumps(make_share_response(1)), + status=200, + content_type="application/json", + ) + incident = Incident.from_dict(make_incident(1)) + response = client.share_incident(incident_id=incident) + assert response.status_code == 200 + assert isinstance(response, SharedIncidentDetails) + assert isinstance(response.to_dict(), OrderedDict) + loaded_response = SharedIncidentDetails.SCHEMA.load( + json.loads(response.to_json()) + ) + assert loaded_response == response + + +def test_share_incident_with_feedback(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/share", + match=[ + responses.matchers.query_param_matcher({"feedback_collection": "True"}) + ], + body=json.dumps(make_share_response(1, True)), + status=200, + content_type="application/json", + ) + response = client.share_incident(incident_id=1, feedback_collection=True) + assert response.status_code == 200 + assert isinstance(response, SharedIncidentDetails) + assert response.feedback_collection + assert isinstance(response.to_dict(), OrderedDict) + loaded_response = SharedIncidentDetails.SCHEMA.load( + json.loads(response.to_json()) + ) + assert loaded_response == response + + +def test_unshare_incident(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/unshare", + body="", + status=204, + content_type="application/json", + ) + response = client.unshare_incident(1) + assert response + + +def test_unshare_unshared_incident(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/unshare", + body='{"detail":"Issue is not shared"}', + status=409, + content_type="application/json", + ) + response = client.unshare_incident(1) + assert isinstance(response, Detail) + assert response.status_code == 409 + + +def test_grant_access_to_incident(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/grant_access", + match=[ + responses.matchers.query_param_matcher( + { + "incident_permission": "full_access", + "member_id": 1234, + } + ) + ], + status=204, + ) + response = client.grant_access_to_incident( + incident_id=1, + incident_permission=IncidentPermission.FULL_ACCESS, + member_id=1234, + ) + assert response + + +def test_grant_access_to_incident_mutually_exclusive(client: GGClient): + with pytest.raises(ValueError): + client.grant_access_to_incident( + incident_id=1, + incident_permission=IncidentPermission.FULL_ACCESS, + email="foo@example.com", + member_id=1234, + ) + + +def test_revoke_access_to_incident(client: GGClient): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + "https://api.gitguardian.com/v1/incidents/secrets/1/revoke_access", + match=[ + responses.matchers.query_param_matcher( + { + "member_id": 1234, + } + ) + ], + status=204, + ) + response = client.revoke_access_to_incident( + incident_id=1, + member_id=1234, + ) + assert response + + +def test_revoke_access_to_incident_mutually_exclusive(client: GGClient): + with pytest.raises(ValueError): + client.revoke_access_to_incident( + incident_id=1, + email="foo@example.com", + member_id=1234, + ) diff --git a/tests/test_incident_models.py b/tests/test_incident_models.py new file mode 100644 index 00000000..ade726c6 --- /dev/null +++ b/tests/test_incident_models.py @@ -0,0 +1,241 @@ +import pytest + +from pygitguardian.incident_models import Detector, Incident, Match, Occurrence + + +class TestModel: + @pytest.mark.parametrize( + "klass, instance_data", + [ + ( + Detector, + { + "name": "slack_bot_token", + "display_name": "Slack Bot Token", + "nature": "specific", + "family": "apikey", + "detector_group_name": "slackbot_token", + "detector_group_display_name": "Slack Bot Token", + }, + ), + ( + Match, + { + "name": "apikey", + "indice_start": 32, + "indice_end": 79, + "pre_line_start": None, + "pre_line_end": None, + "post_line_start": 1, + "post_line_end": 1, + }, + ), + ( + Occurrence, + { + "id": 4421, + "incident_id": 3759, + "kind": "realtime", + "sha": "d670460b4b4aece5915caf5c68d12f560a9fe3e4", + "source": { + "id": 6531, + "url": "https://github.com/GitGuardian/gg-shield", + "type": "github", + "full_name": "gitguardian/gg-shield", + "health": "at_risk", + "open_incidents_count": 3, + "closed_incidents_count": 2, + "visibility": "public", + "external_id": "125", + "last_scan": { + "date": "2021-05-20T12:40:55.662949Z", + "status": "finished", + }, + }, + "author_name": "Eric", + "author_info": "eric@gitguardian.com", + "date": "2021-05-20T12:40:55.662949Z", + "presence": "present", + "url": ( + "https://github.com/prm-dev-team/QATest_staging/commit/" + "76dd18a2a8d27eaf00a45851cc7731c53b59ed19" + "#diff-0f372f3171c8f13a15a22a1081487ed54fa70ad088e17c6c6386196a179a04ffR1" + ), + "matches": [ + { + "name": "apikey", + "indice_start": 32, + "indice_end": 79, + "pre_line_start": None, + "pre_line_end": None, + "post_line_start": 1, + "post_line_end": 1, + } + ], + "filepath": "test_data/12123testfile.txt", + }, + ), + ( + # Unresolved incident without ocurrences - returned from list endpoint + Incident, + { + "id": 3759, + "date": "2019-08-22T14:15:22Z", + "detector": { + "name": "slack_bot_token", + "display_name": "Slack Bot Token", + "nature": "specific", + "family": "apikey", + "detector_group_name": "slackbot_token", + "detector_group_display_name": "Slack Bot Token", + }, + "secret_hash": "Ri9FjVgdOlPnBmujoxP4XPJcbe82BhJXB/SAngijw/juCISuOMgPzYhV28m6OG24", + "gitguardian_url": "https://dashboard.gitguardian.com/workspace/1/incidents/3899", + "regression": False, + "status": "IGNORED", + "assignee_email": "eric@gitguardian.com", + "occurrences_count": 4, + "occurrences": None, + "ignore_reason": "test_credential", + "ignored_at": "2019-08-24T14:15:22Z", + "secret_revoked": False, + "severity": "high", + "validity": "valid", + "resolved_at": None, + "share_url": ( + "https://dashboard.gitguardian.com" + "/share/incidents/11111111-1111-1111-1111-111111111111" + ), + "tags": ["FROM_HISTORICAL_SCAN", "SENSITIVE_FILE"], + }, + ), + ( + # Resolved incident without ocurrences - returned from list endpoint + Incident, + { + "id": 3759, + "date": "2019-08-22T14:15:22Z", + "detector": { + "name": "slack_bot_token", + "display_name": "Slack Bot Token", + "nature": "specific", + "family": "apikey", + "detector_group_name": "slackbot_token", + "detector_group_display_name": "Slack Bot Token", + }, + "secret_hash": "Ri9FjVgdOlPnBmujoxP4XPJcbe82BhJXB/SAngijw/juCISuOMgPzYhV28m6OG24", + "gitguardian_url": "https://dashboard.gitguardian.com/workspace/1/incidents/3899", + "regression": False, + "status": "IGNORED", + "assignee_email": "eric@gitguardian.com", + "occurrences_count": 4, + "occurrences": None, + "ignore_reason": "test_credential", + "ignored_at": "2019-08-24T14:15:22Z", + "secret_revoked": False, + "severity": "high", + "validity": "valid", + "resolved_at": None, + "share_url": ( + "https://dashboard.gitguardian.com" + "/share/incidents/11111111-1111-1111-1111-111111111111" + ), + "tags": ["FROM_HISTORICAL_SCAN", "SENSITIVE_FILE"], + }, + ), + ( + # Ignored incident with ocurrences - returned from + # /v1/incidents/secrets/{incident_id} + Incident, + { + "id": 3759, + "date": "2019-08-22T14:15:22Z", + "detector": { + "name": "slack_bot_token", + "display_name": "Slack Bot Token", + "nature": "specific", + "family": "apikey", + "detector_group_name": "slackbot_token", + "detector_group_display_name": "Slack Bot Token", + }, + "secret_hash": "Ri9FjVgdOlPnBmujoxP4XPJcbe82BhJXB/SAngijw/juCISuOMgPzYhV28m6OG24", + "gitguardian_url": "https://dashboard.gitguardian.com/workspace/1/incidents/3899", + "regression": False, + "status": "IGNORED", + "assignee_id": 309, + "assignee_email": "eric@gitguardian.com", + "occurrences_count": 4, + "occurrences": [ + { + "id": 4421, + "incident_id": 3759, + "kind": "realtime", + "sha": "d670460b4b4aece5915caf5c68d12f560a9fe3e4", + "source": { + "id": 6531, + "url": "https://github.com/GitGuardian/gg-shield", + "type": "github", + "full_name": "gitguardian/gg-shield", + "health": "at_risk", + "open_incidents_count": 3, + "closed_incidents_count": 2, + "visibility": "public", + "external_id": "125", + "last_scan": { + "date": "2021-05-20T12:40:55.662949Z", + "status": "finished", + }, + }, + "author_name": "Eric", + "author_info": "eric@gitguardian.com", + "date": "2021-05-20T12:40:55.662949Z", + "presence": "present", + "url": ( + "https://github.com/prm-dev-team/QATest_staging/commit/" + "76dd18a2a8d27eaf00a45851cc7731c53b59ed19" + "#diff-0f372f3171c8f13a15a22a1081487ed54fa70ad088e17c6c6386196a179a04ffR1" + ), + "matches": [ + { + "name": "apikey", + "indice_start": 32, + "indice_end": 79, + "pre_line_start": None, + "pre_line_end": None, + "post_line_start": 1, + "post_line_end": 1, + } + ], + "filepath": "test_data/12123testfile.txt", + } + ], + "ignore_reason": "test_credential", + "severity": "high", + "validity": "valid", + "ignored_at": "2019-08-24T14:15:22Z", + "ignorer_id": 309, + "ignorer_api_token_id": "fdf075f9-1662-4cf1-9171-af50568158a8", + "resolver_id": 395, + "resolver_api_token_id": "fdf075f9-1662-4cf1-9171-af50568158a8", + "secret_revoked": False, + "resolved_at": None, + "share_url": ( + "https://dashboard.gitguardian.com" + "/share/incidents/11111111-1111-1111-1111-111111111111" + ), + "tags": ["FROM_HISTORICAL_SCAN", "SENSITIVE_FILE"], + }, + ), + ], + ) + def test_schema_loads(self, klass, instance_data): + """ + GIVEN the right kwargs and an extra field in dict format + WHEN loading using the schema + THEN the extra field is not taken into account + AND the result should be an instance of the expected class + """ + data = {**instance_data, "field": "extra"} + + obj = klass.from_dict(data) + assert isinstance(obj, klass)