|
17 | 17 | import weakref |
18 | 18 | from collections import namedtuple |
19 | 19 | from contextlib import suppress |
| 20 | +from email.message import EmailMessage |
20 | 21 | from email.parser import HeaderParser |
| 22 | +from email.policy import HTTP |
21 | 23 | from email.utils import parsedate |
22 | 24 | from math import ceil |
23 | 25 | from pathlib import Path |
@@ -357,14 +359,40 @@ def parse_mimetype(mimetype: str) -> MimeType: |
357 | 359 | ) |
358 | 360 |
|
359 | 361 |
|
class EnsureOctetStream(EmailMessage):
    """Email message whose content type defaults to application/octet-stream.

    The standard library message classes fall back to ``text/plain`` when a
    Content-Type header is missing or malformed. For HTTP the appropriate
    fallback is ``application/octet-stream``, so this subclass changes the
    default type and re-implements :meth:`get_content_type` accordingly.
    """

    def __init__(self) -> None:
        super().__init__()
        # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
        self.set_default_type("application/octet-stream")

    def get_content_type(self) -> str:
        """Return the lower-cased ``type/subtype`` of the Content-Type header.

        Re-implementation of ``Message.get_content_type`` that returns the
        default type (``application/octet-stream``) instead of ``text/plain``
        when the header value is not of the form ``type/subtype``.

        The way this class is used guarantees that a Content-Type header is
        present, so the checks are simplified with respect to the base
        implementation; a missing header is treated like an empty value and
        therefore also yields the default type.
        """
        # Coerce to a plain ``str`` so the return type does not depend on
        # the header object produced by the message policy.
        value = str(self.get("content-type", "")).lower()

        # Based on the implementation of _splitparam in the standard library:
        # everything after the first ";" is parameters, not the media type.
        ctype, _, _ = value.partition(";")
        ctype = ctype.strip()
        if ctype.count("/") != 1:
            return self.get_default_type()
        return ctype
| 385 | + |
| 386 | + |
360 | 387 | @functools.lru_cache(maxsize=56) |
361 | 388 | def parse_content_type(raw: str) -> Tuple[str, MappingProxyType[str, str]]: |
362 | 389 | """Parse Content-Type header. |
363 | 390 |
|
364 | 391 | Returns a tuple of the parsed content type and a |
365 | | - MappingProxyType of parameters. |
| 392 | + MappingProxyType of parameters. The default returned value |
| 393 | + is `application/octet-stream` |
366 | 394 | """ |
367 | | - msg = HeaderParser().parsestr(f"Content-Type: {raw}") |
| 395 | + msg = HeaderParser(EnsureOctetStream, policy=HTTP).parsestr(f"Content-Type: {raw}") |
368 | 396 | content_type = msg.get_content_type() |
369 | 397 | params = msg.get_params(()) |
370 | 398 | content_dict = dict(params[1:]) # First element is content type again |
|
0 commit comments