Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
## 0.4.0 (2025-10-31)

### Feat

- **service**: add cached services using aiocache (agnostic backend)

### Fix

- **deps**: forget to add aiocache to `all` extra

### Refactor

- correction of ty warning after upgrade packages

## 0.3.0 (2025-10-15)

### BREAKING CHANGE
Expand Down
59 changes: 59 additions & 0 deletions examples/example_cached.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import math
import os
import time
from contextlib import contextmanager

from fastapi_api_key import ApiKeyService, ApiKey
from fastapi_api_key.hasher.argon2 import Argon2ApiKeyHasher
from fastapi_api_key.repositories.in_memory import InMemoryApiKeyRepository
from fastapi_api_key.services.cached import CachedApiKeyService

# Set env var to override default pepper
# Using a strong, unique pepper is crucial for security
# Default pepper is insecure and should not be used in production
pepper = os.getenv("API_KEY_PEPPER", "change_me")
hasher = Argon2ApiKeyHasher(pepper=pepper)

# default hasher is Argon2 with a default pepper (to be changed in prod)
repo = InMemoryApiKeyRepository()


@contextmanager
def benchmark(n: int):
time_start = time.perf_counter()
yield
time_end = time.perf_counter()
time_elapsed = time_end - time_start

ops_per_sec = math.trunc(n / time_elapsed)
print(f" Elapsed time: {time_elapsed:.6f} seconds ({ops_per_sec:,} ops/sec)\n")


async def main():
n = 100

for service in [
# Must use Bcrypt hash each call
ApiKeyService(repo=repo, hasher=hasher),
# Use Bcrypt once and cache the result
CachedApiKeyService(repo=repo, hasher=hasher),
]:
print(f"{service.__class__.__name__}")
print("- First operation (uncached):")

entity = ApiKey(name="dev")
_, api_key = await service.create(entity)

with benchmark(1):
await service.verify_key(api_key)

print(f"- Subsequent {n} operations (cached where applicable):")

with benchmark(n):
for _ in range(n):
await service.verify_key(api_key)


if __name__ == "__main__":
asyncio.run(main())
42 changes: 40 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,48 @@
[project]
name = "fastapi-api-key"
version = "0.3.0"
description = "Add your description here"
version = "0.4.0"
description = "fastapi-api-key provides secure, production-ready API key management for FastAPI. It offers pluggable hashing strategies (Argon2, bcrypt, or custom), backend-agnostic persistence (SQLAlchemy, in-memory, or your own repository), and an optional cache layer (aiocache, Redis). Includes a Typer CLI and a FastAPI router for CRUD management of keys."
readme = "README.md"
authors = [
{ name = "Athroniaeth", email = "pierre.chaumont@hotmail.fr" }
]
keywords = [
"api",
"api key",
"authentication",
"authorization",
"security",
"backend-agnostic",
"fastapi",
"typer",
]

classifiers = [
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Framework :: FastAPI",
"Environment :: Web Environment",
"Topic :: Security",
"Topic :: Security :: Cryptography",
"Topic :: Internet :: WWW/HTTP",
"Typing :: Typed",
]
requires-python = ">=3.9"
dependencies = []

[project.urls]
Homepage = "https://github.com/Athroniaeth/fastapi-api-key"
Documentation = "https://github.com/Athroniaeth/fastapi-api-key#readme"
Issues = "https://github.com/Athroniaeth/fastapi-api-key/issues"
Changelog = "https://github.com/Athroniaeth/fastapi-api-key/blob/main/CHANGELOG.md"


