Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
656770c
feat:HTTP Client to handle sessions closes #6
watersRand Sep 13, 2025
edd91f0
Merge branch 'RafaelJohn9:master' into feat/add-sessions
watersRand Sep 13, 2025
b86336a
feat(http-client): Add optional session management
watersRand Sep 13, 2025
b6f99ad
Merge branch 'feat/add-sessions' of https://github.com/watersRand/mpe…
watersRand Sep 13, 2025
197f589
fix(ci): Resolve Mypy type-checking error in MpesaHttpClient
watersRand Sep 13, 2025
60d6944
Merge pull request #52 from watersRand/feat/add-sessions
RafaelJohn9 Sep 16, 2025
8fdbf03
feat(http): Add basic logging configuration for request traceability …
watersRand Sep 27, 2025
2f38c07
feat(http_client): Implement robust tenacity-based retry logic for Mp…
watersRand Sep 27, 2025
8c9a109
fix: Resolve ruff and mypy errors in MpesaHttpClient
watersRand Sep 20, 2025
419ed56
updated the test files to fix ruff D205 and 200 errors
watersRand Sep 20, 2025
29a90aa
Lint issues
watersRand Sep 20, 2025
f0e98b8
Merge pull request #85 from watersRand/retry-function
RafaelJohn9 Sep 29, 2025
8843eae
fix: Update mpesakit/http_client/mpesa_http_client.py
RafaelJohn9 Oct 14, 2025
97d7c4d
Merge branch 'develop' into dev
RafaelJohn9 Oct 28, 2025
372a1dc
feat(http-client): harden retry strategy and add session lifecycle ma…
RafaelJohn9 Jan 15, 2026
76c840d
chore(lib): clean up pyproject and bump requests to address GHSA-9hjg…
RafaelJohn9 Jan 15, 2026
74bc47e
Merge branch 'develop' into dev
RafaelJohn9 Jan 15, 2026
3b0b644
fix (pyproject.toml): fixed conflicting keys configuraton.
RafaelJohn9 Jan 15, 2026
7ffbf75
chore (dependencies): upgrade tenacity to version 9
RafaelJohn9 Jan 15, 2026
097ff40
chore: introduced urllib's urljoin functionality to ensure an efficie…
RafaelJohn9 Jan 15, 2026
4cc0e6b
feat(http-client): raise explicit error on JSON decode failures
RafaelJohn9 Jan 15, 2026
5c6434e
chore (format): fix ruff formatting errors.
RafaelJohn9 Jan 16, 2026
309adad
refactor (http client): Split get and post methods into two (low-leve…
RafaelJohn9 Jan 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 180 additions & 115 deletions mpesakit/http_client/mpesa_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,102 +3,194 @@
Handles GET and POST requests with error handling for common HTTP issues.
"""

from typing import Dict, Any, Optional
import logging
from typing import Any, Dict, Optional
from urllib.parse import urljoin

import requests
from tenacity import (
RetryCallState,
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)

from mpesakit.errors import MpesaApiException, MpesaError

from mpesakit.errors import MpesaError, MpesaApiException
from .http_client import HttpClient

logger = logging.getLogger(__name__)

class MpesaHttpClient(HttpClient):
"""A client for making HTTP requests to the M-Pesa API.

This client handles GET and POST requests, including error handling for common HTTP issues.
It supports both sandbox and production environments.
def handle_request_error(response: requests.Response):
"""Handles non-successful HTTP responses.

This function is now responsible for converting HTTP status codes
and JSON parsing errors into MpesaApiException.
"""
try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.ok:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)


def handle_retry_exception(retry_state: RetryCallState):
"""Custom hook to handle exceptions after all retries fail.

It raises a custom MpesaApiException with the appropriate error code.
"""
if retry_state.outcome:
exception = retry_state.outcome.exception()

if isinstance(exception, requests.exceptions.Timeout):
raise MpesaApiException(
MpesaError(error_code="REQUEST_TIMEOUT", error_message=str(exception))
) from exception
elif isinstance(exception, requests.exceptions.ConnectionError):
raise MpesaApiException(
MpesaError(error_code="CONNECTION_ERROR", error_message=str(exception))
) from exception

raise MpesaApiException(
MpesaError(error_code="REQUEST_FAILED", error_message=str(exception))
) from exception

raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message="An unknown retry error occurred.",
)
)

Attributes:
base_url (str): The base URL for the M-Pesa API, depending on the environment.

def retry_enabled(enabled: bool):
"""Factory function to conditionally enable retries.

Args:
enabled (bool): Whether to enable retry logic.

Returns:
A retry condition function.
"""
base_retry = retry_if_exception_type(
requests.exceptions.Timeout
) | retry_if_exception_type(requests.exceptions.ConnectionError)

def _retry(retry_state):
if not enabled:
return False
return base_retry(retry_state)

return _retry


class MpesaHttpClient(HttpClient):
"""A client for making HTTP requests to the M-Pesa API."""

base_url: str
_session: Optional[requests.Session] = None

def __init__(self, env: str = "sandbox"):
"""Initializes the MpesaHttpClient with the specified environment.
def __init__(self, env: str = "sandbox", use_session: bool = False):
"""Initializes the MpesaHttpClient instance.

