diff --git a/aws_lambda_powertools/event_handler/__init__.py b/aws_lambda_powertools/event_handler/__init__.py index f374590428d..6b926e6248a 100644 --- a/aws_lambda_powertools/event_handler/__init__.py +++ b/aws_lambda_powertools/event_handler/__init__.py @@ -17,6 +17,7 @@ BedrockFunctionResponse, ) from aws_lambda_powertools.event_handler.events_appsync.appsync_events import AppSyncEventsResolver +from aws_lambda_powertools.event_handler.http_resolver import HttpResolverLocal from aws_lambda_powertools.event_handler.lambda_function_url import ( LambdaFunctionUrlResolver, ) @@ -34,6 +35,7 @@ "BedrockResponse", "BedrockFunctionResponse", "CORSConfig", + "HttpResolverLocal", "LambdaFunctionUrlResolver", "Response", "VPCLatticeResolver", diff --git a/aws_lambda_powertools/event_handler/http_resolver.py b/aws_lambda_powertools/event_handler/http_resolver.py new file mode 100644 index 00000000000..5b6ff3f5adf --- /dev/null +++ b/aws_lambda_powertools/event_handler/http_resolver.py @@ -0,0 +1,485 @@ +from __future__ import annotations + +import asyncio +import base64 +import inspect +import warnings +from typing import TYPE_CHECKING, Any, Callable +from urllib.parse import parse_qs + +from aws_lambda_powertools.event_handler.api_gateway import ( + ApiGatewayResolver, + BaseRouter, + ProxyEventType, + Response, + Route, +) +from aws_lambda_powertools.shared.headers_serializer import BaseHeadersSerializer +from aws_lambda_powertools.utilities.data_classes.common import BaseProxyEvent + +if TYPE_CHECKING: + from aws_lambda_powertools.shared.cookies import Cookie + + +class HttpHeadersSerializer(BaseHeadersSerializer): + """Headers serializer for native HTTP responses.""" + + def serialize(self, headers: dict[str, str | list[str]], cookies: list[Cookie]) -> dict[str, Any]: + """Serialize headers for HTTP response format.""" + combined_headers: dict[str, str] = {} + for key, values in headers.items(): + if values is None: # pragma: no cover + continue + if isinstance(values, str): + combined_headers[key] = values + else: + combined_headers[key] = ", ".join(values) + + # Add cookies as Set-Cookie headers + cookie_headers = [str(cookie) for cookie in cookies] if cookies else [] + + return {"headers": combined_headers, "cookies": cookie_headers} + + +class HttpProxyEvent(BaseProxyEvent): + """ + A proxy event that wraps native HTTP request data. + + This allows the same route handlers to work with both Lambda and native HTTP servers. + """ + + def __init__( + self, + method: str, + path: str, + headers: dict[str, str] | None = None, + body: str | bytes | None = None, + query_string: str | None = None, + path_parameters: dict[str, str] | None = None, + request_context: dict[str, Any] | None = None, + ): + # Parse query string + query_params: dict[str, str] = {} + multi_query_params: dict[str, list[str]] = {} + + if query_string: + parsed = parse_qs(query_string, keep_blank_values=True) + multi_query_params = parsed + query_params = {k: v[-1] for k, v in parsed.items()} + + # Normalize body to string + body_str = None + if body is not None: + body_str = body.decode("utf-8") if isinstance(body, bytes) else body + + # Build the internal dict structure that BaseProxyEvent expects + data = { + "httpMethod": method.upper(), + "path": path, + "headers": headers or {}, + "body": body_str, + "isBase64Encoded": False, + "queryStringParameters": query_params, + "multiValueQueryStringParameters": multi_query_params, + "pathParameters": path_parameters or {}, + "requestContext": request_context + or { + "stage": "local", + "requestId": "local-request-id", + "http": {"method": method.upper(), "path": path}, + }, + } + + super().__init__(data) + + @classmethod + def _from_dict(cls, data: dict[str, Any]) -> HttpProxyEvent: + """Create HttpProxyEvent directly from a dict (used internally).""" + instance = object.__new__(cls) + BaseProxyEvent.__init__(instance, data) + return instance + + @classmethod + def from_asgi(cls, scope: dict[str, Any], body: bytes | None = None) -> HttpProxyEvent: + """ + Create an HttpProxyEvent from an ASGI scope dict. + + Parameters + ---------- + scope : dict + ASGI scope dictionary + body : bytes, optional + Request body + + Returns + ------- + HttpProxyEvent + Event object compatible with Powertools resolvers + """ + # Extract headers from ASGI format [(b"key", b"value"), ...] + headers: dict[str, str] = {} + for key, value in scope.get("headers", []): + header_name = key.decode("utf-8").lower() + header_value = value.decode("utf-8") + # Handle duplicate headers by joining with comma + if header_name in headers: + headers[header_name] = f"{headers[header_name]}, {header_value}" + else: + headers[header_name] = header_value + + return cls( + method=scope["method"], + path=scope["path"], + headers=headers, + body=body, + query_string=scope.get("query_string", b"").decode("utf-8"), + ) + + def header_serializer(self) -> BaseHeadersSerializer: + """Return the HTTP headers serializer.""" + return HttpHeadersSerializer() + + @property + def resolved_query_string_parameters(self) -> dict[str, list[str]]: + """Return query parameters in the format expected by OpenAPI validation.""" + return self.multi_value_query_string_parameters + + @property + def resolved_headers_field(self) -> dict[str, str]: + """Return headers in the format expected by OpenAPI validation.""" + return self.headers + + +class MockLambdaContext: + """Minimal Lambda context for HTTP adapter.""" + + function_name = "http-resolver" + memory_limit_in_mb = 128 + invoked_function_arn = "arn:aws:lambda:local:000000000000:function:http-resolver" + aws_request_id = "local-request-id" + log_group_name = "/aws/lambda/http-resolver" + log_stream_name = "local" + + def get_remaining_time_in_millis(self) -> int: # pragma: no cover + return 300000 # 5 minutes + + +class HttpResolverLocal(ApiGatewayResolver): + """ + ASGI-compatible HTTP resolver for local development and testing. + + This resolver is designed specifically for local development workflows. + It allows you to run your Powertools application locally with any ASGI server + (uvicorn, hypercorn, daphne, etc.) while maintaining full compatibility with Lambda. + + The same code works in both environments - locally via ASGI and in Lambda via the handler. + + Supports both sync and async route handlers. + + WARNING + ------- + This is intended for local development and testing only. + The API may change in future releases. Do not use in production environments. + + Example + ------- + ```python + from aws_lambda_powertools.event_handler import HttpResolverLocal + + app = HttpResolverLocal() + + @app.get("/hello/") + async def hello(name: str): + # Async handler - can use await + return {"message": f"Hello, {name}!"} + + @app.get("/sync") + def sync_handler(): + # Sync handlers also work + return {"sync": True} + + # Run locally with uvicorn: + # uvicorn app:app --reload + + # Deploy to Lambda (sync only): + # handler = app + ``` + """ + + def __init__( + self, + cors: Any = None, + debug: bool | None = None, + serializer: Callable[[dict], str] | None = None, + strip_prefixes: list[str | Any] | None = None, + enable_validation: bool = False, + ): + warnings.warn( + "HttpResolverLocal is intended for local development and testing only. " + "The API may change in future releases. Do not use in production environments.", + stacklevel=2, + ) + super().__init__( + proxy_type=ProxyEventType.APIGatewayProxyEvent, # Use REST API format internally + cors=cors, + debug=debug, + serializer=serializer, + strip_prefixes=strip_prefixes, + enable_validation=enable_validation, + ) + self._is_async_mode = False + + def _to_proxy_event(self, event: dict) -> BaseProxyEvent: + """Convert event dict to HttpProxyEvent.""" + # Create HttpProxyEvent directly from the dict data + # The dict already has queryStringParameters and multiValueQueryStringParameters + return HttpProxyEvent._from_dict(event) + + def _get_base_path(self) -> str: + """Return the base path for HTTP resolver (no stage prefix).""" + return "" + + async def _resolve_async(self) -> dict: + """Async version of resolve that supports async handlers.""" + method = self.current_event.http_method.upper() + path = self._remove_prefix(self.current_event.path) + + registered_routes = self._static_routes + self._dynamic_routes + + for route in registered_routes: + if method != route.method: + continue + match_results = route.rule.match(path) + if match_results: + self.append_context(_route=route, _path=path) + route_keys = self._convert_matches_into_route_keys(match_results) + return await self._call_route_async(route, route_keys) + + # Handle not found + return await self._handle_not_found_async() + + async def _call_route_async(self, route: Route, route_arguments: dict[str, str]) -> dict: + """Call route handler, supporting both sync and async handlers.""" + from aws_lambda_powertools.event_handler.api_gateway import ResponseBuilder + + try: + self._reset_processed_stack() + + # Get the route args (may be modified by validation middleware) + self.append_context(_route_args=route_arguments) + + # Run middleware chain (sync for now, handlers can be async) + response = await self._run_middleware_chain_async(route) + + response_builder: ResponseBuilder = ResponseBuilder( + response=response, + serializer=self._serializer, + route=route, + ) + + return response_builder.build(self.current_event, self._cors) + + except Exception as exc: + exc_response_builder = self._call_exception_handler(exc, route) + if exc_response_builder: + return exc_response_builder.build(self.current_event, self._cors) + raise + + async def _run_middleware_chain_async(self, route: Route) -> Response: + """Run the middleware chain, awaiting async handlers.""" + # Build middleware list + all_middlewares: list[Callable[..., Any]] = [] + + if hasattr(self, "_request_validation_middleware"): + all_middlewares.append(self._request_validation_middleware) + + all_middlewares.extend(self._router_middlewares + route.middlewares) + + if hasattr(self, "_response_validation_middleware"): + all_middlewares.append(self._response_validation_middleware) + + # Create the final handler that calls the route function + async def final_handler(app): + route_args = app.context.get("_route_args", {}) + result = route.func(**route_args) + + # Await if coroutine + if inspect.iscoroutine(result): + result = await result + + return self._to_response(result) + + # Build middleware chain from end to start + next_handler = final_handler + + for middleware in reversed(all_middlewares): + next_handler = self._wrap_middleware_async(middleware, next_handler) + + return await next_handler(self) + + def _wrap_middleware_async(self, middleware: Callable, next_handler: Callable) -> Callable: + """Wrap a middleware to work in async context.""" + + async def wrapped(app): + # Create a next_middleware that the sync middleware can call + def sync_next(app): + # This will be called by sync middleware + # We need to run the async next_handler + loop = asyncio.get_event_loop() + if loop.is_running(): + # We're in an async context, create a task + future = asyncio.ensure_future(next_handler(app)) + # Store for later await + app.context["_async_next_result"] = future + return Response(status_code=200, body="") # Placeholder + else: # pragma: no cover + return loop.run_until_complete(next_handler(app)) + + # Check if middleware is async + if inspect.iscoroutinefunction(middleware): + result = await middleware(app, next_handler) + else: + # Sync middleware - need special handling + result = middleware(app, sync_next) + + # Check if we stored an async result + if "_async_next_result" in app.context: + future = app.context.pop("_async_next_result") + result = await future + + return result + + return wrapped + + async def _handle_not_found_async(self) -> dict: + """Handle 404 responses, using custom not_found handler if registered.""" + from http import HTTPStatus + + from aws_lambda_powertools.event_handler.api_gateway import ResponseBuilder + from aws_lambda_powertools.event_handler.exceptions import NotFoundError + + # Check for custom not_found handler + custom_not_found_handler = self.exception_handler_manager.lookup_exception_handler(NotFoundError) + if custom_not_found_handler: + response = custom_not_found_handler(NotFoundError()) + else: + response = Response( + status_code=HTTPStatus.NOT_FOUND.value, + content_type="application/json", + body={"statusCode": HTTPStatus.NOT_FOUND.value, "message": "Not found"}, + ) + + response_builder: ResponseBuilder = ResponseBuilder( + response=response, + serializer=self._serializer, + route=None, + ) + + return response_builder.build(self.current_event, self._cors) + + async def asgi_handler(self, scope: dict, receive: Callable, send: Callable) -> None: + """ + ASGI interface - allows running with uvicorn/hypercorn/etc. + + Parameters + ---------- + scope : dict + ASGI connection scope + receive : Callable + ASGI receive function + send : Callable + ASGI send function + """ + if scope["type"] == "lifespan": + # Handle lifespan events (startup/shutdown) + while True: + message = await receive() + if message["type"] == "lifespan.startup": + await send({"type": "lifespan.startup.complete"}) + elif message["type"] == "lifespan.shutdown": + await send({"type": "lifespan.shutdown.complete"}) + return + + if scope["type"] != "http": + return + + # Read request body + body = b"" + while True: + message = await receive() + body += message.get("body", b"") + if not message.get("more_body", False): + break + + # Convert ASGI scope to HttpProxyEvent + event = HttpProxyEvent.from_asgi(scope, body) + + # Create mock Lambda context + context: Any = MockLambdaContext() + + # Set up resolver state (similar to resolve()) + BaseRouter.current_event = self._to_proxy_event(event._data) + BaseRouter.lambda_context = context + + self._is_async_mode = True + + try: + # Use async resolve + response = await self._resolve_async() + finally: + self._is_async_mode = False + self.clear_context() + + # Send HTTP response + await self._send_response(send, response) + + async def __call__( # type: ignore[override] + self, + scope: dict, + receive: Callable, + send: Callable, + ) -> None: + """ASGI interface - allows running with uvicorn/hypercorn/etc.""" + await self.asgi_handler(scope, receive, send) + + async def _send_response(self, send: Callable, response: dict) -> None: + """Send the response via ASGI.""" + status_code = response.get("statusCode", 200) + headers = response.get("headers", {}) + cookies = response.get("cookies", []) + body = response.get("body", "") + is_base64 = response.get("isBase64Encoded", False) + + # Build headers list for ASGI + header_list: list[tuple[bytes, bytes]] = [] + for key, value in headers.items(): + header_list.append((key.lower().encode(), str(value).encode())) + + # Add Set-Cookie headers + for cookie in cookies: + header_list.append((b"set-cookie", str(cookie).encode())) + + # Send response start + await send( + { + "type": "http.response.start", + "status": status_code, + "headers": header_list, + }, + ) + + # Prepare body + if is_base64: + body_bytes = base64.b64decode(body) + elif isinstance(body, str): + body_bytes = body.encode("utf-8") + else: # pragma: no cover + body_bytes = body + + # Send response body + await send( + { + "type": "http.response.body", + "body": body_bytes, + }, + ) diff --git a/docs/core/event_handler/api_gateway.md b/docs/core/event_handler/api_gateway.md index a79d6b118de..e262613046c 100644 --- a/docs/core/event_handler/api_gateway.md +++ b/docs/core/event_handler/api_gateway.md @@ -55,6 +55,7 @@ Before you decorate your functions to handle a given path and HTTP method(s), yo By default, we will use `APIGatewayRestResolver` throughout the documentation. You can use any of the following: + | Resolver | AWS service | | ------------------------------------------------------- | -------------------------------------- | | **[`APIGatewayRestResolver`](#api-gateway-rest-api)** | Amazon API Gateway REST API | @@ -62,6 +63,8 @@ By default, we will use `APIGatewayRestResolver` throughout the documentation. Y | **[`ALBResolver`](#application-load-balancer)** | Amazon Application Load Balancer (ALB) | | **[`LambdaFunctionUrlResolver`](#lambda-function-url)** | AWS Lambda Function URL | | **[`VPCLatticeResolver`](#vpc-lattice)** | Amazon VPC Lattice | +| **[`HttpResolverLocal`](#http-resolver-local)** | Local development with ASGI servers | + #### Response auto-serialization @@ -191,6 +194,8 @@ When using [VPC Lattice with AWS Lambda](https://docs.aws.amazon.com/lambda/late --8<-- "examples/event_handler_rest/src/getting_started_vpclattice_resolver.json" ``` +--8<-- "docs/includes/_http_resolver_local.md" + ### Dynamic routes You can use `/todos/` to configure dynamic URL paths, where `` will be resolved at runtime. diff --git a/docs/includes/_http_resolver_local.md b/docs/includes/_http_resolver_local.md new file mode 100644 index 00000000000..a9789a54746 --- /dev/null +++ b/docs/includes/_http_resolver_local.md @@ -0,0 +1,40 @@ + +#### Http Resolver (Local Development) + +???+ warning "Local Development Only" + `HttpResolverLocal` is intended for local development and testing only. + The API may change in future releases. **Do not use in production environments.** + +When developing locally, you can use `HttpResolverLocal` to run your API with any ASGI server like [uvicorn](https://www.uvicorn.org/){target="_blank"}. It implements the [ASGI specification](https://asgi.readthedocs.io/){target="_blank"}, is lightweight with no external dependencies, and the same code works on any compute platform, including Lambda. + +If your Lambda is behind [Lambda Web Adapter](https://github.com/awslabs/aws-lambda-web-adapter){target="_blank"} or any other HTTP proxy that speaks the HTTP protocol, it works seamlessly. + +All existing resolver features work out of the box: routing, middleware, validation, OpenAPI/Swagger, CORS, exception handling, and more. + +**Install uvicorn**: + +```bash +pip install uvicorn +``` + +=== "Basic Usage" + + ```python hl_lines="1 3" + --8<-- "examples/event_handler_rest/src/http_resolver_basic.py" + ``` + + Run locally: `uvicorn app:app --reload` + +=== "With Validation & Swagger" + + ```python hl_lines="13-16" + --8<-- "examples/event_handler_rest/src/http_resolver_validation_swagger.py" + ``` + + Access Swagger UI at `http://localhost:8000/swagger` + +=== "Exception Handling" + + ```python hl_lines="12-18 21-27" + --8<-- "examples/event_handler_rest/src/http_resolver_exception_handling.py" + ``` diff --git a/examples/event_handler_rest/src/http_resolver_basic.py b/examples/event_handler_rest/src/http_resolver_basic.py new file mode 100644 index 00000000000..63d291c8e47 --- /dev/null +++ b/examples/event_handler_rest/src/http_resolver_basic.py @@ -0,0 +1,12 @@ +from aws_lambda_powertools.event_handler import HttpResolverLocal + +app = HttpResolverLocal() + + +@app.get("/hello/") +def hello(name: str): + return {"message": f"Hello, {name}!"} + + +# Lambda handler - same code works in Lambda +handler = app diff --git a/examples/event_handler_rest/src/http_resolver_exception_handling.py b/examples/event_handler_rest/src/http_resolver_exception_handling.py new file mode 100644 index 00000000000..787687e6248 --- /dev/null +++ b/examples/event_handler_rest/src/http_resolver_exception_handling.py @@ -0,0 +1,36 @@ +from aws_lambda_powertools.event_handler import HttpResolverLocal, Response + +app = HttpResolverLocal() + + +class NotFoundError(Exception): + def __init__(self, resource: str): + self.resource = resource + + +@app.exception_handler(NotFoundError) +def handle_not_found_error(exc: NotFoundError): + return Response( + status_code=404, + content_type="application/json", + body={"error": "Not Found", "resource": exc.resource}, + ) + + +@app.not_found +def handle_not_found(exc: Exception): + return Response( + status_code=404, + content_type="application/json", + body={"error": "Route not found", "path": app.current_event.path}, + ) + + +@app.get("/users/") +def get_user(user_id: str): + if user_id == "0": + raise NotFoundError(f"User {user_id}") + return {"user_id": user_id} + + +handler = app diff --git a/examples/event_handler_rest/src/http_resolver_validation_swagger.py b/examples/event_handler_rest/src/http_resolver_validation_swagger.py new file mode 100644 index 00000000000..2c694c87bcc --- /dev/null +++ b/examples/event_handler_rest/src/http_resolver_validation_swagger.py @@ -0,0 +1,24 @@ +from pydantic import BaseModel + +from aws_lambda_powertools.event_handler import HttpResolverLocal + + +class User(BaseModel): + name: str + age: int + + +app = HttpResolverLocal(enable_validation=True) + +app.enable_swagger( + title="My API", + version="1.0.0", +) + + +@app.post("/users") +def create_user(user: User) -> dict: + return {"id": "123", "user": user.model_dump()} + + +handler = app diff --git a/tests/functional/event_handler/_pydantic/test_http_resolver_pydantic.py b/tests/functional/event_handler/_pydantic/test_http_resolver_pydantic.py new file mode 100644 index 00000000000..3e2806d3715 --- /dev/null +++ b/tests/functional/event_handler/_pydantic/test_http_resolver_pydantic.py @@ -0,0 +1,286 @@ +"""Tests for HttpResolverLocal with Pydantic validation.""" + +from __future__ import annotations + +import asyncio +import json +from typing import Annotated, Any + +import pytest +from pydantic import BaseModel, Field + +from aws_lambda_powertools.event_handler import HttpResolverLocal +from aws_lambda_powertools.event_handler.http_resolver import MockLambdaContext +from aws_lambda_powertools.event_handler.openapi.params import Query + +# Suppress warning for all tests +pytestmark = pytest.mark.filterwarnings("ignore:HttpResolverLocal is intended for local development") + + +# ============================================================================= +# ASGI Test Helpers +# ============================================================================= + + +def make_asgi_receive(body: bytes = b""): + """Create an ASGI receive callable.""" + + async def receive() -> dict[str, Any]: + await asyncio.sleep(0) + return {"type": "http.request", "body": body, "more_body": False} + + return receive + + +def make_asgi_send(): + """Create an ASGI send callable that captures response.""" + captured: dict[str, Any] = {"status_code": None, "body": b""} + + async def send(message: dict[str, Any]) -> None: + await asyncio.sleep(0) + if message["type"] == "http.response.start": + captured["status_code"] = message["status"] + elif message["type"] == "http.response.body": + captured["body"] = message["body"] + + return send, captured + + +class UserModel(BaseModel): + name: str = Field(min_length=1, max_length=100) + age: int = Field(ge=0, le=150) + email: str | None = None + + +class UserResponse(BaseModel): + id: str + user: UserModel + created: bool = True + + +# ============================================================================= +# Body Validation Tests +# ============================================================================= + + +def test_valid_body_validation(): + # GIVEN an app with validation enabled and a route expecting UserModel + app = HttpResolverLocal(enable_validation=True) + + @app.post("/users") + def create_user(user: UserModel) -> UserResponse: + return UserResponse(id="user-123", user=user) + + event = { + "httpMethod": "POST", + "path": "/users", + "headers": {"content-type": "application/json"}, + "queryStringParameters": {}, + "multiValueQueryStringParameters": {}, + "body": '{"name": "John", "age": 30}', + } + + # WHEN sending a valid body + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 200 with validated data + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["id"] == "user-123" + assert body["user"]["name"] == "John" + + +def test_invalid_body_validation(): + # GIVEN an app with validation enabled + app = HttpResolverLocal(enable_validation=True) + + @app.post("/users") + def create_user(user: UserModel) -> UserResponse: + return UserResponse(id="user-123", user=user) + + event = { + "httpMethod": "POST", + "path": "/users", + "headers": {"content-type": "application/json"}, + "queryStringParameters": {}, + "multiValueQueryStringParameters": {}, + "body": '{"name": "", "age": 30}', # Empty name - invalid + } + + # WHEN sending an invalid body + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 422 with validation error + assert result["statusCode"] == 422 + body = json.loads(result["body"]) + assert "detail" in body + + +def test_missing_required_field(): + # GIVEN an app with validation enabled + app = HttpResolverLocal(enable_validation=True) + + @app.post("/users") + def create_user(user: UserModel) -> UserResponse: + return UserResponse(id="user-123", user=user) + + event = { + "httpMethod": "POST", + "path": "/users", + "headers": {"content-type": "application/json"}, + "queryStringParameters": {}, + "multiValueQueryStringParameters": {}, + "body": '{"age": 30}', # Missing name + } + + # WHEN sending body with missing required field + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 422 + assert result["statusCode"] == 422 + + +# ============================================================================= +# Query Parameter Validation Tests +# ============================================================================= + + +def test_query_param_validation(): + # GIVEN an app with validated query parameters + app = HttpResolverLocal(enable_validation=True) + + @app.get("/search") + def search( + q: Annotated[str, Query(description="Search query")], + page: Annotated[int, Query(ge=1)] = 1, + limit: Annotated[int, Query(ge=1, le=100)] = 10, + ) -> dict: + return {"query": q, "page": page, "limit": limit} + + event = { + "httpMethod": "GET", + "path": "/search", + "headers": {}, + "queryStringParameters": {"q": "python", "page": "2", "limit": "50"}, + "multiValueQueryStringParameters": {"q": ["python"], "page": ["2"], "limit": ["50"]}, + "body": None, + } + + # WHEN sending valid query params + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 200 with parsed values + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["query"] == "python" + assert body["page"] == 2 + assert body["limit"] == 50 + + +def test_invalid_query_param(): + # GIVEN an app with validated query parameters + app = HttpResolverLocal(enable_validation=True) + + @app.get("/search") + def search( + q: Annotated[str, Query()], + limit: Annotated[int, Query(ge=1, le=100)] = 10, + ) -> dict: + return {"query": q, "limit": limit} + + event = { + "httpMethod": "GET", + "path": "/search", + "headers": {}, + "queryStringParameters": {"q": "test", "limit": "200"}, # limit > 100 + "multiValueQueryStringParameters": {"q": ["test"], "limit": ["200"]}, + "body": None, + } + + # WHEN sending invalid query param + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 422 + assert result["statusCode"] == 422 + + +# ============================================================================= +# Async Handler with Validation Tests +# ============================================================================= + + +@pytest.mark.asyncio +async def test_async_handler_with_validation(): + # GIVEN an app with async handler and validation + app = HttpResolverLocal(enable_validation=True) + + @app.post("/users") + async def create_user(user: UserModel) -> UserResponse: + await asyncio.sleep(0.001) + return UserResponse(id="async-123", user=user) + + scope = { + "type": "http", + "method": "POST", + "path": "/users", + "query_string": b"", + "headers": [(b"content-type", b"application/json")], + } + + receive = make_asgi_receive(b'{"name": "AsyncUser", "age": 25}') + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN validation works with async handler + assert captured["status_code"] == 200 + body = json.loads(captured["body"]) + assert body["id"] == "async-123" + assert body["user"]["name"] == "AsyncUser" + + +# ============================================================================= +# OpenAPI Tests +# ============================================================================= + + +def test_openapi_schema_generation(): + # GIVEN an app with validation and multiple routes + app = HttpResolverLocal(enable_validation=True) + + @app.get("/users/") + def get_user(user_id: str) -> dict: + return {"user_id": user_id} + + @app.post("/users") + def create_user(user: UserModel) -> UserResponse: + return UserResponse(id="123", user=user) + + # WHEN generating OpenAPI schema + schema = app.get_openapi_schema( + title="Test API", + version="1.0.0", + ) + + # THEN schema contains all routes + assert schema.info.title == "Test API" + assert schema.info.version == "1.0.0" + assert "/users/{user_id}" in schema.paths + assert "/users" in schema.paths + + +def test_openapi_schema_includes_validation_errors(): + # GIVEN an app with validation + app = HttpResolverLocal(enable_validation=True) + + @app.post("/users") + def create_user(user: UserModel) -> UserResponse: + return UserResponse(id="123", user=user) + + # WHEN generating OpenAPI schema + schema = app.get_openapi_schema(title="Test API", version="1.0.0") + + # THEN schema includes 422 response + post_operation = schema.paths["/users"].post + assert 422 in post_operation.responses diff --git a/tests/functional/event_handler/required_dependencies/test_http_resolver.py b/tests/functional/event_handler/required_dependencies/test_http_resolver.py new file mode 100644 index 00000000000..40fb3d20c64 --- /dev/null +++ b/tests/functional/event_handler/required_dependencies/test_http_resolver.py @@ -0,0 +1,1244 @@ +"""Tests for HttpResolverLocal - ASGI-compatible HTTP resolver for local development.""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any + +import pytest + +from aws_lambda_powertools.event_handler import HttpResolverLocal, Response +from aws_lambda_powertools.event_handler.http_resolver import MockLambdaContext + +# Suppress warning for all tests +pytestmark = pytest.mark.filterwarnings("ignore:HttpResolverLocal is intended for local development") + + +# ============================================================================= +# ASGI Test Helpers +# ============================================================================= + + +def make_asgi_receive(body: bytes = b""): + """Create an ASGI receive callable.""" + + async def receive() -> dict[str, Any]: + await asyncio.sleep(0) # Yield control to satisfy async requirement + return {"type": "http.request", "body": body, "more_body": False} + + return receive + + +def make_asgi_send(): + """Create an ASGI send callable that captures response.""" + captured: dict[str, Any] = {"status_code": None, "body": b""} + + async def send(message: dict[str, Any]) -> None: + await asyncio.sleep(0) # Yield control to satisfy async requirement + if message["type"] == "http.response.start": + captured["status_code"] = message["status"] + elif message["type"] == "http.response.body": + captured["body"] = message["body"] + + return send, captured + + +# ============================================================================= +# Basic Routing Tests +# ============================================================================= + + +def test_simple_get_route(): + # GIVEN a simple GET route + app = HttpResolverLocal() + + @app.get("/hello") + def hello(): + return {"message": "Hello, World!"} + + event = { + "httpMethod": "GET", + "path": "/hello", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 200 with the expected body + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["message"] == "Hello, World!" + + +def test_path_parameters(): + # GIVEN a route with path parameters + app = HttpResolverLocal() + + @app.get("/users/") + def get_user(user_id: str): + return {"user_id": user_id} + + event = { + "httpMethod": "GET", + "path": "/users/123", + "headers": {}, + "queryStringParameters": {}, + "pathParameters": {"user_id": "123"}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN it extracts the path parameter correctly + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["user_id"] == "123" + + +def test_post_with_body(): + # GIVEN a POST route that reads the body + app = HttpResolverLocal() + + @app.post("/users") + def create_user(): + body = app.current_event.json_body + return {"created": True, "name": body["name"]} + + event = { + "httpMethod": "POST", + "path": "/users", + "headers": {"content-type": "application/json"}, + "queryStringParameters": {}, + "body": '{"name": "John"}', + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN it parses the JSON body correctly + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["created"] is True + assert body["name"] == "John" + + +def test_query_parameters(): + # GIVEN a route that reads query parameters + app = HttpResolverLocal() + + @app.get("/search") + def search(): + q = app.current_event.get_query_string_value("q", "") + page = app.current_event.get_query_string_value("page", "1") + return {"query": q, "page": page} + + event = { + "httpMethod": "GET", + "path": "/search", + "headers": {}, + "queryStringParameters": {"q": "python", "page": "2"}, + "multiValueQueryStringParameters": {"q": ["python"], "page": ["2"]}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN it extracts query parameters correctly + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["query"] == "python" + assert body["page"] == "2" + + +def test_custom_response(): + # GIVEN a route that returns a custom Response + app = HttpResolverLocal() + + @app.get("/custom") + def custom(): + return Response( + status_code=201, + content_type="application/json", + body={"status": "created"}, + headers={"X-Custom-Header": "value"}, + ) + + event = { + "httpMethod": "GET", + "path": "/custom", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns the custom status code and headers + assert result["statusCode"] == 201 + assert result["headers"]["X-Custom-Header"] == "value" + + +def test_not_found(): + # GIVEN an app with a defined route + app = HttpResolverLocal() + + @app.get("/exists") + def exists(): + return {"exists": True} + + event = { + "httpMethod": "GET", + "path": "/does-not-exist", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN requesting an unknown route + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 404 + assert result["statusCode"] == 404 + + +def test_custom_not_found_handler(): + # GIVEN an app with a custom not_found handler + app = HttpResolverLocal() + + @app.not_found + def custom_not_found(exc: Exception): + return Response( + status_code=404, + content_type="application/json", + body={"error": "Custom Not Found", "path": app.current_event.path}, + ) + + @app.get("/exists") + def exists(): + return {"exists": True} + + event = { + "httpMethod": "GET", + "path": "/unknown-route", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN requesting an unknown route + result = app.resolve(event, MockLambdaContext()) + + # THEN it calls the custom handler + assert result["statusCode"] == 404 + body = json.loads(result["body"]) + assert body["error"] == "Custom Not Found" + assert body["path"] == "/unknown-route" + + +# ============================================================================= +# Middleware Tests +# ============================================================================= + + +def test_middleware_execution(): + # GIVEN an app with middleware + app = HttpResolverLocal() + middleware_called = [] + + def test_middleware(app, next_middleware): + middleware_called.append("before") + response = next_middleware(app) + middleware_called.append("after") + return response + + app.use([test_middleware]) + + @app.get("/test") + def test_route(): + middleware_called.append("handler") + return {"ok": True} + + event = { + "httpMethod": "GET", + "path": "/test", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN middleware executes in correct order + assert result["statusCode"] == 200 + assert middleware_called == ["before", "handler", "after"] + + +def test_middleware_can_short_circuit(): + # GIVEN an app with auth middleware + app = HttpResolverLocal() + + def auth_middleware(app, next_middleware): + auth_header = app.current_event.headers.get("authorization") + if not auth_header: + return Response(status_code=401, body={"error": "Unauthorized"}) + return next_middleware(app) + + app.use([auth_middleware]) + + @app.get("/protected") + def protected(): + return {"secret": "data"} + + # WHEN requesting without auth header + event = { + "httpMethod": "GET", + "path": "/protected", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 401 + assert result["statusCode"] == 401 + + # WHEN requesting with auth header + event["headers"] = {"authorization": "Bearer token"} + result = app.resolve(event, MockLambdaContext()) + + # THEN it returns 200 + assert result["statusCode"] == 200 + + +def test_multiple_middlewares(): + # GIVEN an app with multiple middlewares + app = HttpResolverLocal() + order = [] + + def middleware_1(app, next_middleware): + order.append("m1_before") + response = next_middleware(app) + order.append("m1_after") + return response + + def middleware_2(app, next_middleware): + order.append("m2_before") + response = next_middleware(app) + order.append("m2_after") + return response + + app.use([middleware_1, middleware_2]) + + @app.get("/test") + def test_route(): + order.append("handler") + return {"ok": True} + + event = { + "httpMethod": "GET", + "path": "/test", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + app.resolve(event, MockLambdaContext()) + + # THEN middlewares execute in correct order (onion model) + assert order == ["m1_before", "m2_before", "handler", "m2_after", "m1_after"] + + +def test_route_specific_middleware(): + # GIVEN an app with route-specific middleware + app = HttpResolverLocal() + route_middleware_called = [] + + def route_middleware(app, next_middleware): + route_middleware_called.append("route_middleware") + return next_middleware(app) + + @app.get("/with-middleware", middlewares=[route_middleware]) + def with_middleware(): + return {"has_middleware": True} + + @app.get("/without-middleware") + def without_middleware(): + return {"has_middleware": False} + + # WHEN requesting route WITH middleware + event_with = { + "httpMethod": "GET", + "path": "/with-middleware", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + result = app.resolve(event_with, MockLambdaContext()) + + # THEN middleware is called + assert result["statusCode"] == 200 + assert route_middleware_called == ["route_middleware"] + + # WHEN requesting route WITHOUT middleware + route_middleware_called.clear() + event_without = { + "httpMethod": "GET", + "path": "/without-middleware", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + result = app.resolve(event_without, MockLambdaContext()) + + # THEN middleware is NOT called + assert result["statusCode"] == 200 + assert route_middleware_called == [] + + +def test_route_middleware_with_global_middleware(): + # GIVEN an app with both global and route-specific middleware + app = HttpResolverLocal() + order = [] + + def global_middleware(app, next_middleware): + order.append("global_before") + response = next_middleware(app) + order.append("global_after") + return response + + def route_middleware(app, next_middleware): + order.append("route_before") + response = next_middleware(app) + order.append("route_after") + return response + + app.use([global_middleware]) + + @app.get("/test", middlewares=[route_middleware]) + def test_route(): + order.append("handler") + return {"ok": True} + + event = { + "httpMethod": "GET", + "path": "/test", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + app.resolve(event, MockLambdaContext()) + + # THEN global middleware runs first, then route middleware + assert order == ["global_before", "route_before", "handler", "route_after", "global_after"] + + +def test_route_middleware_can_modify_response(): + # GIVEN an app with middleware that modifies response + app = HttpResolverLocal() + + def add_header_middleware(app, next_middleware): + response = next_middleware(app) + response.headers["X-Custom-Header"] = "added-by-middleware" + return response + + @app.get("/test", middlewares=[add_header_middleware]) + def test_route(): + return {"ok": True} + + event = { + "httpMethod": "GET", + "path": "/test", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN the response has the added header + assert result["statusCode"] == 200 + assert result["headers"]["X-Custom-Header"] == "added-by-middleware" + + +# ============================================================================= +# ASGI Tests +# ============================================================================= + + +@pytest.mark.asyncio +async def test_asgi_get_request(): + # GIVEN an app with a GET route + app = HttpResolverLocal() + + @app.get("/hello/") + def hello(name: str): + return {"message": f"Hello, {name}!"} + + scope = { + "type": "http", + "method": "GET", + "path": "/hello/World", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN it returns the expected response + assert captured["status_code"] == 200 + body = json.loads(captured["body"]) + assert body["message"] == "Hello, World!" + + +@pytest.mark.asyncio +async def test_asgi_custom_not_found(): + # GIVEN an app with custom not_found handler + app = HttpResolverLocal() + + @app.not_found + def custom_not_found(exc: Exception): + return Response( + status_code=404, + content_type="application/json", + body={"error": "Custom 404", "path": app.current_event.path}, + ) + + @app.get("/exists") + def exists(): + return {"exists": True} + + scope = { + "type": "http", + "method": "GET", + "path": "/unknown-asgi-route", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN requesting unknown route via ASGI + await app(scope, receive, send) + + # THEN custom handler is called + assert captured["status_code"] == 404 + body = json.loads(captured["body"]) + assert body["error"] == "Custom 404" + assert body["path"] == "/unknown-asgi-route" + + +@pytest.mark.asyncio +async def test_asgi_post_request(): + # GIVEN an app with a POST route + app = HttpResolverLocal() + + @app.post("/users") + def create_user(): + body = app.current_event.json_body + return {"created": True, "name": body["name"]} + + scope = { + "type": "http", + "method": "POST", + "path": "/users", + "query_string": b"", + "headers": [(b"content-type", b"application/json")], + } + + receive = make_asgi_receive(b'{"name": "John"}') + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN it parses the body correctly + assert captured["status_code"] == 200 + body = json.loads(captured["body"]) + assert body["created"] is True + assert body["name"] == "John" + + +@pytest.mark.asyncio +async def test_asgi_query_params(): + # GIVEN an app with a route that reads query params + app = HttpResolverLocal() + + @app.get("/search") + def search(): + q = app.current_event.get_query_string_value("q", "") + return {"query": q} + + scope = { + "type": "http", + "method": "GET", + "path": "/search", + "query_string": b"q=python", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN it extracts query params correctly + body = json.loads(captured["body"]) + assert body["query"] == "python" + + +# ============================================================================= +# Async Handler Tests +# ============================================================================= + + +@pytest.mark.asyncio +async def test_async_handler(): + # GIVEN an app with an async handler + app = HttpResolverLocal() + + @app.get("/async") + async def async_handler(): + await asyncio.sleep(0.001) + return {"async": True} + + scope = { + "type": "http", + "method": "GET", + "path": "/async", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN async handler executes correctly + assert captured["status_code"] == 200 + body = json.loads(captured["body"]) + assert body["async"] is True + + +@pytest.mark.asyncio +async def test_async_handler_with_path_params(): + # GIVEN an app with async handler and path params + app = HttpResolverLocal() + + @app.get("/users/") + async def get_user(user_id: str): + await asyncio.sleep(0.001) + return {"user_id": user_id, "async": True} + + scope = { + "type": "http", + "method": "GET", + "path": "/users/456", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN path params are extracted correctly + body = json.loads(captured["body"]) + assert body["user_id"] == "456" + assert body["async"] is True + + +@pytest.mark.asyncio +async def test_sync_handler_in_async_context(): + # GIVEN an app with a sync handler + app = HttpResolverLocal() + + @app.get("/sync") + def sync_handler(): + return {"sync": True} + + scope = { + "type": "http", + "method": "GET", + "path": "/sync", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN sync handler works in async context + body = json.loads(captured["body"]) + assert body["sync"] is True + + +@pytest.mark.asyncio +async def test_mixed_sync_async_handlers(): + # GIVEN an app with both sync and async handlers + app = HttpResolverLocal() + + @app.get("/sync") + def sync_handler(): + return {"type": "sync"} + + @app.get("/async") + async def async_handler(): + await asyncio.sleep(0.001) + return {"type": "async"} + + receive = make_asgi_receive() + + # WHEN calling sync handler + send_sync, captured_sync = make_asgi_send() + await app( + {"type": "http", "method": "GET", "path": "/sync", "query_string": b"", "headers": []}, + receive, + send_sync, + ) + + # WHEN calling async handler + send_async, captured_async = make_asgi_send() + await app( + {"type": "http", "method": "GET", "path": "/async", "query_string": b"", "headers": []}, + receive, + send_async, + ) + + # THEN both work correctly + assert json.loads(captured_sync["body"])["type"] == "sync" + assert json.loads(captured_async["body"])["type"] == "async" + + +# ============================================================================= +# Exception Handler Tests +# ============================================================================= + + +def test_exception_handler(): + # GIVEN an app with a custom exception handler + app = HttpResolverLocal() + + class CustomError(Exception): + pass + + @app.exception_handler(CustomError) + def handle_custom_error(exc: CustomError): + return Response( + status_code=400, + content_type="application/json", + body={"error": "Custom error handled"}, + ) + + @app.get("/error") + def raise_error(): + raise CustomError("Something went wrong") + + event = { + "httpMethod": "GET", + "path": "/error", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route raises the exception + result = app.resolve(event, MockLambdaContext()) + + # THEN the custom handler catches it + assert result["statusCode"] == 400 + body = json.loads(result["body"]) + assert body["error"] == "Custom error handled" + + +@pytest.mark.asyncio +async def test_async_exception_handler(): + # GIVEN an app with exception handler and async route + app = HttpResolverLocal() + + class CustomError(Exception): + pass + + @app.exception_handler(CustomError) + def handle_custom_error(exc: CustomError): + return Response( + status_code=400, + content_type="application/json", + body={"error": "Async error handled"}, + ) + + @app.get("/error") + async def raise_error(): + await asyncio.sleep(0.001) + raise CustomError("Async error") + + scope = { + "type": "http", + "method": "GET", + "path": "/error", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN the async route raises the exception + await app(scope, receive, send) + + # THEN the exception handler catches it + assert captured["status_code"] == 400 + body = json.loads(captured["body"]) + assert body["error"] == "Async error handled" + + +# ============================================================================= +# ASGI Lifespan Tests +# ============================================================================= + + +@pytest.mark.asyncio +async def test_asgi_lifespan_startup_shutdown(): + # GIVEN an app + app = HttpResolverLocal() + + @app.get("/hello") + def hello(): + return {"message": "Hello"} + + scope = {"type": "lifespan"} + messages_received: list[str] = [] + messages_sent: list[str] = [] + + async def receive() -> dict[str, Any]: + await asyncio.sleep(0) + if not messages_received: + messages_received.append("startup") + return {"type": "lifespan.startup"} + else: + messages_received.append("shutdown") + return {"type": "lifespan.shutdown"} + + async def send(message: dict[str, Any]) -> None: + await asyncio.sleep(0) + messages_sent.append(message["type"]) + + # WHEN handling lifespan events + await app(scope, receive, send) + + # THEN startup and shutdown are handled + assert "lifespan.startup.complete" in messages_sent + assert "lifespan.shutdown.complete" in messages_sent + + +@pytest.mark.asyncio +async def test_asgi_ignores_non_http_scope(): + # GIVEN an app + app = HttpResolverLocal() + + @app.get("/hello") + def hello(): + return {"message": "Hello"} + + scope = {"type": "websocket"} # Not HTTP + send_called = False + + async def receive() -> dict[str, Any]: + await asyncio.sleep(0) + return {"type": "websocket.connect"} + + async def send(message: dict[str, Any]) -> None: + nonlocal send_called + await asyncio.sleep(0) + send_called = True + + # WHEN handling non-HTTP scope + await app(scope, receive, send) + + # THEN nothing is sent (early return) + assert send_called is False + + +@pytest.mark.asyncio +async def test_asgi_binary_response(): + # GIVEN an app that returns binary data (bytes body is auto base64 encoded) + app = HttpResolverLocal() + binary_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00" # PNG header bytes + + @app.get("/image") + def get_image(): + # When body is bytes, Response auto base64 encodes it + return Response( + status_code=200, + content_type="image/png", + body=binary_data, + ) + + scope = { + "type": "http", + "method": "GET", + "path": "/image", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN it decodes base64 and returns binary data + assert captured["status_code"] == 200 + assert captured["body"] == binary_data + + +@pytest.mark.asyncio +async def test_asgi_duplicate_headers(): + # GIVEN an ASGI request with duplicate headers + app = HttpResolverLocal() + + @app.get("/headers") + def get_headers(): + # Return the accept header which has duplicates + accept = app.current_event.headers.get("accept", "") + return {"accept": accept} + + scope = { + "type": "http", + "method": "GET", + "path": "/headers", + "query_string": b"", + "headers": [ + (b"accept", b"text/html"), + (b"accept", b"application/json"), # Duplicate header + ], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN duplicate headers are joined with comma + assert captured["status_code"] == 200 + body = json.loads(captured["body"]) + assert body["accept"] == "text/html, application/json" + + +@pytest.mark.asyncio +async def test_asgi_with_cookies(): + # GIVEN an app that sets cookies + from aws_lambda_powertools.shared.cookies import Cookie + + app = HttpResolverLocal() + + @app.get("/set-cookie") + def set_cookie(): + cookie = Cookie(name="session", value="abc123") + return Response( + status_code=200, + content_type="application/json", + body={"message": "Cookie set"}, + cookies=[cookie], + ) + + scope = { + "type": "http", + "method": "GET", + "path": "/set-cookie", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + captured_headers: list[tuple[bytes, bytes]] = [] + + async def send(message: dict[str, Any]) -> None: + await asyncio.sleep(0) + if message["type"] == "http.response.start": + captured_headers.extend(message.get("headers", [])) + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN Set-Cookie header is present + cookie_headers = [h for h in captured_headers if h[0] == b"set-cookie"] + assert len(cookie_headers) == 1 + assert b"session=abc123" in cookie_headers[0][1] + + +@pytest.mark.asyncio +async def test_async_middleware(): + # GIVEN an app with async middleware + app = HttpResolverLocal() + order: list[str] = [] + + async def async_middleware(app, next_middleware): + order.append("async_before") + await asyncio.sleep(0.001) + response = await next_middleware(app) + order.append("async_after") + return response + + app.use([async_middleware]) + + @app.get("/test") + async def test_route(): + order.append("handler") + return {"ok": True} + + scope = { + "type": "http", + "method": "GET", + "path": "/test", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN called via ASGI interface + await app(scope, receive, send) + + # THEN async middleware executes correctly + assert captured["status_code"] == 200 + assert order == ["async_before", "handler", "async_after"] + + +def test_unhandled_exception_raises(): + # GIVEN an app without exception handler for ValueError + app = HttpResolverLocal() + + @app.get("/error") + def raise_error(): + raise ValueError("Unhandled error") + + event = { + "httpMethod": "GET", + "path": "/error", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route raises an unhandled exception + # THEN it propagates up + with pytest.raises(ValueError, match="Unhandled error"): + app.resolve(event, MockLambdaContext()) + + +def test_default_not_found_without_custom_handler(): + # GIVEN an app WITHOUT custom not_found handler + app = HttpResolverLocal() + + @app.get("/exists") + def exists(): + return {"exists": True} + + event = { + "httpMethod": "GET", + "path": "/unknown", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN requesting unknown route + result = app.resolve(event, MockLambdaContext()) + + # THEN default 404 response is returned + assert result["statusCode"] == 404 + body = json.loads(result["body"]) + assert body["message"] == "Not found" + + +def test_method_not_matching_continues_search(): + # GIVEN an app with routes for different methods on same path + app = HttpResolverLocal() + + @app.get("/resource") + def get_resource(): + return {"method": "GET"} + + @app.post("/resource") + def post_resource(): + return {"method": "POST"} + + # WHEN requesting with POST + event = { + "httpMethod": "POST", + "path": "/resource", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + result = app.resolve(event, MockLambdaContext()) + + # THEN it finds the POST handler (skipping GET) + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["method"] == "POST" + + +def test_list_headers_serialization(): + # GIVEN an app that returns list headers + app = HttpResolverLocal() + + @app.get("/multi-header") + def multi_header(): + return Response( + status_code=200, + content_type="application/json", + body={"ok": True}, + headers={"X-Custom": ["value1", "value2"]}, + ) + + event = { + "httpMethod": "GET", + "path": "/multi-header", + "headers": {}, + "queryStringParameters": {}, + "body": None, + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN list headers are joined with comma + assert result["statusCode"] == 200 + assert result["headers"]["X-Custom"] == "value1, value2" + + +def test_string_body_in_event(): + # GIVEN an event with string body (not bytes) + app = HttpResolverLocal() + + @app.post("/echo") + def echo(): + return {"body": app.current_event.body} + + # Body is already a string, not bytes + event = { + "httpMethod": "POST", + "path": "/echo", + "headers": {"content-type": "text/plain"}, + "queryStringParameters": {}, + "body": "plain text body", + } + + # WHEN the route is resolved + result = app.resolve(event, MockLambdaContext()) + + # THEN string body is handled correctly + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["body"] == "plain text body" + + +@pytest.mark.asyncio +async def test_asgi_default_not_found(): + # GIVEN an app WITHOUT custom not_found handler + app = HttpResolverLocal() + + @app.get("/exists") + def exists(): + return {"exists": True} + + scope = { + "type": "http", + "method": "GET", + "path": "/unknown-route", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + # WHEN requesting unknown route via ASGI + await app(scope, receive, send) + + # THEN default 404 is returned + assert captured["status_code"] == 404 + body = json.loads(captured["body"]) + assert body["message"] == "Not found" + + +@pytest.mark.asyncio +async def test_asgi_unhandled_exception_raises(): + # GIVEN an app without exception handler for ValueError + app = HttpResolverLocal() + + @app.get("/error") + async def raise_error(): + raise ValueError("Async unhandled error") + + scope = { + "type": "http", + "method": "GET", + "path": "/error", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, _ = make_asgi_send() + + # WHEN the route raises an unhandled exception + # THEN it propagates up + with pytest.raises(ValueError, match="Async unhandled error"): + await app(scope, receive, send) + + +@pytest.mark.asyncio +async def test_asgi_wrong_method_returns_not_found(): + # GIVEN an app with only a GET route + app = HttpResolverLocal() + + @app.get("/hello") + def hello(): + return {"message": "Hello"} + + # WHEN calling with POST method (route exists but method doesn't match) + scope = { + "type": "http", + "method": "POST", + "path": "/hello", + "query_string": b"", + "headers": [], + } + + receive = make_asgi_receive() + send, captured = make_asgi_send() + + await app(scope, receive, send) + + # THEN it returns 404 (method mismatch is treated as not found) + assert captured["status_code"] == 404