[project.optional-dependencies]
bcrypt = [
"bcrypt>=5.0.0",
Expand All @@ -24,6 +58,9 @@ core = [
"argon2-cffi>=25.1.0",
"bcrypt>=5.0.0",
]
aiocache = [
"aiocache>=0.12.3",
]
fastapi = [
"fastapi>=0.118.0",
]
Expand All @@ -32,6 +69,7 @@ all = [
"sqlalchemy>=2.0.43",
"argon2-cffi>=25.1.0",
"bcrypt>=5.0.0",
"aiocache>=0.12.3",
]
cli = [
"typer>=0.12.5",
Expand Down
2 changes: 1 addition & 1 deletion src/fastapi_api_key/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import subprocess # nosec: B404

from fastapi_api_key.domain.entities import ApiKey
from fastapi_api_key.services.service import ApiKeyService
from fastapi_api_key.services.base import ApiKeyService
from fastapi_api_key.api import create_api_keys_router, create_depends_api_key
from fastapi_api_key.cli import create_api_keys_cli

Expand Down
2 changes: 1 addition & 1 deletion src/fastapi_api_key/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import warnings


from fastapi_api_key.services.service import AbstractApiKeyService
from fastapi_api_key.services.base import AbstractApiKeyService
from fastapi_api_key.types import SecurityHTTPBearer, SecurityAPIKeyHeader

try:
Expand Down
6 changes: 3 additions & 3 deletions src/fastapi_api_key/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Optional

from fastapi_api_key.domain.base import ApiKeyEntity
from fastapi_api_key import ApiKey
from fastapi_api_key.domain.errors import (
InvalidKey,
KeyExpired,
Expand Down Expand Up @@ -260,12 +260,12 @@ def _parse_datetime(value: str) -> datetime:
return parsed.astimezone(timezone.utc)


def _format_entity(entity: ApiKeyEntity) -> str:
def _format_entity(entity: ApiKey) -> str:
data = _serialize_entity(entity)
return json.dumps(data, indent=2, sort_keys=True)


def _serialize_entity(entity: ApiKeyEntity) -> dict[str, Any]:
def _serialize_entity(entity: ApiKey) -> dict[str, Any]:
if is_dataclass(entity):
data = asdict(entity)
else:
Expand Down
2 changes: 1 addition & 1 deletion src/fastapi_api_key/domain/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ApiKeyEntity(Protocol):
created_at: datetime
last_used_at: Optional[datetime]
key_id: str
key_hash: Optional[str]
key_hash: str
_key_secret: Optional[str]
_key_secret_first: Optional[str]
_key_secret_last: Optional[str]
Expand Down
14 changes: 11 additions & 3 deletions src/fastapi_api_key/repositories/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


from datetime import datetime
from typing import Callable, Generic, Type, TypeVar, List
from typing import Callable, Generic, Type, TypeVar, List, overload
from typing import Optional

from sqlalchemy import String, Text, Boolean, DateTime
Expand All @@ -22,6 +22,9 @@
from fastapi_api_key.utils import datetime_factory


NoneType = type(None)


class Base(DeclarativeBase): ...


Expand Down Expand Up @@ -157,8 +160,13 @@ def to_model(

return target

@staticmethod
def to_domain(model: Optional[M], model_cls: Type[D]) -> Optional[D]:
@overload
def to_domain(self, model: M, model_cls: Type[D]) -> D: ...

@overload
def to_domain(self, model: NoneType, model_cls: Type[D]) -> NoneType: ...

def to_domain(self, model: Optional[M], model_cls: Type[D]) -> Optional[D]:
if model is None:
return None

Expand Down
125 changes: 125 additions & 0 deletions src/fastapi_api_key/services/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from abc import ABC, abstractmethod
from typing import Generic, Optional, Type, Tuple, List

from fastapi_api_key import ApiKey
from fastapi_api_key.domain.base import D
from fastapi_api_key.domain.errors import KeyNotProvided, KeyNotFound, InvalidKey
from fastapi_api_key.hasher.argon2 import Argon2ApiKeyHasher
from fastapi_api_key.hasher.base import ApiKeyHasher
from fastapi_api_key.repositories.base import AbstractApiKeyRepository
from fastapi_api_key.utils import datetime_factory, key_secret_factory

DEFAULT_SEPARATOR = "-"
"""
Expand Down Expand Up @@ -137,3 +140,125 @@ async def verify_key(self, api_key: str) -> D:
an InvalidKey exception is raised. Else, the entity is returned.
"""
...


class ApiKeyService(AbstractApiKeyService[D]):
"""Generic service contract for a domain aggregate."""

def __init__(
self,
repo: AbstractApiKeyRepository[D],
hasher: Optional[ApiKeyHasher] = None,
domain_cls: Optional[Type[D]] = None,
separator: str = DEFAULT_SEPARATOR,
global_prefix: str = "ak",
) -> None:
domain_cls = domain_cls or ApiKey
super().__init__(
repo=repo,
hasher=hasher,
domain_cls=domain_cls,
separator=separator,
global_prefix=global_prefix,
)

async def get_by_id(self, id_: str) -> D:
if id_.strip() == "":
raise KeyNotProvided("No API key provided")

entity = await self._repo.get_by_id(id_)

if entity is None:
raise KeyNotFound(f"API key with ID '{id_}' not found")

return entity

async def get_by_key_id(self, key_id: str) -> D:
if not key_id.strip():
raise KeyNotProvided("No API key key_id provided (key_id cannot be empty)")

entity = await self._repo.get_by_key_id(key_id)

if entity is None:
raise KeyNotFound(f"API key with key_id '{key_id}' not found")

return entity

async def create(self, entity: D, key_secret: Optional[str] = None) -> Tuple[D, str]:
if entity.expires_at and entity.expires_at < datetime_factory():
raise ValueError("Expiration date must be in the future")

key_secret = key_secret or entity.key_secret or key_secret_factory()

full_key_secret = entity.full_key_secret(
self.global_prefix,
self.separator,
key_secret=key_secret,
)
entity.key_hash = self._hasher.hash(key_secret)
entity._key_secret = key_secret
return await self._repo.create(entity), full_key_secret

async def update(self, entity: D) -> D:
result = await self._repo.update(entity)

if result is None:
raise KeyNotFound(f"API key with ID '{entity.id_}' not found")

return result

async def delete_by_id(self, id_: str) -> bool:
result = await self._repo.delete_by_id(id_)

if not result:
raise KeyNotFound(f"API key with ID '{id_}' not found")

return result

async def list(self, limit: int = 100, offset: int = 0) -> list[D]:
return await self._repo.list(limit=limit, offset=offset)

async def verify_key(self, api_key: Optional[str] = None) -> D:
if api_key is None:
raise KeyNotProvided("Api key must be provided (not given)")

if api_key.strip() == "":
raise KeyNotProvided("Api key must be provided (empty)")

# Global key_id "ak" for "api key"
if not api_key.startswith(self.global_prefix):
raise InvalidKey("Api key is invalid (missing global key_id)")

# Get the key_id part from the plain key
try:
parts = api_key.split(self.separator)

if len(parts) != 3:
raise InvalidKey("API key format is invalid (wrong number of segments).")

global_prefix, prefix, secret = parts
except Exception as e:
raise InvalidKey(f"API key format is invalid: {str(e)}") from e

# Search entity by a key_id (can't brute force hashes)
entity = await self.get_by_key_id(prefix)

# Check if the entity can be used for authentication
# and refresh last_used_at if verified
entity.ensure_can_authenticate()

key_hash = entity.key_hash

if not secret:
raise InvalidKey("API key is invalid (empty secret)")

if not self._hasher.verify(key_hash, secret):
raise InvalidKey("API key is invalid (hash mismatch)")

entity.touch()
updated = await self._repo.update(entity)

if updated is None:
raise KeyNotFound(f"API key with ID '{entity.id_}' not found during touch update")

return updated
Loading