Skip to content

all methods now log and raise exceptions to the caller #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions socketdev/dependencies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from urllib.parse import urlencode
import logging
from socketdev.tools import load_files
from socketdev.exceptions import APIFailure

log = logging.getLogger("socketdev")

Expand All @@ -13,17 +14,20 @@ def __init__(self, api):
self.api = api

def post(self, files: list, params: dict) -> dict:
loaded_files = []
loaded_files = load_files(files, loaded_files)
loaded_files = load_files(files, [])
path = "dependencies/upload?" + urlencode(params)
response = self.api.do_request(path=path, files=loaded_files, method="POST")
if response.status_code == 200:
result = response.json()
else:
result = {}
log.error(f"Error posting {files} to the Dependency API")
log.error(response.text)
return result
try:
response = self.api.do_request(path=path, files=loaded_files, method="POST")
if response.status_code == 200:
return response.json()
except APIFailure as e:
log.error(f"Socket SDK: API failure while posting dependencies {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while posting dependencies {e}")
raise

return {}

def get(
self,
Expand All @@ -33,11 +37,15 @@ def get(
path = "dependencies/search"
payload = {"limit": limit, "offset": offset}
payload_str = json.dumps(payload)
response = self.api.do_request(path=path, method="POST", payload=payload_str)
if response.status_code == 200:
result = response.json()
else:
result = {}
log.error("Unable to retrieve Dependencies")
log.error(response.text)
return result
try:
response = self.api.do_request(path=path, method="POST", payload=payload_str)
if response.status_code == 200:
return response.json()
except APIFailure as e:
log.error(f"Socket SDK: API failure while retrieving dependencies {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while retrieving dependencies {e}")
raise

return {}
35 changes: 21 additions & 14 deletions socketdev/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import dataclass, asdict
from typing import Optional
import logging
from socketdev.exceptions import APIFailure

log = logging.getLogger("socketdev")

Expand Down Expand Up @@ -40,14 +41,17 @@ def cdx_bom(
path = f"orgs/{org_slug}/export/cdx/{id}"
if query_params:
path += query_params.to_query_params()
response = self.api.do_request(path=path)
try:
response = self.api.do_request(path=path)
if response.status_code == 200:
return response.json()
except APIFailure as e:
log.error(f"Socket SDK: API failure while exporting CDX BOM {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while exporting CDX BOM {e}")
raise

if response.status_code == 200:
return response.json()
# TODO: Add typed response when types are defined

log.error(f"Error exporting CDX BOM: {response.status_code}")
print(response.text)
return {}

def spdx_bom(
Expand All @@ -64,12 +68,15 @@ def spdx_bom(
path = f"orgs/{org_slug}/export/spdx/{id}"
if query_params:
path += query_params.to_query_params()
response = self.api.do_request(path=path)

if response.status_code == 200:
return response.json()
# TODO: Add typed response when types are defined
try:
response = self.api.do_request(path=path)
if response.status_code == 200:
return response.json()
except APIFailure as e:
log.error(f"Socket SDK: API failure while exporting SPDX BOM {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while exporting SPDX BOM {e}")
raise

log.error(f"Error exporting SPDX BOM: {response.status_code}")
print(response.text)
return {}
198 changes: 99 additions & 99 deletions socketdev/fullscans/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, asdict, field

from socketdev.exceptions import APIFailure

from ..utils import IntegrationType, Utils

Expand Down Expand Up @@ -710,137 +710,137 @@ def create_params_string(self, params: dict) -> str:
def get(self, org_slug: str, params: dict, use_types: bool = False) -> Union[dict, GetFullScanMetadataResponse]:
params_arg = self.create_params_string(params)
path = "orgs/" + org_slug + "/full-scans" + str(params_arg)
response = self.api.do_request(path=path)

if response.status_code == 200:
result = response.json()
if use_types:
return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result})
return result

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error getting full scan metadata: {response.status_code}, message: {error_message}")
if use_types:
return GetFullScanMetadataResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
try:
response = self.api.do_request(path=path)
if response.status_code == 200:
result = response.json()
if use_types:
return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result})
return result
except APIFailure as e:
log.error(f"Socket SDK: API failure while getting full scan metadata {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while getting full scan metadata {e}")
raise

return {}

def post(self, files: list, params: FullScanParams, use_types: bool = False) -> Union[dict, CreateFullScanResponse]:
Utils.validate_integration_type(params.integration_type if params.integration_type else "api")
org_slug = str(params.org_slug)
params_dict = params.to_dict()
params_dict.pop("org_slug")

params_arg = self.create_params_string(params_dict)

path = "orgs/" + org_slug + "/full-scans" + str(params_arg)

response = self.api.do_request(path=path, method="POST", files=files)

if response.status_code == 201:
result = response.json()
if use_types:
return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result})
return result
try:
response = self.api.do_request(path=path, method="POST", files=files)
if response.status_code == 201:
result = response.json()
if use_types:
return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result})
return result
except APIFailure as e:
log.error(f"Socket SDK: API failure while posting full scan {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while posting full scan {e}")
raise

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error posting {files} to the Fullscans API: {response.status_code}, message: {error_message}")
if use_types:
return CreateFullScanResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
return {}

