|
1 | | -import json |
2 | | -import math |
3 | | -from dataclasses import dataclass, asdict |
4 | 1 | from starlette.datastructures import Headers |
5 | | -from typing import Optional, Self |
| 2 | +from pydantic import BaseModel, Field, field_validator, ByteSize, StrictStr, ConfigDict, AliasChoices |
| 3 | +from typing import Optional, Self, Annotated |
6 | 4 |
|
7 | 5 |
|
8 | | -@dataclass(frozen=True) |
9 | | -class FileMetadata: |
10 | | - size: int |
11 | | - name: str |
12 | | - content_type: Optional[str] = None |
| 6 | +class FileMetadata(BaseModel): |
| 7 | + name: StrictStr = Field(description="File name", min_length=2, max_length=255, validation_alias=AliasChoices('name', 'file_name')) |
| 8 | + size: ByteSize = Field(description="Size in bytes", gt=0,validation_alias=AliasChoices('size', 'file_size')) |
| 9 | + type: StrictStr = Field(description="MIME type", default='application/octet-stream',validation_alias=AliasChoices('type', 'file_type', 'content_type')) |
13 | 10 |
|
14 | | - def to_json(self) -> str: |
15 | | - return json.dumps(asdict(self), skipkeys=True) |
| 11 | + model_config = ConfigDict(validate_by_name=True, populate_by_name=True) |
16 | 12 |
|
17 | | - def to_readable_dict(self) -> dict: |
18 | | - return dict( |
19 | | - file_name=self.name, |
20 | | - file_size=self.format_size(self.size), |
21 | | - file_type=self.format_type(self.content_type) |
22 | | - ) |
| 13 | + @field_validator('name') |
| 14 | + @classmethod |
| 15 | + def validate_name(cls, v: str) -> str: |
| 16 | + safe_filename = str(v).translate(str.maketrans(':;|*@/\\', ' ')).strip() |
| 17 | + return safe_filename.encode('latin-1', 'ignore').decode('utf-8', 'ignore') |
23 | 18 |
|
24 | 19 | @classmethod |
25 | 20 | def from_json(cls, data: str) -> Self: |
26 | | - return cls(**json.loads(data)) |
| 21 | + return cls.model_validate_json(data) |
| 22 | + |
| 23 | + def to_json(self) -> str: |
| 24 | + return self.model_dump_json() |
27 | 25 |
|
28 | 26 | @classmethod |
29 | 27 | def get_from_http_headers(cls, headers: Headers, filename: str) -> Self: |
| 28 | + """Create metadata from headers of an HTTP upload request.""" |
30 | 29 | return cls( |
31 | | - name=cls.escape_filename(filename), |
32 | | - size=cls.process_length(headers.get('content-length', '0')), |
33 | | - content_type=headers.get('content-type', '') |
| 30 | + name=filename, |
| 31 | + size=headers.get('content-length', '0'), |
| 32 | + type=headers.get('content-type', '') or None |
34 | 33 | ) |
35 | 34 |
|
36 | 35 | @classmethod |
37 | 36 | def get_from_json(cls, header: dict) -> Self: |
38 | | - return cls( |
39 | | - name=cls.escape_filename(header['file_name']), |
40 | | - size=cls.process_length(header['file_size']), |
41 | | - content_type=header['file_type'] |
42 | | - ) |
43 | | - |
44 | | - @staticmethod |
45 | | - def escape_filename(filename: str | int) -> str: |
46 | | - """Escape special characters in the filename.""" |
47 | | - safe_filename = str(filename).translate(str.maketrans('', '', ':;|*@/\\')) |
48 | | - return safe_filename.encode('latin-1', 'ignore').decode('utf-8', 'ignore') |
49 | | - |
50 | | - @staticmethod |
51 | | - def format_size(size_bytes: int) -> str: |
52 | | - """Return human-readable file size.""" |
53 | | - if size_bytes == 0: |
54 | | - return "0 B" |
55 | | - units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB") |
56 | | - i = math.floor(math.log(size_bytes, 1024)) |
57 | | - p = math.pow(1024, i) |
58 | | - s = round(size_bytes / p, 1) |
59 | | - return f"{s} {units[i]}" |
60 | | - |
61 | | - @staticmethod |
62 | | - def format_type(content_type: Optional[str]) -> str: |
63 | | - """Return human-readable file type.""" |
64 | | - return content_type or "unknown" |
| 37 | + """Create metadata from a JSON dictionary.""" |
| 38 | + return cls(**header) |
65 | 39 |
|
66 | | - @staticmethod |
67 | | - def process_length(length: str | int) -> int: |
68 | | - """Convert size string to bytes.""" |
69 | | - try: |
70 | | - size = int(str(length).strip().replace(' ', '')) |
71 | | - except ValueError: |
72 | | - raise ValueError(f"Invalid size format: {length}") |
73 | | - if size <= 0: |
74 | | - raise ValueError("File size has to be positive.") |
75 | | - return size |
| 40 | + def to_readable_dict(self) -> dict: |
| 41 | + return dict( |
| 42 | + file_name=self.name, |
| 43 | + file_size=self.size.human_readable(), |
| 44 | + file_type=self.type, |
| 45 | + ) |
76 | 46 |
|
77 | 47 | def __str__(self): |
78 | | - return f"{self.name} ({self.size/(1024**2):.1f} MiB - {self.content_type})" |
| 48 | + return f"{self.name} ({self.size.human_readable()} - {self.type})" |
79 | 49 |
|
80 | 50 | def __repr__(self): |
81 | | - return f"FileMetadata(name={self.name!r}, size={self.size/(1024**2):.1f}, content_type={self.content_type!r})" |
| 51 | + return f"FileMetadata(name={self.name!r}, size={self.size.human_readable()}, type={self.type!r})" |
0 commit comments