Skip to content

feat(event_handler): add File parameter support for multipart/form-data uploads in OpenAPI utility #7132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dataclasses
import json
import logging
import re
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, Sequence
from urllib.parse import parse_qs
Expand Down Expand Up @@ -35,6 +36,7 @@
CONTENT_DISPOSITION_NAME_PARAM = "name="
APPLICATION_JSON_CONTENT_TYPE = "application/json"
APPLICATION_FORM_CONTENT_TYPE = "application/x-www-form-urlencoded"
MULTIPART_FORM_CONTENT_TYPE = "multipart/form-data"


class OpenAPIRequestValidationMiddleware(BaseMiddlewareHandler):
Expand Down Expand Up @@ -125,8 +127,12 @@ def _get_body(self, app: EventHandlerInstance) -> dict[str, Any]:
elif content_type.startswith(APPLICATION_FORM_CONTENT_TYPE):
return self._parse_form_data(app)

# Handle multipart form data
elif content_type.startswith(MULTIPART_FORM_CONTENT_TYPE):
return self._parse_multipart_data(app, content_type)

else:
raise NotImplementedError("Only JSON body or Form() are supported")
raise NotImplementedError(f"Content type '{content_type}' is not supported")

def _parse_json_data(self, app: EventHandlerInstance) -> dict[str, Any]:
"""Parse JSON data from the request body."""
Expand Down Expand Up @@ -169,6 +175,120 @@ def _parse_form_data(self, app: EventHandlerInstance) -> dict[str, Any]:
],
) from e

def _parse_multipart_data(self, app: EventHandlerInstance, content_type: str) -> dict[str, Any]:
"""Parse multipart/form-data."""
try:
decoded_bytes = self._decode_request_body(app)
boundary_bytes = self._extract_boundary_bytes(content_type)
return self._parse_multipart_sections(decoded_bytes, boundary_bytes)

except Exception as e:
raise RequestValidationError(
[
{
"type": "multipart_invalid",
"loc": ("body",),
"msg": "Invalid multipart form data",
"input": {},
"ctx": {"error": str(e)},
},
],
) from e

def _decode_request_body(self, app: EventHandlerInstance) -> bytes:
"""Decode the request body, handling base64 encoding if necessary."""
import base64

body = app.current_event.body or ""

if app.current_event.is_base64_encoded:
try:
return base64.b64decode(body)
except Exception:
# If decoding fails, use body as-is
return body.encode("utf-8") if isinstance(body, str) else body
else:
return body.encode("utf-8") if isinstance(body, str) else body

def _extract_boundary_bytes(self, content_type: str) -> bytes:
"""Extract and return the boundary bytes from the content type header."""
boundary_match = re.search(r"boundary=([^;,\s]+)", content_type)

if not boundary_match:
# Handle WebKit browsers that may use different boundary formats
webkit_match = re.search(r"WebKitFormBoundary([a-zA-Z0-9]+)", content_type)
if webkit_match:
boundary = "WebKitFormBoundary" + webkit_match.group(1)
else:
raise ValueError("No boundary found in multipart content-type")
else:
boundary = boundary_match.group(1).strip('"')

return ("--" + boundary).encode("utf-8")

def _parse_multipart_sections(self, decoded_bytes: bytes, boundary_bytes: bytes) -> dict[str, Any]:
"""Parse individual multipart sections from the decoded body."""
parsed_data: dict[str, Any] = {}

if not decoded_bytes:
return parsed_data

sections = decoded_bytes.split(boundary_bytes)

for section in sections[1:-1]: # Skip first empty and last closing parts
if not section.strip():
continue

field_name, content = self._parse_multipart_section(section)
if field_name:
parsed_data[field_name] = content

return parsed_data

def _parse_multipart_section(self, section: bytes) -> tuple[str | None, bytes | str]:
"""Parse a single multipart section to extract field name and content."""
headers_part, content = self._split_section_headers_and_content(section)

if headers_part is None:
return None, b""

# Extract field name from Content-Disposition header
name_match = re.search(r'name="([^"]+)"', headers_part)
if not name_match:
return None, b""

field_name = name_match.group(1)

