Skip to content

Commit c3ef7bd

Browse files
committed
refactor(event-handler): reduce cognitive complexity in _request_body_to_args
- Extract helper functions to reduce cognitive complexity from 24 to under 15 - _get_field_location: Extract field location logic - _get_field_value: Extract value retrieval logic with error handling - _resolve_field_type: Extract Union type resolution logic - _convert_value_type: Extract UploadFile/bytes conversion logic - Maintain all existing functionality and test coverage - Improve code readability and maintainability
1 parent bd19bee commit c3ef7bd

File tree

1 file changed

+51
-34
lines changed

1 file changed

+51
-34
lines changed

aws_lambda_powertools/event_handler/middlewares/openapi_validation.py

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,47 @@ def _request_params_to_args(
488488
return values, errors
489489

490490

491+
def _get_field_location(field: ModelField, field_alias_omitted: bool) -> tuple[str, ...]:
492+
"""Get the location tuple for a field based on whether alias is omitted."""
493+
if field_alias_omitted:
494+
return ("body",)
495+
return ("body", field.alias)
496+
497+
498+
def _get_field_value(received_body: dict[str, Any] | None, field: ModelField) -> Any | None:
499+
"""Extract field value from received body, returning None if not found or on error."""
500+
if received_body is None:
501+
return None
502+
503+
try:
504+
return received_body.get(field.alias)
505+
except AttributeError:
506+
return None
507+
508+
509+
def _resolve_field_type(field_type: type) -> type:
510+
"""Resolve the actual field type, handling Union types by returning the first non-None type."""
511+
from typing import get_args, get_origin
512+
513+
if get_origin(field_type) is Union:
514+
union_args = get_args(field_type)
515+
non_none_types = [arg for arg in union_args if arg is not type(None)]
516+
if non_none_types:
517+
return non_none_types[0]
518+
return field_type
519+
520+
521+
def _convert_value_type(value: Any, field_type: type) -> Any:
522+
"""Convert value between UploadFile and bytes for type compatibility."""
523+
if isinstance(value, UploadFile) and field_type is bytes:
524+
# Convert UploadFile to bytes for backward compatibility
525+
return value.file
526+
elif isinstance(value, bytes) and field_type == UploadFile:
527+
# Convert bytes to UploadFile if that's what's expected
528+
return UploadFile(file=value)
529+
return value
530+
531+
491532
def _request_body_to_args(
492533
required_params: list[ModelField],
493534
received_body: dict[str, Any] | None,
@@ -505,53 +546,29 @@ def _request_body_to_args(
505546
)
506547

507548
for field in required_params:
508-
# This sets the location to:
509-
# { "user": { object } } if field.alias == user
510-
# { { object } if field_alias is omitted
511-
loc: tuple[str, ...] = ("body", field.alias)
512-
if field_alias_omitted:
513-
loc = ("body",)
514-
515-
value: Any | None = None
549+
loc = _get_field_location(field, field_alias_omitted)
550+
value = _get_field_value(received_body, field)
516551

517-
# Now that we know what to look for, try to get the value from the received body
518-
if received_body is not None:
552+
# Handle AttributeError from _get_field_value
553+
if received_body is not None and value is None:
519554
try:
520-
value = received_body.get(field.alias)
555+
# Double-check with direct access to distinguish None value from AttributeError
556+
received_body.get(field.alias)
521557
except AttributeError:
522558
errors.append(get_missing_field_error(loc))
523559
continue
524560

525-
# Determine if the field is required
561+
# Handle missing values
526562
if value is None:
527563
if field.required:
528564
errors.append(get_missing_field_error(loc))
529565
else:
530566
values[field.name] = deepcopy(field.default)
531567
continue
532568

533-
# MAINTENANCE: Handle byte and file fields
534-
# Check if we have an UploadFile but the field expects bytes
535-
from typing import get_args, get_origin
536-
537-
field_type = field.type_
538-
539-
# Handle Union types (e.g., Union[bytes, None] for optional parameters)
540-
if get_origin(field_type) is Union:
541-
# Get the non-None types from the Union
542-
union_args = get_args(field_type)
543-
non_none_types = [arg for arg in union_args if arg is not type(None)]
544-
if non_none_types:
545-
field_type = non_none_types[0] # Use the first non-None type
546-
547-
if isinstance(value, UploadFile) and field_type is bytes:
548-
# Convert UploadFile to bytes for backward compatibility
549-
value = value.file
550-
elif isinstance(value, bytes) and field_type == UploadFile:
551-
# Convert bytes to UploadFile if that's what's expected
552-
# This shouldn't normally happen in our current implementation,
553-
# but provides a fallback path
554-
value = UploadFile(file=value)
569+
# Handle type conversions for UploadFile/bytes compatibility
570+
field_type = _resolve_field_type(field.type_)
571+
value = _convert_value_type(value, field_type)
555572

556573
# Finally, validate the value
557574
values[field.name] = _validate_field(field=field, value=value, loc=loc, existing_errors=errors)

0 commit comments

Comments
 (0)