Skip to content

Commit 80da5ea

Browse files
committed
fix sonarqube code smell findings
1 parent d3b46be commit 80da5ea

File tree

2 files changed

+97
-52
lines changed

2 files changed

+97
-52
lines changed

aws_lambda_powertools/event_handler/api_gateway.py

Lines changed: 87 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
Server,
9393
Tag,
9494
)
95-
from aws_lambda_powertools.event_handler.openapi.params import Dependant
95+
from aws_lambda_powertools.event_handler.openapi.params import Dependant, Param
9696
from aws_lambda_powertools.event_handler.openapi.swagger_ui.oauth2 import (
9797
OAuth2Config,
9898
)
@@ -812,74 +812,110 @@ def _openapi_operation_parameters(
812812
"""
813813
Returns the OpenAPI operation parameters.
814814
"""
815-
from aws_lambda_powertools.event_handler.openapi.compat import get_schema_from_model_field
816815
from aws_lambda_powertools.event_handler.openapi.params import Param
817816

818-
parameters = []
819-
parameter: dict[str, Any] = {}
817+
parameters: list[dict[str, Any]] = []
820818

821819
for param in all_route_params:
822-
field_info = param.field_info
823-
field_info = cast(Param, field_info)
820+
field_info = cast(Param, param.field_info)
824821
if not field_info.include_in_schema:
825822
continue
826823

827824
# Check if this is a Pydantic model that should be expanded
828-
from pydantic import BaseModel
829-
830-
from aws_lambda_powertools.event_handler.openapi.compat import lenient_issubclass
831-
832-
if lenient_issubclass(field_info.annotation, BaseModel):
833-
# Expand Pydantic model into individual parameters
834-
model_class = cast(type[BaseModel], field_info.annotation)
835-
836-
for field_name, field_def in model_class.model_fields.items():
837-
if not field_def.annotation:
838-
continue
839-
# Create individual parameter for each model field
840-
param_name = field_def.alias or field_name
841-
842-
individual_param = {
843-
"name": param_name,
844-
"in": field_info.in_.value,
845-
"required": field_def.is_required()
846-
if hasattr(field_def, "is_required")
847-
else field_def.default is ...,
848-
"schema": Route._get_basic_type_schema(field_def.annotation),
849-
}
825+
if Route._is_pydantic_model_param(field_info):
826+
parameters.extend(Route._expand_pydantic_model_parameters(field_info))
827+
else:
828+
parameters.append(Route._create_regular_parameter(param, model_name_map, field_mapping))
850829

851-
if field_def.description:
852-
individual_param["description"] = field_def.description
830+
return parameters
853831

854-
parameters.append(individual_param)
855-
else:
856-
# Regular parameter processing
857-
param_schema = get_schema_from_model_field(
858-
field=param,
859-
model_name_map=model_name_map,
860-
field_mapping=field_mapping,
861-
)
832+
@staticmethod
833+
def _is_pydantic_model_param(field_info: ModelField | Param) -> bool:
834+
"""Check if the field info represents a Pydantic model parameter."""
835+
from pydantic import BaseModel
836+
837+
from aws_lambda_powertools.event_handler.openapi.compat import lenient_issubclass
838+
from aws_lambda_powertools.event_handler.openapi.params import Param
862839

863-
parameter = {
864-
"name": param.alias,
865-
"in": field_info.in_.value,
866-
"required": param.required,
867-
"schema": param_schema,
868-
}
840+
if not isinstance(field_info, Param):
841+
return False
842+
return lenient_issubclass(field_info.annotation, BaseModel)
869843

870-
if field_info.description:
871-
parameter["description"] = field_info.description
844+
@staticmethod
845+
def _expand_pydantic_model_parameters(field_info: Param) -> list[dict[str, Any]]:
846+
"""Expand a Pydantic model into individual OpenAPI parameters."""
847+
from pydantic import BaseModel
872848

873-
if field_info.openapi_examples:
874-
parameter["examples"] = field_info.openapi_examples
849+
model_class = cast(type[BaseModel], field_info.annotation)
850+
parameters: list[dict[str, Any]] = []
875851

876-
if field_info.deprecated:
877-
parameter["deprecated"] = field_info.deprecated
852+
for field_name, field_def in model_class.model_fields.items():
853+
if not field_def.annotation:
854+
continue
878855

879-
parameters.append(parameter)
856+
param_name = field_def.alias or field_name
857+
individual_param = Route._create_pydantic_field_parameter(
858+
param_name=param_name,
859+
field_def=field_def,
860+
param_location=field_info.in_.value,
861+
)
862+
parameters.append(individual_param)
880863

881864
return parameters
882865

866+
@staticmethod
867+
def _create_pydantic_field_parameter(
868+
param_name: str,
869+
field_def: Any,
870+
param_location: str,
871+
) -> dict[str, Any]:
872+
"""Create an OpenAPI parameter from a Pydantic field definition."""
873+
individual_param: dict[str, Any] = {
874+
"name": param_name,
875+
"in": param_location,
876+
"required": field_def.is_required() if hasattr(field_def, "is_required") else field_def.default is ...,
877+
"schema": Route._get_basic_type_schema(field_def.annotation or type(None)),
878+
}
879+
880+
if field_def.description:
881+
individual_param["description"] = field_def.description
882+
883+
return individual_param
884+
885+
@staticmethod
886+
def _create_regular_parameter(
887+
param: ModelField,
888+
model_name_map: dict[TypeModelOrEnum, str],
889+
field_mapping: dict[tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue],
890+
) -> dict[str, Any]:
891+
"""Create an OpenAPI parameter from a regular ModelField."""
892+
from aws_lambda_powertools.event_handler.openapi.compat import get_schema_from_model_field
893+
from aws_lambda_powertools.event_handler.openapi.params import Param
894+
895+
field_info = cast(Param, param.field_info)
896+
param_schema = get_schema_from_model_field(
897+
field=param,
898+
model_name_map=model_name_map,
899+
field_mapping=field_mapping,
900+
)
901+
902+
parameter: dict[str, Any] = {
903+
"name": param.alias,
904+
"in": field_info.in_.value,
905+
"required": param.required,
906+
"schema": param_schema,
907+
}
908+
909+
# Add optional attributes if present
910+
if field_info.description:
911+
parameter["description"] = field_info.description
912+
if field_info.openapi_examples:
913+
parameter["examples"] = field_info.openapi_examples
914+
if field_info.deprecated:
915+
parameter["deprecated"] = field_info.deprecated
916+
917+
return parameter
918+
883919
@staticmethod
884920
def _get_basic_type_schema(param_type: type) -> dict[str, str]:
885921
"""

tests/functional/event_handler/_pydantic/test_openapi_validation_middleware.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2449,7 +2449,7 @@ def query_model_advanced(params: Annotated[QueryAdvanced, Query()]) -> Dict[str,
24492449

24502450
# Test QuerySimple validation error (name doesn't start with "Powertools")
24512451
gw_event["queryStringParameters"] = {
2452-
"full_name": "Lambda Powertools", # Wrong order
2452+
"full_name": "Lambda Powertools",
24532453
"next_token": "dGVzdA==",
24542454
"search_id": "search-123",
24552455
}
@@ -2461,6 +2461,15 @@ def query_model_advanced(params: Annotated[QueryAdvanced, Query()]) -> Dict[str,
24612461
assert "detail" in body
24622462
errors = body["detail"]
24632463

2464+
# Should have validation error for full_name with proper location
2465+
full_name_error = next((e for e in errors if "full_name" in e["loc"]), None)
2466+
2467+
assert full_name_error is not None, "Should have error for full_name field"
2468+
2469+
# Check error details for full_name
2470+
assert full_name_error["loc"] == ["query", "params", "full_name"]
2471+
assert full_name_error["type"] == "value_error"
2472+
24642473
# Test QueryAdvanced with ConfigDict and alias_generator
24652474
gw_event["path"] = "/query-model-advanced"
24662475
gw_event["queryStringParameters"] = {

0 commit comments

Comments
 (0)