|
1 | 1 | """File handling utilities for input/output operations.""" |
2 | 2 |
|
| 3 | +import contextlib |
3 | 4 | import io |
4 | 5 | import os |
5 | 6 | from pathlib import Path |
@@ -38,11 +39,30 @@ def prepare_file_input(file_input: FileInput) -> Tuple[bytes, str]: |
38 | 39 | return file_input, "document" |
39 | 40 | elif hasattr(file_input, "read"): |
40 | 41 | # Handle file-like objects |
| 42 | + # Save current position if seekable |
| 43 | + current_pos = None |
| 44 | + if hasattr(file_input, "seek") and hasattr(file_input, "tell"): |
| 45 | + try: |
| 46 | + current_pos = file_input.tell() |
| 47 | + file_input.seek(0) # Read from beginning |
| 48 | + except (OSError, io.UnsupportedOperation): |
| 49 | + pass |
| 50 | + |
41 | 51 | content = file_input.read() |
42 | 52 | if isinstance(content, str): |
43 | 53 | content = content.encode() |
| 54 | + |
| 55 | + # Restore position if we saved it |
| 56 | + if current_pos is not None: |
| 57 | + with contextlib.suppress(OSError, io.UnsupportedOperation): |
| 58 | + file_input.seek(current_pos) |
| 59 | + |
44 | 60 | filename = getattr(file_input, "name", "document") |
45 | | - if hasattr(filename, "__fspath__") or isinstance(filename, (str, bytes)): |
| 61 | + if hasattr(filename, "__fspath__"): |
| 62 | + filename = os.path.basename(os.fspath(filename)) |
| 63 | + elif isinstance(filename, bytes): |
| 64 | + filename = os.path.basename(filename.decode()) |
| 65 | + elif isinstance(filename, str): |
46 | 66 | filename = os.path.basename(filename) |
47 | 67 | return content, str(filename) |
48 | 68 | else: |
@@ -93,6 +113,10 @@ def prepare_file_for_upload( |
93 | 113 | elif hasattr(file_input, "read"): |
94 | 114 | filename = getattr(file_input, "name", "document") |
95 | 115 | if hasattr(filename, "__fspath__"): |
| 116 | + filename = os.path.basename(os.fspath(filename)) |
| 117 | + elif isinstance(filename, bytes): |
| 118 | + filename = os.path.basename(filename.decode()) |
| 119 | + elif isinstance(filename, str): |
96 | 120 | filename = os.path.basename(filename) |
97 | 121 | return field_name, (str(filename), file_input, content_type) |
98 | 122 |
|
@@ -150,7 +174,10 @@ def get_file_size(file_input: FileInput) -> Optional[int]: |
150 | 174 | Returns: |
151 | 175 | File size in bytes, or None if size cannot be determined. |
152 | 176 | """ |
153 | | - if isinstance(file_input, str): |
| 177 | + if isinstance(file_input, Path): |
| 178 | + if file_input.exists(): |
| 179 | + return file_input.stat().st_size |
| 180 | + elif isinstance(file_input, str): |
154 | 181 | path = Path(file_input) |
155 | 182 | if path.exists(): |
156 | 183 | return path.stat().st_size |
|
0 commit comments