Skip to content

Return headers on request exception #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 52 additions & 23 deletions socketdev/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
import requests
from socketdev.core.classes import Response
from socketdev.exceptions import (
APIKeyMissing, APIFailure, APIAccessDenied, APIInsufficientQuota,
APIResourceNotFound, APITimeout, APIConnectionError, APIBadGateway,
APIInsufficientPermissions, APIOrganizationNotAllowed
APIKeyMissing,
APIFailure,
APIAccessDenied,
APIInsufficientQuota,
APIResourceNotFound,
APITimeout,
APIConnectionError,
APIBadGateway,
APIInsufficientPermissions,
APIOrganizationNotAllowed,
)
from socketdev.version import __version__
from requests.exceptions import Timeout, ConnectionError
Expand All @@ -24,7 +31,12 @@ def set_timeout(self, timeout: int):
self.request_timeout = timeout

def do_request(
self, path: str, headers: dict | None = None, payload: [dict, str] = None, files: list = None, method: str = "GET"
self,
path: str,
headers: dict | None = None,
payload: [dict, str] = None,
files: list = None,
method: str = "GET",
) -> Response:
if self.encoded_key is None or self.encoded_key == "":
raise APIKeyMissing
Expand All @@ -36,33 +48,39 @@ def do_request(
"accept": "application/json",
}
url = f"{self.api_url}/{path}"

def format_headers(headers_dict):
return "\n".join(f"{k}: {v}" for k, v in headers_dict.items())

try:
start_time = time.time()
response = requests.request(
method.upper(), url, headers=headers, data=payload, files=files, timeout=self.request_timeout
)
request_duration = time.time() - start_time


headers_str = f"\n\nHeaders:\n{format_headers(response.headers)}" if response.headers else ""
path_str = f"\nPath: {url}"

if response.status_code == 401:
raise APIAccessDenied("Unauthorized")
raise APIAccessDenied(f"Unauthorized{path_str}{headers_str}")
if response.status_code == 403:
try:
error_message = response.json().get('error', {}).get('message', '')
error_message = response.json().get("error", {}).get("message", "")
if "Insufficient permissions for API method" in error_message:
raise APIInsufficientPermissions(error_message)
raise APIInsufficientPermissions(f"{error_message}{path_str}{headers_str}")
elif "Organization not allowed" in error_message:
raise APIOrganizationNotAllowed(error_message)
raise APIOrganizationNotAllowed(f"{error_message}{path_str}{headers_str}")
elif "Insufficient max quota" in error_message:
raise APIInsufficientQuota(error_message)
raise APIInsufficientQuota(f"{error_message}{path_str}{headers_str}")
else:
raise APIAccessDenied(error_message or "Access denied")
raise APIAccessDenied(f"{error_message or 'Access denied'}{path_str}{headers_str}")
except ValueError:
# If JSON parsing fails
raise APIAccessDenied("Access denied")
raise APIAccessDenied(f"Access denied{path_str}{headers_str}")
if response.status_code == 404:
raise APIResourceNotFound(f"Path not found {path}")
raise APIResourceNotFound(f"Path not found {path}{path_str}{headers_str}")
if response.status_code == 429:
retry_after = response.headers.get('retry-after')
retry_after = response.headers.get("retry-after")
if retry_after:
try:
seconds = int(retry_after)
Expand All @@ -73,23 +91,34 @@ def do_request(
time_msg = f" Retry after: {retry_after}"
else:
time_msg = ""
raise APIInsufficientQuota(f"Insufficient quota for API route.{time_msg}")
raise APIInsufficientQuota(f"Insufficient quota for API route.{time_msg}{path_str}{headers_str}")
if response.status_code == 502:
raise APIBadGateway("Upstream server error")
raise APIBadGateway(f"Upstream server error{path_str}{headers_str}")
if response.status_code >= 400:
raise APIFailure(f"Bad Request: HTTP {response.status_code}")

raise APIFailure(
f"Bad Request: HTTP original_status_code:{response.status_code}{path_str}{headers_str}",
status_code=500,
)

return response

except Timeout:
request_duration = time.time() - start_time
raise APITimeout(f"Request timed out after {request_duration:.2f} seconds")
except ConnectionError as error:
request_duration = time.time() - start_time
raise APIConnectionError(f"Connection error after {request_duration:.2f} seconds: {error}")
except (APIAccessDenied, APIInsufficientQuota, APIResourceNotFound, APIFailure,
APITimeout, APIConnectionError, APIBadGateway, APIInsufficientPermissions,
APIOrganizationNotAllowed):
except (
APIAccessDenied,
APIInsufficientQuota,
APIResourceNotFound,
APIFailure,
APITimeout,
APIConnectionError,
APIBadGateway,
APIInsufficientPermissions,
APIOrganizationNotAllowed,
):
# Let all our custom exceptions propagate up unchanged
raise
except Exception as error:
Expand Down
76 changes: 37 additions & 39 deletions socketdev/repos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

log = logging.getLogger("socketdev")


@dataclass
class RepositoryInfo:
id: str
Expand All @@ -19,8 +20,11 @@ class RepositoryInfo:
default_branch: str
slug: Optional[str] = None

def __getitem__(self, key): return getattr(self, key)
def to_dict(self): return asdict(self)
def __getitem__(self, key):
return getattr(self, key)

def to_dict(self):
return asdict(self)

@classmethod
def from_dict(cls, data: dict) -> "RepositoryInfo":
Expand All @@ -35,57 +39,59 @@ def from_dict(cls, data: dict) -> "RepositoryInfo":
visibility=data["visibility"],
archived=data["archived"],
default_branch=data["default_branch"],
slug=data.get("slug")
slug=data.get("slug"),
)


