Skip to content

Commit c5e6674

Browse files
committed
refactor: reduce cognitive complexity in multipart parsing
- Break down _parse_multipart_data method into smaller helper methods
- Reduce cognitive complexity from 43 to under 15 per SonarCloud requirement
- Improve code readability and maintainability
- All existing tests continue to pass

Helper methods created:
- _decode_request_body: Handle base64 decoding
- _extract_boundary_bytes: Extract multipart boundary
- _parse_multipart_sections: Parse sections into data dict
- _parse_multipart_section: Handle individual section parsing
- _split_section_headers_and_content: Split headers/content
- _decode_form_field_content: Decode form field as string

Addresses SonarCloud cognitive complexity violation while maintaining all existing functionality for File parameter multipart parsing.
1 parent f074f30 commit c5e6674

File tree

1 file changed

+97
-67
lines changed

1 file changed

+97
-67
lines changed

aws_lambda_powertools/event_handler/middlewares/openapi_validation.py

Lines changed: 97 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -177,74 +177,10 @@ def _parse_form_data(self, app: EventHandlerInstance) -> dict[str, Any]:
177177

178178
def _parse_multipart_data(self, app: EventHandlerInstance, content_type: str) -> dict[str, Any]:
179179
"""Parse multipart/form-data."""
180-
import base64
181-
182180
try:
183-
# Get the raw body - it might be base64 encoded
184-
body = app.current_event.body or ""
185-
186-
# Handle base64 encoded body (common in Lambda)
187-
if app.current_event.is_base64_encoded:
188-
try:
189-
decoded_bytes = base64.b64decode(body)
190-
except Exception:
191-
# If decoding fails, use body as-is
192-
decoded_bytes = body.encode("utf-8") if isinstance(body, str) else body
193-
else:
194-
decoded_bytes = body.encode("utf-8") if isinstance(body, str) else body
195-
196-
# Extract boundary from content type - handle both standard and WebKit boundaries
197-
boundary_match = re.search(r"boundary=([^;,\s]+)", content_type)
198-
if not boundary_match:
199-
# Handle WebKit browsers that may use different boundary formats
200-
webkit_match = re.search(r"WebKitFormBoundary([a-zA-Z0-9]+)", content_type)
201-
if webkit_match:
202-
boundary = "WebKitFormBoundary" + webkit_match.group(1)
203-
else:
204-
raise ValueError("No boundary found in multipart content-type")
205-
else:
206-
boundary = boundary_match.group(1).strip('"')
207-
boundary_bytes = ("--" + boundary).encode("utf-8")
208-
209-
# Parse multipart sections
210-
parsed_data: dict[str, Any] = {}
211-
if decoded_bytes:
212-
sections = decoded_bytes.split(boundary_bytes)
213-
214-
for section in sections[1:-1]: # Skip first empty and last closing parts
215-
if not section.strip():
216-
continue
217-
218-
# Split headers and content
219-
header_end = section.find(b"\r\n\r\n")
220-
if header_end == -1:
221-
header_end = section.find(b"\n\n")
222-
if header_end == -1:
223-
continue
224-
content = section[header_end + 2 :].strip()
225-
else:
226-
content = section[header_end + 4 :].strip()
227-
228-
headers_part = section[:header_end].decode("utf-8", errors="ignore")
229-
230-
# Extract field name from Content-Disposition header
231-
name_match = re.search(r'name="([^"]+)"', headers_part)
232-
if name_match:
233-
field_name = name_match.group(1)
234-
235-
# Check if it's a file field
236-
if "filename=" in headers_part:
237-
# It's a file - store as bytes
238-
parsed_data[field_name] = content
239-
else:
240-
# It's a regular form field - decode as string
241-
try:
242-
parsed_data[field_name] = content.decode("utf-8")
243-
except UnicodeDecodeError:
244-
# If can't decode as text, keep as bytes
245-
parsed_data[field_name] = content
246-
247-
return parsed_data
181+
decoded_bytes = self._decode_request_body(app)
182+
boundary_bytes = self._extract_boundary_bytes(content_type)
183+
return self._parse_multipart_sections(decoded_bytes, boundary_bytes)
248184

