Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 109 additions & 30 deletions python/numbersprotocol_capture/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import json
import mimetypes
import threading
import time
from collections.abc import Callable
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
Expand Down Expand Up @@ -36,6 +39,11 @@
ASSET_SEARCH_API_URL = "https://us-central1-numbers-protocol-api.cloudfunctions.net/asset-search"
NFT_SEARCH_API_URL = "https://eofveg1f59hrbn.m.pipedream.net"

DEFAULT_TIMEOUT = 30.0
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_DELAY = 1.0
RETRYABLE_STATUS_CODES = frozenset([429, 500, 502, 503, 504])

# Common MIME types by extension
MIME_TYPES: dict[str, str] = {
"jpg": "image/jpeg",
Expand Down Expand Up @@ -134,6 +142,10 @@ def __init__(
*,
testnet: bool = False,
base_url: str | None = None,
timeout: float = DEFAULT_TIMEOUT,
max_retries: int = DEFAULT_MAX_RETRIES,
retry_delay: float = DEFAULT_RETRY_DELAY,
rate_limit: int | None = None,
options: CaptureOptions | None = None,
):
"""
Expand All @@ -143,20 +155,37 @@ def __init__(
token: Authentication token for API access.
testnet: Use testnet environment (default: False).
base_url: Custom base URL (overrides testnet setting).
timeout: Request timeout in seconds (default: 30.0).
max_retries: Maximum retry attempts for transient failures (default: 3).
retry_delay: Initial backoff delay in seconds (default: 1.0).
rate_limit: Maximum requests per second (default: None, no limiting).
options: CaptureOptions object (alternative to individual args).
"""
if options:
token = options.token
testnet = options.testnet
base_url = options.base_url
timeout = options.timeout
max_retries = options.max_retries
retry_delay = options.retry_delay
rate_limit = options.rate_limit

if not token:
raise ValidationError("token is required")

self._token = token
self._testnet = testnet
self._base_url = base_url or DEFAULT_BASE_URL
self._client = httpx.Client(timeout=30.0)
self._timeout = timeout
self._max_retries = max_retries
self._retry_delay = retry_delay
self._rate_limit = rate_limit
self._client = httpx.Client(timeout=timeout)

# Rate limiter state (token bucket)
self._rate_limit_lock = threading.Lock()
self._rate_limit_tokens = float(rate_limit) if rate_limit is not None else 0.0
self._rate_limit_last_time = time.monotonic()

def __enter__(self) -> Capture:
return self
Expand All @@ -168,6 +197,56 @@ def close(self) -> None:
"""Close the HTTP client."""
self._client.close()

def _acquire_rate_limit_token(self) -> None:
"""Acquires a rate-limit token using a token-bucket algorithm."""
if not self._rate_limit:
return
wait = 0.0
with self._rate_limit_lock:
now = time.monotonic()
elapsed = max(0.0, now - self._rate_limit_last_time)
self._rate_limit_tokens = min(
float(self._rate_limit),
self._rate_limit_tokens + elapsed * self._rate_limit,
)
self._rate_limit_last_time = now
if self._rate_limit_tokens >= 1.0:
self._rate_limit_tokens -= 1.0
return
wait = (1.0 - self._rate_limit_tokens) / self._rate_limit
self._rate_limit_tokens = 0.0
# Project last_time forward so the next caller correctly sees 0 tokens
self._rate_limit_last_time = now + wait
time.sleep(wait)

def _execute_with_retry(
self,
func: Callable[[], httpx.Response],
nid: str | None = None,
) -> httpx.Response:
"""Execute an HTTP request callable with retry and rate-limit logic."""
self._acquire_rate_limit_token()

final_error: Exception | None = None
final_response: httpx.Response | None = None

for attempt in range(self._max_retries + 1):
if attempt > 0:
delay = self._retry_delay * (2.0 ** (attempt - 1))
time.sleep(delay)
try:
response = func()
final_response = response
if response.status_code in RETRYABLE_STATUS_CODES and attempt < self._max_retries:
continue
return response
except httpx.RequestError as e:
final_error = e

if final_response is not None:
return final_response
raise create_api_error(0, f"Network error: {final_error}", nid) from final_error

def _request(
self,
method: str,
Expand All @@ -181,32 +260,31 @@ def _request(
"""Makes an authenticated API request."""
headers = {"Authorization": f"token {self._token}"}

try:
def build_and_send() -> httpx.Response:
if files:
response = self._client.request(
return self._client.request(
method,
url,
headers=headers,
data=data,
files=files,
)
elif json_body:
headers["Content-Type"] = "application/json"
response = self._client.request(
return self._client.request(
method,
url,
headers=headers,
headers={**headers, "Content-Type": "application/json"},
json=json_body,
)
else:
response = self._client.request(
return self._client.request(
method,
url,
headers=headers,
data=data,
)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}", nid) from e

response = self._execute_with_retry(build_and_send, nid=nid)

if not response.is_success:
message = f"API request failed with status {response.status_code}"
Expand Down Expand Up @@ -451,10 +529,10 @@ def get_history(self, nid: str) -> list[Commit]:
"Authorization": f"token {self._token}",
}

try:
response = self._client.get(url, headers=headers)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}", nid) from e
response = self._execute_with_retry(
lambda: self._client.get(url, headers=headers),
nid=nid,
)

if not response.is_success:
raise create_api_error(
Expand Down Expand Up @@ -516,14 +594,14 @@ def get_asset_tree(self, nid: str) -> AssetTree:
"Authorization": f"token {self._token}",
}

try:
response = self._client.post(
response = self._execute_with_retry(
lambda: self._client.post(
MERGE_TREE_API_URL,
headers=headers,
json=commit_data,
)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}", nid) from e
),
nid=nid,
)

if not response.is_success:
raise create_api_error(
Expand Down Expand Up @@ -677,22 +755,23 @@ def search_asset(
# Verify Engine API requires token in Authorization header, not form data
headers = {"Authorization": f"token {self._token}"}

try:
if files_data:
response = self._client.post(
if files_data:
response = self._execute_with_retry(
lambda: self._client.post(
ASSET_SEARCH_API_URL,
headers=headers,
data=form_data,
files=files_data,
)
else:
response = self._client.post(
)
else:
response = self._execute_with_retry(
lambda: self._client.post(
ASSET_SEARCH_API_URL,
headers=headers,
data=form_data,
)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}") from e
)

if not response.is_success:
message = f"Asset search failed with status {response.status_code}"
Expand Down Expand Up @@ -745,14 +824,14 @@ def search_nft(self, nid: str) -> NftSearchResult:
"Authorization": f"token {self._token}",
}

try:
response = self._client.post(
response = self._execute_with_retry(
lambda: self._client.post(
NFT_SEARCH_API_URL,
headers=headers,
json={"nid": nid},
)
except httpx.RequestError as e:
raise create_api_error(0, f"Network error: {e}", nid) from e
),
nid=nid,
)

if not response.is_success:
message = f"NFT search failed with status {response.status_code}"
Expand Down
12 changes: 12 additions & 0 deletions python/numbersprotocol_capture/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ class CaptureOptions:
base_url: str | None = None
"""Custom base URL (overrides testnet setting)."""

timeout: float = 30.0
"""Request timeout in seconds (default: 30.0)."""

max_retries: int = 3
"""Maximum number of retry attempts for transient failures (default: 3)."""

retry_delay: float = 1.0
"""Initial delay in seconds for exponential backoff (default: 1.0)."""

rate_limit: int | None = None
"""Maximum requests per second for client-side rate limiting (default: None)."""


@dataclass
class SignOptions:
Expand Down
Loading