def delete(self, org_slug: str, full_scan_id: str) -> dict:
path = "orgs/" + org_slug + "/full-scans/" + full_scan_id
try:
response = self.api.do_request(path=path, method="DELETE")
if response.status_code == 200:
return response.json()
except APIFailure as e:
log.error(f"Socket SDK: API failure while deleting full scan {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while deleting full scan {e}")
raise

response = self.api.do_request(path=path, method="DELETE")

if response.status_code == 200:
result = response.json()
return result

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error deleting full scan: {response.status_code}, message: {error_message}")
return {}

def stream_diff(
self, org_slug: str, before: str, after: str, use_types: bool = False
) -> Union[dict, StreamDiffResponse]:
path = f"orgs/{org_slug}/full-scans/diff?before={before}&after={after}"
try:
response = self.api.do_request(path=path, method="GET")
if response.status_code == 200:
result = response.json()
if use_types:
return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result})
return result
except APIFailure as e:
log.error(f"Socket SDK: API failure while streaming diff {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while streaming diff {e}")
raise

response = self.api.do_request(path=path, method="GET")

if response.status_code == 200:
result = response.json()
if use_types:
return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result})
return result

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error streaming diff: {response.status_code}, message: {error_message}")
if use_types:
return StreamDiffResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
return {}

def stream(self, org_slug: str, full_scan_id: str, use_types: bool = False) -> Union[dict, FullScanStreamResponse]:
path = "orgs/" + org_slug + "/full-scans/" + full_scan_id
response = self.api.do_request(path=path, method="GET")

if response.status_code == 200:
try:
stream_str = []
artifacts = {}
result = response.text
result = result.strip('"').strip()
for line in result.split("\n"):
if line != '"' and line != "" and line is not None:
item = json.loads(line)
stream_str.append(item)
for val in stream_str:
artifacts[val["id"]] = val

if use_types:
return FullScanStreamResponse.from_dict({"success": True, "status": 200, "artifacts": artifacts})
return artifacts
try:
response = self.api.do_request(path=path, method="GET")
if response.status_code == 200:
try:
stream_str = []
artifacts = {}
result = response.text.strip('"').strip()
for line in result.split("\n"):
if line != '"' and line != "" and line is not None:
item = json.loads(line)
stream_str.append(item)
for val in stream_str:
artifacts[val["id"]] = val

if use_types:
return FullScanStreamResponse.from_dict(
{"success": True, "status": 200, "artifacts": artifacts}
)
return artifacts

except Exception as e:
error_message = f"Error parsing stream response: {str(e)}"
log.error(error_message)
if use_types:
return FullScanStreamResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
return {}

except APIFailure as e:
log.error(f"Socket SDK: API failure while streaming full scan {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while streaming full scan {e}")
raise

except Exception as e:
error_message = f"Error parsing stream response: {str(e)}"
log.error(error_message)
if use_types:
return FullScanStreamResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
return {}

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error streaming full scan: {response.status_code}, message: {error_message}")
if use_types:
return FullScanStreamResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
return {}

def metadata(
self, org_slug: str, full_scan_id: str, use_types: bool = False
) -> Union[dict, GetFullScanMetadataResponse]:
path = "orgs/" + org_slug + "/full-scans/" + full_scan_id + "/metadata"
try:
response = self.api.do_request(path=path, method="GET")
if response.status_code == 200:
result = response.json()
if use_types:
return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result})
return result
except APIFailure as e:
log.error(f"Socket SDK: API failure while getting metadata {e}")
raise
except Exception as e:
log.error(f"Socket SDK: Unexpected error while getting metadata {e}")
raise

response = self.api.do_request(path=path, method="GET")

if response.status_code == 200:
result = response.json()
if use_types:
return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result})
return result

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error getting metadata: {response.status_code}, message: {error_message}")
if use_types:
return GetFullScanMetadataResponse.from_dict(
{"success": False, "status": response.status_code, "message": error_message}
)
return {}
Loading
Loading