# Check if it's a file field and process accordingly
if "filename=" in headers_part:
# It's a file - store as bytes
return field_name, content
else:
# It's a regular form field - decode as string
return field_name, self._decode_form_field_content(content)

def _split_section_headers_and_content(self, section: bytes) -> tuple[str | None, bytes]:
"""Split a multipart section into headers and content parts."""
header_end = section.find(b"\r\n\r\n")
if header_end == -1:
header_end = section.find(b"\n\n")
if header_end == -1:
return None, b""
content = section[header_end + 2 :].strip()
else:
content = section[header_end + 4 :].strip()

headers_part = section[:header_end].decode("utf-8", errors="ignore")
return headers_part, content

def _decode_form_field_content(self, content: bytes) -> str | bytes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I guess this method returns the decoded content of the file, and while this is nice, I think developers must also have access to the filename, headers, and content-type to reconstruct the file in the Lambda.

I'm talking about something like FastAPI is doing with UploadFile class - https://fastapi.tiangolo.com/reference/uploadfile/#fastapi.UploadFile.file.

Can you investigate this, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into it and see how we can make that part of the implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leandrodamascena! Great feedback! I've implemented exactly what you requested - a FastAPI-inspired UploadFile class that provides developers with complete access to filename, headers, content-type, and all metadata needed to reconstruct files in Lambda functions.

UploadFile Response:
{
"filename": "important-document.pdf",
"content_type": "application/pdf",
"size": 52,
"headers": {
"Content-Type": "application/pdf",
"X-Upload-ID": "12345",
"X-File-Hash": "abc123def456"
},
"content_preview": "PDF file content with metadata for reconstruction.",
"can_reconstruct_file": true
}

Backward Compatibility Response:
{
"message": "Existing code works!",
"size": 27
}

"""Decode form field content as string, falling back to bytes if decoding fails."""
try:
return content.decode("utf-8")
except UnicodeDecodeError:
# If can't decode as text, keep as bytes
return content


class OpenAPIResponseValidationMiddleware(BaseMiddlewareHandler):
"""
Expand Down
20 changes: 15 additions & 5 deletions aws_lambda_powertools/event_handler/openapi/dependant.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
from aws_lambda_powertools.event_handler.openapi.params import (
Body,
Dependant,
File,
Form,
Header,
Param,
ParamTypes,
Query,
_File,
analyze_param,
create_response_field,
get_flat_dependant,
Expand Down Expand Up @@ -367,13 +367,23 @@ def get_body_field_info(
if not required:
body_field_info_kwargs["default"] = None

if any(isinstance(f.field_info, _File) for f in flat_dependant.body_params):
# MAINTENANCE: body_field_info: type[Body] = _File
raise NotImplementedError("_File fields are not supported in request bodies")
elif any(isinstance(f.field_info, Form) for f in flat_dependant.body_params):
# Check for File parameters
has_file_params = any(isinstance(f.field_info, File) for f in flat_dependant.body_params)
# Check for Form parameters
has_form_params = any(isinstance(f.field_info, Form) for f in flat_dependant.body_params)

if has_file_params:
# File parameters use multipart/form-data
body_field_info = Body
body_field_info_kwargs["media_type"] = "multipart/form-data"
body_field_info_kwargs["embed"] = True
elif has_form_params:
# Form parameters use application/x-www-form-urlencoded
body_field_info = Body
body_field_info_kwargs["media_type"] = "application/x-www-form-urlencoded"
body_field_info_kwargs["embed"] = True
else:
# Regular JSON body parameters
body_field_info = Body

body_param_media_types = [
Expand Down
15 changes: 8 additions & 7 deletions aws_lambda_powertools/event_handler/openapi/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
This turns the low-level function signature into typed, validated Pydantic models for consumption.
"""

__all__ = ["Path", "Query", "Header", "Body", "Form", "File"]


class ParamTypes(Enum):
query = "query"
Expand Down Expand Up @@ -809,7 +811,7 @@ def __init__(
)


