diff --git a/socketdev/dependencies/__init__.py b/socketdev/dependencies/__init__.py index 201a9e0..be4e485 100644 --- a/socketdev/dependencies/__init__.py +++ b/socketdev/dependencies/__init__.py @@ -2,6 +2,7 @@ from urllib.parse import urlencode import logging from socketdev.tools import load_files +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -13,17 +14,20 @@ def __init__(self, api): self.api = api def post(self, files: list, params: dict) -> dict: - loaded_files = [] - loaded_files = load_files(files, loaded_files) + loaded_files = load_files(files, []) path = "dependencies/upload?" + urlencode(params) - response = self.api.do_request(path=path, files=loaded_files, method="POST") - if response.status_code == 200: - result = response.json() - else: - result = {} - log.error(f"Error posting {files} to the Dependency API") - log.error(response.text) - return result + try: + response = self.api.do_request(path=path, files=loaded_files, method="POST") + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while posting dependencies {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while posting dependencies {e}") + raise + + return {} def get( self, @@ -33,11 +37,15 @@ def get( path = "dependencies/search" payload = {"limit": limit, "offset": offset} payload_str = json.dumps(payload) - response = self.api.do_request(path=path, method="POST", payload=payload_str) - if response.status_code == 200: - result = response.json() - else: - result = {} - log.error("Unable to retrieve Dependencies") - log.error(response.text) - return result + try: + response = self.api.do_request(path=path, method="POST", payload=payload_str) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while retrieving dependencies {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while retrieving dependencies {e}") + raise + + return {} diff --git a/socketdev/export/__init__.py b/socketdev/export/__init__.py index 19c04e1..8aeb31c 100644 --- a/socketdev/export/__init__.py +++ b/socketdev/export/__init__.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, asdict from typing import Optional import logging +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -40,14 +41,17 @@ def cdx_bom( path = f"orgs/{org_slug}/export/cdx/{id}" if query_params: path += query_params.to_query_params() - response = self.api.do_request(path=path) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while exporting CDX BOM {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while exporting CDX BOM {e}") + raise - if response.status_code == 200: - return response.json() - # TODO: Add typed response when types are defined - - log.error(f"Error exporting CDX BOM: {response.status_code}") - print(response.text) return {} def spdx_bom( @@ -64,12 +68,15 @@ def spdx_bom( path = f"orgs/{org_slug}/export/spdx/{id}" if query_params: path += query_params.to_query_params() - response = self.api.do_request(path=path) - - if response.status_code == 200: - return response.json() - # TODO: Add typed response when types are defined + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while exporting SPDX BOM {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while exporting SPDX BOM {e}") + raise - log.error(f"Error exporting SPDX BOM: {response.status_code}") - print(response.text) return {} diff --git a/socketdev/fullscans/__init__.py b/socketdev/fullscans/__init__.py index 68175ae..7ea1619 100644 --- a/socketdev/fullscans/__init__.py +++ b/socketdev/fullscans/__init__.py @@ -3,7 +3,7 @@ from enum import Enum from typing import Any, Dict, List, Optional, Union from dataclasses import dataclass, asdict, field - +from socketdev.exceptions import APIFailure from ..utils import IntegrationType, Utils @@ -710,20 +710,20 @@ def create_params_string(self, params: dict) -> str: def get(self, org_slug: str, params: dict, use_types: bool = False) -> Union[dict, GetFullScanMetadataResponse]: params_arg = self.create_params_string(params) path = "orgs/" + org_slug + "/full-scans" + str(params_arg) - response = self.api.do_request(path=path) - - if response.status_code == 200: - result = response.json() - if use_types: - return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) - return result - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error getting full scan metadata: {response.status_code}, message: {error_message}") - if use_types: - return GetFullScanMetadataResponse.from_dict( - {"success": False, "status": response.status_code, "message": error_message} - ) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + result = response.json() + if use_types: + return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting full scan metadata {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting full scan metadata {e}") + raise + return {} def post(self, files: list, params: FullScanParams, use_types: bool = False) -> Union[dict, CreateFullScanResponse]: @@ -731,116 +731,116 @@ def post(self, files: list, params: FullScanParams, use_types: bool = False) -> org_slug = str(params.org_slug) params_dict = params.to_dict() params_dict.pop("org_slug") - params_arg = self.create_params_string(params_dict) - path = "orgs/" + org_slug + "/full-scans" + str(params_arg) - response = self.api.do_request(path=path, method="POST", files=files) - - if response.status_code == 201: - result = response.json() - if use_types: - return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result}) - return result + try: + response = self.api.do_request(path=path, method="POST", files=files) + if response.status_code == 201: + result = response.json() + if use_types: + return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result}) + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure while posting full scan {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while posting full scan {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error posting {files} to the Fullscans API: {response.status_code}, message: {error_message}") - if use_types: - return CreateFullScanResponse.from_dict( - {"success": False, "status": response.status_code, "message": error_message} - ) return {} def delete(self, org_slug: str, full_scan_id: str) -> dict: path = "orgs/" + org_slug + "/full-scans/" + full_scan_id + try: + response = self.api.do_request(path=path, method="DELETE") + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while deleting full scan {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while deleting full scan {e}") + raise - response = self.api.do_request(path=path, method="DELETE") - - if response.status_code == 200: - result = response.json() - return result - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error deleting full scan: {response.status_code}, message: {error_message}") return {} def stream_diff( self, org_slug: str, before: str, after: str, use_types: bool = False ) -> Union[dict, StreamDiffResponse]: path = f"orgs/{org_slug}/full-scans/diff?before={before}&after={after}" + try: + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + result = response.json() + if use_types: + return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result}) + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure while streaming diff {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while streaming diff {e}") + raise - response = self.api.do_request(path=path, method="GET") - - if response.status_code == 200: - result = response.json() - if use_types: - return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result}) - return result - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error streaming diff: {response.status_code}, message: {error_message}") - if use_types: - return StreamDiffResponse.from_dict( - {"success": False, "status": response.status_code, "message": error_message} - ) return {} def stream(self, org_slug: str, full_scan_id: str, use_types: bool = False) -> Union[dict, FullScanStreamResponse]: path = "orgs/" + org_slug + "/full-scans/" + full_scan_id - response = self.api.do_request(path=path, method="GET") - - if response.status_code == 200: - try: - stream_str = [] - artifacts = {} - result = response.text - result = result.strip('"').strip() - for line in result.split("\n"): - if line != '"' and line != "" and line is not None: - item = json.loads(line) - stream_str.append(item) - for val in stream_str: - artifacts[val["id"]] = val - - if use_types: - return FullScanStreamResponse.from_dict({"success": True, "status": 200, "artifacts": artifacts}) - return artifacts + try: + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + try: + stream_str = [] + artifacts = {} + result = response.text.strip('"').strip() + for line in result.split("\n"): + if line != '"' and line != "" and line is not None: + item = json.loads(line) + stream_str.append(item) + for val in stream_str: + artifacts[val["id"]] = val + + if use_types: + return FullScanStreamResponse.from_dict( + {"success": True, "status": 200, "artifacts": artifacts} + ) + return artifacts + + except Exception as e: + error_message = f"Error parsing stream response: {str(e)}" + log.error(error_message) + if use_types: + return FullScanStreamResponse.from_dict( + {"success": False, "status": response.status_code, "message": error_message} + ) + return {} + + except APIFailure as e: + log.error(f"Socket SDK: API failure while streaming full scan {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while streaming full scan {e}") + raise - except Exception as e: - error_message = f"Error parsing stream response: {str(e)}" - log.error(error_message) - if use_types: - return FullScanStreamResponse.from_dict( - {"success": False, "status": response.status_code, "message": error_message} - ) - return {} - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error streaming full scan: {response.status_code}, message: {error_message}") - if use_types: - return FullScanStreamResponse.from_dict( - {"success": False, "status": response.status_code, "message": error_message} - ) return {} def metadata( self, org_slug: str, full_scan_id: str, use_types: bool = False ) -> Union[dict, GetFullScanMetadataResponse]: path = "orgs/" + org_slug + "/full-scans/" + full_scan_id + "/metadata" + try: + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + result = response.json() + if use_types: + return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting metadata {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting metadata {e}") + raise - response = self.api.do_request(path=path, method="GET") - - if response.status_code == 200: - result = response.json() - if use_types: - return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) - return result - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error getting metadata: {response.status_code}, message: {error_message}") - if use_types: - return GetFullScanMetadataResponse.from_dict( - {"success": False, "status": response.status_code, "message": error_message} - ) return {} diff --git a/socketdev/historical/__init__.py b/socketdev/historical/__init__.py index 223d90c..202b30f 100644 --- a/socketdev/historical/__init__.py +++ b/socketdev/historical/__init__.py @@ -1,5 +1,6 @@ import logging from urllib.parse import urlencode +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -19,12 +20,17 @@ def list(self, org_slug: str, query_params: dict = None) -> dict: if query_params: path += "?" + urlencode(query_params) - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting historical alerts {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting historical alerts {e}") + raise - log.error(f"Error getting historical alerts: {response.status_code}") - log.error(response.text) return {} def trend(self, org_slug: str, query_params: dict = None) -> dict: @@ -38,10 +44,15 @@ def trend(self, org_slug: str, query_params: dict = None) -> dict: if query_params: path += "?" + urlencode(query_params) - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting historical trend {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting historical trend {e}") + raise - log.error(f"Error getting historical trend: {response.status_code}") - log.error(response.text) return {} diff --git a/socketdev/npm/__init__.py b/socketdev/npm/__init__.py index 55abd90..e8dabbd 100644 --- a/socketdev/npm/__init__.py +++ b/socketdev/npm/__init__.py @@ -1,9 +1,8 @@ import logging +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") -# TODO: Add response type classes for NPM endpoints - class NPM: def __init__(self, api): @@ -11,18 +10,30 @@ def __init__(self, api): def issues(self, package: str, version: str) -> list: path = f"npm/{package}/{version}/issues" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error getting npm issues: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting npm issues {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting npm issues {e}") + raise + return [] def score(self, package: str, version: str) -> list: path = f"npm/{package}/{version}/score" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error getting npm score: {response.status_code}") - print(response.text) - return [] + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting npm score {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting npm score {e}") + raise + + return {} diff --git a/socketdev/openapi/__init__.py b/socketdev/openapi/__init__.py index 96f7fa9..5daadca 100644 --- a/socketdev/openapi/__init__.py +++ b/socketdev/openapi/__init__.py @@ -1,4 +1,5 @@ import logging +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -11,9 +12,15 @@ def __init__(self, api): def get(self) -> dict: path = "openapi" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error getting OpenAPI spec: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting OpenAPI spec {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting OpenAPI spec {e}") + raise + return {} diff --git a/socketdev/org/__init__.py b/socketdev/org/__init__.py index d59aa0a..b38bd7c 100644 --- a/socketdev/org/__init__.py +++ b/socketdev/org/__init__.py @@ -1,5 +1,6 @@ from typing import TypedDict, Dict import logging +from socketdev.api import APIFailure log = logging.getLogger("socketdev") @@ -23,12 +24,18 @@ def __init__(self, api): def get(self, use_types: bool = False) -> OrganizationsResponse: path = "organizations" - response = self.api.do_request(path=path) - if response.status_code == 200: - result = response.json() - if use_types: - return OrganizationsResponse(result) - return result - log.error(f"Error getting organizations: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + result = response.json() + if use_types: + return OrganizationsResponse(result) + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error {e}") + raise + return {"organizations": {}} diff --git a/socketdev/purl/__init__.py b/socketdev/purl/__init__.py index 248f2ff..2140aab 100644 --- a/socketdev/purl/__init__.py +++ b/socketdev/purl/__init__.py @@ -1,4 +1,8 @@ import json +from socketdev.exceptions import APIFailure +import logging + +log = logging.getLogger("socketdev") class Purl: @@ -10,20 +14,24 @@ def post(self, license: str = "true", components: list = []) -> list: components = {"components": components} components = json.dumps(components) - response = self.api.do_request(path=path, payload=components, method="POST") - if response.status_code == 200: - purl = [] - result = response.text - result = result.strip('"').strip() - for line in result.split("\n"): - if line and line != '"': - try: - item = json.loads(line) - purl.append(item) - except json.JSONDecodeError: - continue - return purl + try: + response = self.api.do_request(path=path, payload=components, method="POST") + if response.status_code == 200: + purl = [] + result = response.text.strip('"').strip() + for line in result.split("\n"): + if line and line != '"': + try: + item = json.loads(line) + purl.append(item) + except json.JSONDecodeError: + continue + return purl + except APIFailure as e: + log.error(f"Socket SDK: API failure while posting to the Purl API {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while posting to the Purl API {e}") + raise - log.error(f"Error posting {components} to the Purl API: {response.status_code}") - print(response.text) return [] diff --git a/socketdev/quota/__init__.py b/socketdev/quota/__init__.py index bd3269f..294bf89 100644 --- a/socketdev/quota/__init__.py +++ b/socketdev/quota/__init__.py @@ -1,4 +1,5 @@ import logging +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -11,9 +12,15 @@ def __init__(self, api): def get(self) -> dict: path = "quota" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error getting quota: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting quota {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting quota {e}") + raise + return {} diff --git a/socketdev/report/__init__.py b/socketdev/report/__init__.py index f92a621..27efc12 100644 --- a/socketdev/report/__init__.py +++ b/socketdev/report/__init__.py @@ -1,10 +1,9 @@ import logging from datetime import datetime, timedelta, timezone +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") -# TODO: Add response type classes for Report endpoints - class Report: def __init__(self, api): @@ -23,38 +22,63 @@ def list(self, from_time: int = None) -> dict: path = "report/list" if from_time is not None: path += f"?from={from_time}" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error listing reports: {response.status_code}") - print(response.text) + + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while listing reports {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while listing reports {e}") + raise + return {} def delete(self, report_id: str) -> bool: path = f"report/delete/{report_id}" - response = self.api.do_request(path=path, method="DELETE") - if response.status_code == 200: - return True - log.error(f"Error deleting report: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path, method="DELETE") + if response.status_code == 200: + return True + except APIFailure as e: + log.error(f"Socket SDK: API failure while deleting report {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while deleting report {e}") + raise + return False def view(self, report_id) -> dict: path = f"report/view/{report_id}" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error viewing report: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while viewing report {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while viewing report {e}") + raise + return {} def supported(self) -> dict: path = "report/supported" - response = self.api.do_request(path=path) - if response.status_code == 200: - return response.json() - log.error(f"Error getting supported reports: {response.status_code}") - print(response.text) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting supported files {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting supported files {e}") + raise + return {} def create(self, files: list) -> dict: @@ -64,9 +88,16 @@ def create(self, files: list) -> dict: open_files.append(file_info) path = "report/upload" payload = {} - response = self.api.do_request(path=path, method="PUT", files=open_files, payload=payload) - if response.status_code == 200: - return response.json() - log.error(f"Error creating report: {response.status_code}") - print(response.text) + + try: + response = self.api.do_request(path=path, method="PUT", files=open_files, payload=payload) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while creating report {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while creating report {e}") + raise + return {} diff --git a/socketdev/repos/__init__.py b/socketdev/repos/__init__.py index f1028d9..192f15f 100644 --- a/socketdev/repos/__init__.py +++ b/socketdev/repos/__init__.py @@ -2,6 +2,7 @@ import logging from typing import Optional, Union from dataclasses import dataclass, asdict +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -81,35 +82,44 @@ def get(self, org_slug: str, **kwargs) -> dict[str, list[dict] | int]: path += f"{param}={value}&" path = path.rstrip("&") - response = self.api.do_request(path=path) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + raw_result = response.json() + per_page = int(query_params.get("per_page", 30)) - if response.status_code == 200: - raw_result = response.json() - per_page = int(query_params.get("per_page", 30)) + # TEMPORARY: Handle pagination edge case where API returns nextPage=1 even when no more results exist + if raw_result["nextPage"] != 0 and len(raw_result["results"]) < per_page: + raw_result["nextPage"] = 0 - # TEMPORARY: Handle pagination edge case where API returns nextPage=1 even when no more results exist - if raw_result["nextPage"] != 0 and len(raw_result["results"]) < per_page: - raw_result["nextPage"] = 0 + return raw_result + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting repositories {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting repositories {e}") + raise - return raw_result - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error getting repositories: {response.status_code}, message: {error_message}") return {} def repo(self, org_slug: str, repo_name: str, use_types: bool = False) -> Union[dict, GetRepoResponse]: path = f"orgs/{org_slug}/repos/{repo_name}" - response = self.api.do_request(path=path) - - if response.status_code == 200: - result = response.json() - if use_types: - return GetRepoResponse.from_dict({"success": True, "status": 200, "data": result}) - return result + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + result = response.json() + if use_types: + return GetRepoResponse.from_dict({"success": True, "status": 200, "data": result}) + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting repository {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting repository {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - print(f"Failed to get repository: {response.status_code}, message: {error_message}") if use_types: + error_message = response.json().get("error", {}).get("message", "Unknown error") return GetRepoResponse.from_dict( {"success": False, "status": response.status_code, "message": error_message} ) @@ -117,13 +127,17 @@ def repo(self, org_slug: str, repo_name: str, use_types: bool = False) -> Union[ def delete(self, org_slug: str, name: str) -> dict: path = f"orgs/{org_slug}/repos/{name}" - response = self.api.do_request(path=path, method="DELETE") - - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path, method="DELETE") + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while deleting repository {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while deleting repository {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error deleting repository: {response.status_code}, message: {error_message}") return {} def post(self, org_slug: str, **kwargs) -> dict: @@ -136,30 +150,38 @@ def post(self, org_slug: str, **kwargs) -> dict: path = "orgs/" + org_slug + "/repos" payload = json.dumps(params) - response = self.api.do_request(path=path, method="POST", payload=payload) + try: + response = self.api.do_request(path=path, method="POST", payload=payload) + if response.status_code == 201: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while creating repository {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while creating repository {e}") + raise - if response.status_code == 201: - return response.json() - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error creating repository: {response.status_code}, message: {error_message}") return {} def update(self, org_slug: str, repo_name: str, **kwargs) -> dict: params = {} if kwargs: - for key, val in kwargs.keys(): + for key, val in kwargs.items(): params[key] = val if len(params) == 0: return {} path = f"orgs/{org_slug}/repos/{repo_name}" payload = json.dumps(params) - response = self.api.do_request(path=path, method="POST", payload=payload) - - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path, method="POST", payload=payload) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while updating repository {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while updating repository {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error updating repository: {response.status_code}, message: {error_message}") return {} diff --git a/socketdev/repositories/__init__.py b/socketdev/repositories/__init__.py index 4390313..0f96359 100644 --- a/socketdev/repositories/__init__.py +++ b/socketdev/repositories/__init__.py @@ -1,5 +1,6 @@ from typing import TypedDict, Union import logging +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -19,13 +20,18 @@ def __init__(self, api): def list(self, use_types: bool = False) -> Union[dict, list[Repo]]: path = "repos" - response = self.api.do_request(path=path) - if response.status_code == 200: - result = response.json() - if use_types: - return [Repo(repo) for repo in result] - return result + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + result = response.json() + if use_types: + return [Repo(repo) for repo in result] + return result + except APIFailure as e: + log.error(f"Socket SDK: API failure while listing repositories {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while listing repositories {e}") + raise - log.error(f"Error listing repositories: {response.status_code}") - print(response.text) return [] diff --git a/socketdev/sbom/__init__.py b/socketdev/sbom/__init__.py index 67aa15e..9b6780d 100644 --- a/socketdev/sbom/__init__.py +++ b/socketdev/sbom/__init__.py @@ -1,11 +1,10 @@ import json from socketdev.core.classes import Package import logging +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") -# TODO: Add response type classes for SBOM endpoints - class Sbom: def __init__(self, api): @@ -17,24 +16,27 @@ def __init__(self, api): # who have been using this method since its introduction 9 months ago. def view(self, report_id: str) -> dict[str, dict]: path = f"sbom/view/{report_id}" - response = self.api.do_request(path=path) - if response.status_code == 200: - sbom = [] - sbom_dict = {} - data = response.text - data.strip('"') - data.strip() - for line in data.split("\n"): - if line != '"' and line != "" and line is not None: - item = json.loads(line) - sbom.append(item) - for val in sbom: - sbom_dict[val["id"]] = val - else: - log.error(f"Error viewing SBOM: {response.status_code}") - print(response.text) - sbom_dict = {} - return sbom_dict + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + sbom = [] + sbom_dict = {} + data = response.text.strip('"').strip() + for line in data.split("\n"): + if line and line != '"': + item = json.loads(line) + sbom.append(item) + for val in sbom: + sbom_dict[val["id"]] = val + return sbom_dict + except APIFailure as e: + log.error(f"Socket SDK: API failure while viewing SBOM {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while viewing SBOM {e}") + raise + + return {} def create_packages_dict(self, sbom: dict[str, dict]) -> dict[str, Package]: """ diff --git a/socketdev/settings/__init__.py b/socketdev/settings/__init__.py index 416454c..f71cd9f 100644 --- a/socketdev/settings/__init__.py +++ b/socketdev/settings/__init__.py @@ -2,6 +2,7 @@ from enum import Enum from typing import Dict, Optional, Union from dataclasses import dataclass, asdict +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -80,21 +81,31 @@ def get( params = {"custom_rules_only": custom_rules_only} params_args = self.create_params_string(params) if custom_rules_only else "" path += params_args - response = self.api.do_request(path=path, method="GET") - - if response.status_code == 200: - rules = response.json() - if use_types: - return OrgSecurityPolicyResponse.from_dict( - {"securityPolicyRules": rules.get("securityPolicyRules", {}), "success": True, "status": 200} - ) - return rules - - error_message = response.json().get("error", {}).get("message", "Unknown error") - print(f"Failed to get security policy: {response.status_code}, message: {error_message}") + + try: + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + rules = response.json() + if use_types: + return OrgSecurityPolicyResponse.from_dict( + {"securityPolicyRules": rules.get("securityPolicyRules", {}), "success": True, "status": 200} + ) + return rules + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting security policy {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting security policy {e}") + raise + if use_types: return OrgSecurityPolicyResponse.from_dict( - {"securityPolicyRules": {}, "success": False, "status": response.status_code, "message": error_message} + { + "securityPolicyRules": {}, + "success": False, + "status": response.status_code, + "message": "Unknown error", + } ) return {} @@ -106,13 +117,17 @@ def integration_events(self, org_slug: str, integration_id: str) -> dict: integration_id: Integration ID """ path = f"orgs/{org_slug}/settings/integrations/{integration_id}" - response = self.api.do_request(path=path) - - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting integration events {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting integration events {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error getting integration events: {response.status_code}, message: {error_message}") return {} def get_license_policy(self, org_slug: str) -> dict: @@ -122,13 +137,17 @@ def get_license_policy(self, org_slug: str) -> dict: org_slug: Organization slug """ path = f"orgs/{org_slug}/settings/license-policy" - response = self.api.do_request(path=path) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting license policy {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting license policy {e}") + raise - if response.status_code == 200: - return response.json() - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error getting license policy: {response.status_code}, message: {error_message}") return {} def update_security_policy(self, org_slug: str, body: dict, custom_rules_only: bool = False) -> dict: @@ -143,13 +162,17 @@ def update_security_policy(self, org_slug: str, body: dict, custom_rules_only: b if custom_rules_only: path += "?custom_rules_only=true" - response = self.api.do_request(path=path, method="POST", payload=body) - - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path, method="POST", payload=body) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while updating security policy {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while updating security policy {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error updating security policy: {response.status_code}, message: {error_message}") return {} def update_license_policy(self, org_slug: str, body: dict, merge_update: bool = False) -> dict: @@ -162,11 +185,15 @@ def update_license_policy(self, org_slug: str, body: dict, merge_update: bool = """ path = f"orgs/{org_slug}/settings/license-policy?merge_update={str(merge_update).lower()}" - response = self.api.do_request(path=path, method="POST", payload=body) - - if response.status_code == 200: - return response.json() + try: + response = self.api.do_request(path=path, method="POST", payload=body) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while updating license policy {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while updating license policy {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error updating license policy: {response.status_code}, message: {error_message}") return {} diff --git a/socketdev/triage/__init__.py b/socketdev/triage/__init__.py index b3a9d96..1aee8f5 100644 --- a/socketdev/triage/__init__.py +++ b/socketdev/triage/__init__.py @@ -1,5 +1,6 @@ import logging from urllib.parse import urlencode +from socketdev.exceptions import APIFailure log = logging.getLogger("socketdev") @@ -19,13 +20,17 @@ def list_alert_triage(self, org_slug: str, query_params: dict = None) -> dict: if query_params: path += "?" + urlencode(query_params) - response = self.api.do_request(path=path) + try: + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while getting alert triage list {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while getting alert triage list {e}") + raise - if response.status_code == 200: - return response.json() - - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error getting alert triage list: {response.status_code}, message: {error_message}") return {} def update_alert_triage(self, org_slug: str, body: dict) -> dict: @@ -37,11 +42,15 @@ def update_alert_triage(self, org_slug: str, body: dict) -> dict: """ path = f"orgs/{org_slug}/triage/alerts" - response = self.api.do_request(path=path, method="POST", payload=body) - - if 200 <= response.status_code < 300: - return response.json() + try: + response = self.api.do_request(path=path, method="POST", payload=body) + if 200 <= response.status_code < 300: + return response.json() + except APIFailure as e: + log.error(f"Socket SDK: API failure while updating alert triage {e}") + raise + except Exception as e: + log.error(f"Socket SDK: Unexpected error while updating alert triage {e}") + raise - error_message = response.json().get("error", {}).get("message", "Unknown error") - log.error(f"Error updating alert triage: {response.status_code}, message: {error_message}") return {} diff --git a/socketdev/version.py b/socketdev/version.py index e5d18b2..61663da 100644 --- a/socketdev/version.py +++ b/socketdev/version.py @@ -1 +1 @@ -__version__ = "2.0.9" +__version__ = "2.0.10"