@dataclass
class GetRepoResponse:
success: bool
status: int
data: Optional[RepositoryInfo] = None
message: Optional[str] = None

def __getitem__(self, key): return getattr(self, key)
def to_dict(self): return asdict(self)
def __getitem__(self, key):
return getattr(self, key)

def to_dict(self):
return asdict(self)

@classmethod
def from_dict(cls, data: dict) -> "GetRepoResponse":
return cls(
success=data["success"],
status=data["status"],
message=data.get("message"),
data=RepositoryInfo.from_dict(data.get("data")) if data.get("data") else None
data=RepositoryInfo.from_dict(data.get("data")) if data.get("data") else None,
)


class Repos:
def __init__(self, api):
self.api = api

def get(self, org_slug: str, **kwargs) -> dict[str, List[RepositoryInfo]]:
query_params = {}
if kwargs:
for key, val in kwargs.items():
query_params[key] = val
if len(query_params) == 0:
return {}

def get(self, org_slug: str, **kwargs) -> dict[str, list[dict] | int]:
query_params = kwargs
path = "orgs/" + org_slug + "/repos"
if query_params is not None:

if query_params: # Only add query string if we have parameters
path += "?"
for param in query_params:
value = query_params[param]
path += f"{param}={value}&"
path = path.rstrip("&")

response = self.api.do_request(path=path)

if response.status_code == 200:
raw_result = response.json()
result = {
key: [RepositoryInfo.from_dict(repo) for repo in repos]
for key, repos in raw_result.items()
}
return result
per_page = int(query_params.get("per_page", 30))

# TEMPORARY: Handle pagination edge case where API returns nextPage=1 even when no more results exist
if raw_result["nextPage"] != 0 and len(raw_result["results"]) < per_page:
raw_result["nextPage"] = 0

return raw_result

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Error getting repositories: {response.status_code}, message: {error_message}")
Expand All @@ -94,27 +100,19 @@ def get(self, org_slug: str, **kwargs) -> dict[str, List[RepositoryInfo]]:
def repo(self, org_slug: str, repo_name: str) -> GetRepoResponse:
path = f"orgs/{org_slug}/repos/{repo_name}"
response = self.api.do_request(path=path)

if response.status_code == 200:
result = response.json()
return GetRepoResponse.from_dict({
"success": True,
"status": 200,
"data": result
})

return GetRepoResponse.from_dict({"success": True, "status": 200, "data": result})

error_message = response.json().get("error", {}).get("message", "Unknown error")
log.error(f"Failed to get repository: {response.status_code}, message: {error_message}")
return GetRepoResponse.from_dict({
"success": False,
"status": response.status_code,
"message": error_message
})
return GetRepoResponse.from_dict({"success": False, "status": response.status_code, "message": error_message})

def delete(self, org_slug: str, name: str) -> dict:
path = f"orgs/{org_slug}/repos/{name}"
response = self.api.do_request(path=path, method="DELETE")

if response.status_code == 200:
result = response.json()
return result
Expand All @@ -130,11 +128,11 @@ def post(self, org_slug: str, **kwargs) -> dict:
params[key] = val
if len(params) == 0:
return {}

path = "orgs/" + org_slug + "/repos"
payload = json.dumps(params)
response = self.api.do_request(path=path, method="POST", payload=payload)

if response.status_code == 201:
result = response.json()
return result
Expand All @@ -150,11 +148,11 @@ def update(self, org_slug: str, repo_name: str, **kwargs) -> dict:
params[key] = val
if len(params) == 0:
return {}

path = f"orgs/{org_slug}/repos/{repo_name}"
payload = json.dumps(params)
response = self.api.do_request(path=path, method="POST", payload=payload)

if response.status_code == 200:
result = response.json()
return result
Expand Down
2 changes: 1 addition & 1 deletion socketdev/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.0.7"
__version__ = "2.0.8"
Loading