class _File(Form):
class File(Form):
"""
A class used to represent a file parameter in a path operation.
"""
Expand Down Expand Up @@ -849,12 +851,11 @@ def __init__(
**extra: Any,
):
# For file uploads, ensure the OpenAPI schema has the correct format
# Also we can't test it
file_schema_extra = {"format": "binary"} # pragma: no cover
if json_schema_extra: # pragma: no cover
json_schema_extra.update(file_schema_extra) # pragma: no cover
else: # pragma: no cover
json_schema_extra = file_schema_extra # pragma: no cover
file_schema_extra = {"format": "binary"}
if json_schema_extra:
json_schema_extra.update(file_schema_extra)
else:
json_schema_extra = file_schema_extra

super().__init__(
default=default,
Expand Down
92 changes: 92 additions & 0 deletions examples/event_handler_rest/src/file_parameter_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
Example demonstrating File parameter usage for handling file uploads.
"""

from __future__ import annotations

from typing import Annotated, Union

from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.event_handler.openapi.params import File, Form

# Initialize resolver with OpenAPI validation enabled.
# This single module-level resolver is shared by every route registered below and by lambda_handler.
app = APIGatewayRestResolver(enable_validation=True)


@app.post("/upload")
def upload_single_file(file: Annotated[bytes, File(description="File to upload")]):
    """Accept one uploaded file and report its size in bytes."""
    size_in_bytes = len(file)
    return {
        "status": "uploaded",
        "file_size": size_in_bytes,
        "message": "File uploaded successfully",
    }


@app.post("/upload-with-metadata")
def upload_file_with_metadata(
    file: Annotated[bytes, File(description="File to upload")],
    description: Annotated[str, Form(description="File description")],
    tags: Annotated[Union[str, None], Form(description="Optional tags")] = None,  # noqa: UP007
):
    """Accept a file together with a required description and optional tags."""
    response = {"status": "uploaded", "file_size": len(file)}
    # The form fields are echoed back unchanged alongside the file size
    response["description"] = description
    response["tags"] = tags
    response["message"] = "File and metadata uploaded successfully"
    return response


@app.post("/upload-multiple")
def upload_multiple_files(
    primary_file: Annotated[bytes, File(alias="primary", description="Primary file")],
    secondary_file: Annotated[bytes, File(alias="secondary", description="Secondary file")],
):
    """Accept two files (form fields ``primary`` and ``secondary``) and report their sizes."""
    primary_size = len(primary_file)
    secondary_size = len(secondary_file)
    return {
        "status": "uploaded",
        "primary_size": primary_size,
        "secondary_size": secondary_size,
        "total_size": primary_size + secondary_size,
        "message": "Multiple files uploaded successfully",
    }


@app.post("/upload-with-constraints")
def upload_small_file(file: Annotated[bytes, File(description="Small file only", max_length=1024)]):
    """Accept a file declared with ``max_length=1024`` and report its size."""
    size_in_bytes = len(file)
    message = f"Small file uploaded successfully ({size_in_bytes} bytes)"
    return {"status": "uploaded", "file_size": size_in_bytes, "message": message}


@app.post("/upload-optional")
def upload_optional_file(
    message: Annotated[str, Form(description="Required message")],
    file: Annotated[Union[bytes, None], File(description="Optional file")] = None,  # noqa: UP007
):
    """Accept a required message and, optionally, a file."""
    was_file_sent = file is not None
    # A missing file and an empty file both report a size of zero
    size_in_bytes = len(file or b"")
    return {
        "status": "processed",
        "message": message,
        "has_file": was_file_sent,
        "file_size": size_in_bytes,
    }


# Lambda entry point
def lambda_handler(event, context):
    """Hand the incoming event and context to the resolver and return its response."""
    response = app.resolve(event, context)
    return response


# The File parameter provides:
# 1. Automatic multipart/form-data parsing
# 2. OpenAPI schema generation with proper file upload documentation
# 3. Request validation with meaningful error messages
# 4. Support for file constraints (max_length, etc.)
# 5. Compatibility with WebKit and other browser boundary formats
# 6. Base64-encoded request handling (common in AWS Lambda)
# 7. Mixed file and form data support
# 8. Multiple file upload support
# 9. Optional file parameters
Loading