Args:
env (str): The environment to use, either 'sandbox' or 'production'.
Defaults to 'sandbox'.
env (str): The environment to connect to ('sandbox' or 'production').
use_session (bool): Whether to use a persistent session.
"""
self.base_url = self._resolve_base_url(env)
if use_session:
self._session = requests.Session()
self._session.trust_env = False

def _resolve_base_url(self, env: str) -> str:
if env.lower() == "production":
return "https://api.safaricom.co.ke"
return "https://sandbox.safaricom.co.ke"

@retry(
retry=retry_enabled(enabled=True),
wait=wait_random_exponential(multiplier=5, max=8),
stop=stop_after_attempt(3),
retry_error_callback=handle_retry_exception,
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def _raw_post(
self, url: str, json: Dict[str, Any], headers: Dict[str, str], timeout: int = 10
) -> requests.Response:
"""Low-level POST request - may raise requests exceptions."""
full_url = urljoin(self.base_url, url)
if self._session:
return self._session.post(
full_url, json=json, headers=headers, timeout=timeout
)
else:
return requests.post(full_url, json=json, headers=headers, timeout=timeout)

def post(
self, url: str, json: Dict[str, Any], headers: Dict[str, str]
self, url: str, json: Dict[str, Any], headers: Dict[str, str], timeout: int = 10
) -> Dict[str, Any]:
"""Sends a POST request to the M-Pesa API.

Args:
url (str): The endpoint URL to send the POST request to.
json (Dict[str, Any]): The JSON payload to include in the request body.
headers (Dict[str, str]): The headers to include in the request.
url (str): The URL path for the request.
json (Dict[str, Any]): The JSON payload for the request body.
headers (Dict[str, str]): The HTTP headers for the request.
timeout (int): The timeout for the request in seconds.

Returns:
Dict[str, Any]: The JSON response from the M-Pesa API.

Raises:
MpesaApiException: If the request fails or returns an error response.
Dict[str, Any]: The JSON response from the API.
"""
response: requests.Response | None = None
try:
full_url = f"{self.base_url}{url}"
response = requests.post(full_url, json=json, headers=headers, timeout=10)

try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.ok:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)

return response_data

except requests.Timeout:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message="Request to Mpesa timed out.",
status_code=None,
)
)
except requests.ConnectionError:
raise MpesaApiException(
MpesaError(
error_code="CONNECTION_ERROR",
error_message="Failed to connect to Mpesa API. Check network or URL.",
status_code=None,
)
)
except requests.RequestException as e:
response = self._raw_post(url, json, headers, timeout)
handle_request_error(response)
return response.json()
except (requests.RequestException, ValueError) as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message=f"HTTP request failed: {str(e)}",
status_code=None,
raw_response=None,
error_message=str(e),
status_code=getattr(response, "status_code", None),
raw_response=getattr(response, "text", None),
)
) from e

@retry(
retry=retry_enabled(enabled=True),
wait=wait_random_exponential(multiplier=5, max=8),
stop=stop_after_attempt(3),
retry_error_callback=handle_retry_exception,
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def _raw_get(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> requests.Response:
"""Low-level GET request - may raise requests exceptions."""
if headers is None:
headers = {}
full_url = urljoin(self.base_url, url)
if self._session:
return self._session.get(
full_url, params=params, headers=headers, timeout=10
)
else:
return requests.get(full_url, params=params, headers=headers, timeout=10)

def get(
self,
Expand All @@ -109,65 +201,38 @@ def get(
"""Sends a GET request to the M-Pesa API.

Args:
url (str): The endpoint URL to send the GET request to.
params (Optional[Dict[str, Any]]): The query parameters to include in the request.
headers (Optional[Dict[str, str]]): The headers to include in the request.
url (str): The URL path for the request.
params (Optional[Dict[str, Any]]): The URL parameters.
headers (Optional[Dict[str, str]]): The HTTP headers.

Returns:
Dict[str, Any]: The JSON response from the M-Pesa API.

Raises:
MpesaApiException: If the request fails or returns an error response.
Dict[str, Any]: The JSON response from the API.
"""
response: requests.Response | None = None
try:
if headers is None:
headers = {}
full_url = f"{self.base_url}{url}"

response = requests.get(
full_url, params=params, headers=headers, timeout=10
) # Add timeout

try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.ok:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)

return response_data

except requests.Timeout:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message="Request to Mpesa timed out.",
status_code=None,
)
)
except requests.ConnectionError:
raise MpesaApiException(
MpesaError(
error_code="CONNECTION_ERROR",
error_message="Failed to connect to Mpesa API. Check network or URL.",
status_code=None,
)
)
except requests.RequestException as e:
response = self._raw_get(url, params, headers)
handle_request_error(response)
return response.json()
except (requests.RequestException, ValueError) as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message=f"HTTP request failed: {str(e)}",
status_code=None,
raw_response=None,
error_message=str(e),
status_code=getattr(response, "status_code", None),
raw_response=getattr(response, "text", None),
)
)
) from e

def close(self) -> None:
"""Closes the persistent session if it exists."""
if self._session:
self._session.close()
self._session = None

def __enter__(self) -> "MpesaHttpClient":
"""Context manager entry point."""
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit point. Closes the session."""
self.close()
4 changes: 2 additions & 2 deletions mpesakit/mpesa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class MpesaClient:
"""Unified client for all M-PESA services."""

def __init__(
self, consumer_key: str, consumer_secret: str, environment: str = "sandbox"
self, consumer_key: str, consumer_secret: str, environment: str = "sandbox",use_session: bool = False
) -> None:
"""Initialize the MpesaClient with all service facades."""
self.http_client = MpesaHttpClient(env=environment)
self.http_client = MpesaHttpClient(env=environment,use_session=use_session)
self.token_manager = TokenManager(
http_client=self.http_client,
consumer_key=consumer_key,
Expand Down
Loading