249185
except Exception as e:
250186
raise RequestValidationError(
@@ -259,6 +195,100 @@ def _parse_multipart_data(self, app: EventHandlerInstance, content_type: str) ->
259195
]
260196
) from e
261197

198+
def _decode_request_body(self, app: EventHandlerInstance) -> bytes:
199+
"""Decode the request body, handling base64 encoding if necessary."""
200+
import base64
201+
202+
body = app.current_event.body or ""
203+
204+
if app.current_event.is_base64_encoded:
205+
try:
206+
return base64.b64decode(body)
207+
except Exception:
208+
# If decoding fails, use body as-is
209+
return body.encode("utf-8") if isinstance(body, str) else body
210+
else:
211+
return body.encode("utf-8") if isinstance(body, str) else body
212+
213+
def _extract_boundary_bytes(self, content_type: str) -> bytes:
214+
"""Extract and return the boundary bytes from the content type header."""
215+
boundary_match = re.search(r"boundary=([^;,\s]+)", content_type)
216+
217+
if not boundary_match:
218+
# Handle WebKit browsers that may use different boundary formats
219+
webkit_match = re.search(r"WebKitFormBoundary([a-zA-Z0-9]+)", content_type)
220+
if webkit_match:
221+
boundary = "WebKitFormBoundary" + webkit_match.group(1)
222+
else:
223+
raise ValueError("No boundary found in multipart content-type")
224+
else:
225+
boundary = boundary_match.group(1).strip('"')
226+
227+
return ("--" + boundary).encode("utf-8")
228+
229+
def _parse_multipart_sections(self, decoded_bytes: bytes, boundary_bytes: bytes) -> dict[str, Any]:
230+
"""Parse individual multipart sections from the decoded body."""
231+
parsed_data: dict[str, Any] = {}
232+
233+
if not decoded_bytes:
234+
return parsed_data
235+
236+
sections = decoded_bytes.split(boundary_bytes)
237+
238+
for section in sections[1:-1]: # Skip first empty and last closing parts
239+
if not section.strip():
240+
continue
241+
242+
field_name, content = self._parse_multipart_section(section)
243+
if field_name:
244+
parsed_data[field_name] = content
245+
246+
return parsed_data
247+
248+
def _parse_multipart_section(self, section: bytes) -> tuple[str | None, bytes | str]:
249+
"""Parse a single multipart section to extract field name and content."""
250+
headers_part, content = self._split_section_headers_and_content(section)
251+
252+
if headers_part is None:
253+
return None, b""
254+
255+
# Extract field name from Content-Disposition header
256+
name_match = re.search(r'name="([^"]+)"', headers_part)
257+
if not name_match:
258+
return None, b""
259+
260+
field_name = name_match.group(1)
261+
262+
# Check if it's a file field and process accordingly
263+
if "filename=" in headers_part:
264+
# It's a file - store as bytes
265+
return field_name, content
266+
else:
267+
# It's a regular form field - decode as string
268+
return field_name, self._decode_form_field_content(content)
269+
270+
def _split_section_headers_and_content(self, section: bytes) -> tuple[str | None, bytes]:
271+
"""Split a multipart section into headers and content parts."""
272+
header_end = section.find(b"\r\n\r\n")
273+
if header_end == -1:
274+
header_end = section.find(b"\n\n")
275+
if header_end == -1:
276+
return None, b""
277+
content = section[header_end + 2:].strip()
278+
else:
279+
content = section[header_end + 4:].strip()
280+
281+
headers_part = section[:header_end].decode("utf-8", errors="ignore")
282+
return headers_part, content
283+
284+
def _decode_form_field_content(self, content: bytes) -> str | bytes:
285+
"""Decode form field content as string, falling back to bytes if decoding fails."""
286+
try:
287+
return content.decode("utf-8")
288+
except UnicodeDecodeError:
289+
# If can't decode as text, keep as bytes
290+
return content
291+
262292

263293
class OpenAPIResponseValidationMiddleware(BaseMiddlewareHandler):
264294
"""

0 commit comments

Comments
 (0)