diff --git a/src/openai/_files.py b/src/openai/_files.py
index 7b23ca084a..4e7a992348 100644
--- a/src/openai/_files.py
+++ b/src/openai/_files.py
@@ -3,11 +3,11 @@
 import io
 import os
 import pathlib
-from typing import overload
+import mimetypes
+import asyncio
+from typing import Optional, overload
 from typing_extensions import TypeGuard
 
-import anyio
-
 from ._types import (
     FileTypes,
     FileContent,
@@ -61,15 +61,43 @@ def to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles | None:
 
 
 def _transform_file(file: FileTypes) -> HttpxFileTypes:
+    """Normalise one user-supplied file into the `HttpxFileTypes` form.
+
+    Tuple inputs are handled first. Path-like content is read from disk, and a
+    content type is inferred (filename extension, then magic bytes) when the
+    caller did not supply one.
+
+    Raises:
+        TypeError: if `file` is neither a tuple nor a supported file content.
+    """
+    if is_tuple_t(file):
+        name = file[0]
+        content = read_file_content(file[1])
+        explicit = file[2] if len(file) >= 3 else None
+        ctype = explicit if explicit is not None else _infer_content_type(name, file[1], content)
+
+        if len(file) >= 4:
+            return (name, content, ctype, file[3])
+        if ctype is not None:
+            return (name, content, ctype)
+        return (name, content)
+
     if is_file_content(file):
         if isinstance(file, os.PathLike):
             path = pathlib.Path(file)
-            return (path.name, path.read_bytes())
-
-        return file
-
-    if is_tuple_t(file):
-        return (file[0], read_file_content(file[1]), *file[2:])
+            return _named_bytes_file(path.name, path.read_bytes())
+
+        if isinstance(file, (bytes, bytearray)):
+            return _anonymous_bytes_file(bytes(file))
+
+        if isinstance(file, io.IOBase):
+            stream_name = _stream_basename(file)
+            ctype = _guess_content_type_from_filename(stream_name)
+            if ctype is not None:
+                return (stream_name, file, ctype)
+            # Nothing inferred: return the stream untouched, exactly as the pre-change code
+            # did, instead of wrapping it in a tuple whose filename may be None.
+            return file
 
     raise TypeError(f"Expected file types input to be a FileContent type or to be a tuple")
 
@@ -103,21 +131,128 @@ async def async_to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles
 
 
 async def _async_transform_file(file: FileTypes) -> HttpxFileTypes:
+    """Async counterpart of `_transform_file`; path reads run in a worker thread.
+
+    Raises:
+        TypeError: if `file` is neither a tuple nor a supported file content.
+    """
+    if is_tuple_t(file):
+        name = file[0]
+        content = await async_read_file_content(file[1])
+        explicit = file[2] if len(file) >= 3 else None
+        ctype = explicit if explicit is not None else _infer_content_type(name, file[1], content)
+
+        if len(file) >= 4:
+            return (name, content, ctype, file[3])
+        if ctype is not None:
+            return (name, content, ctype)
+        return (name, content)
+
     if is_file_content(file):
         if isinstance(file, os.PathLike):
-            path = anyio.Path(file)
-            return (path.name, await path.read_bytes())
-
-        return file
-
-    if is_tuple_t(file):
-        return (file[0], await async_read_file_content(file[1]), *file[2:])
+            path = pathlib.Path(file)
+            return _named_bytes_file(path.name, await asyncio.to_thread(path.read_bytes))
+
+        if isinstance(file, (bytes, bytearray)):
+            return _anonymous_bytes_file(bytes(file))
+
+        if isinstance(file, io.IOBase):
+            stream_name = _stream_basename(file)
+            ctype = _guess_content_type_from_filename(stream_name)
+            if ctype is not None:
+                return (stream_name, file, ctype)
+            # Nothing inferred: return the stream untouched, exactly as the pre-change code
+            # did, instead of wrapping it in a tuple whose filename may be None.
+            return file
 
     raise TypeError(f"Expected file types input to be a FileContent type or to be a tuple")
 
 
 async def async_read_file_content(file: FileContent) -> HttpxFileContent:
+    """Return the bytes of a path-like `file`; any other content is returned as-is."""
     if isinstance(file, os.PathLike):
-        return await anyio.Path(file).read_bytes()
+        # NOTE(review): asyncio.to_thread needs Python 3.9+ and a running asyncio loop, whereas
+        # the anyio.Path call it replaces ran under whichever backend anyio detected; confirm
+        # that narrowing is intended.
+        return await asyncio.to_thread(pathlib.Path(file).read_bytes)
 
     return file
+
+
+def _guess_content_type_from_filename(filename: Optional[str]) -> Optional[str]:
+    """Return the MIME type `mimetypes` maps *filename* to, or None if it has none."""
+    if not filename:
+        return None
+    guessed, _ = mimetypes.guess_type(filename)
+    return guessed
+
+
+# Leading "magic" bytes recognised by `_sniff_content_type_from_bytes`.
+_MAGIC_PREFIXES = (
+    (b"%PDF-", "application/pdf"),
+    (b"\x89PNG\r\n\x1a\n", "image/png"),
+    (b"\xff\xd8\xff", "image/jpeg"),
+    (b"GIF87a", "image/gif"),
+    (b"GIF89a", "image/gif"),
+)
+
+# Placeholder filenames used for raw bytes, keyed by content type.
+_DEFAULT_FILENAMES = {
+    "application/pdf": "upload.pdf",
+    "image/png": "upload.png",
+    "image/jpeg": "upload.jpg",
+    "image/gif": "upload.gif",
+}
+
+
+def _sniff_content_type_from_bytes(data: bytes) -> Optional[str]:
+    """Identify PDF/PNG/JPEG/GIF data from its leading bytes; None if unrecognised."""
+    for prefix, content_type in _MAGIC_PREFIXES:
+        if data.startswith(prefix):
+            return content_type
+    return None
+
+
+def _default_filename_for_content_type(content_type: str) -> str:
+    """Return a placeholder filename for *content_type* ("upload.bin" if unknown)."""
+    return _DEFAULT_FILENAMES.get(content_type, "upload.bin")
+
+
+def _infer_content_type(name: Optional[str], source: object, content: object) -> Optional[str]:
+    """Infer a content type for a tuple entry whose content type was omitted.
+
+    Tries the tuple's filename, then the on-disk filename when `source` is
+    path-like, then the leading bytes of in-memory content (defaulting to
+    "application/octet-stream"). Returns None when none of these applies.
+    """
+    guessed = _guess_content_type_from_filename(name)
+    if guessed is None and isinstance(source, os.PathLike):
+        guessed = _guess_content_type_from_filename(pathlib.Path(source).name)
+    if guessed is None and isinstance(content, (bytes, bytearray)):
+        # Only a short prefix is needed for sniffing; avoids copying a large bytearray.
+        guessed = _sniff_content_type_from_bytes(bytes(content[:16])) or "application/octet-stream"
+    return guessed
+
+
+def _named_bytes_file(name: str, data: bytes) -> HttpxFileTypes:
+    """Build the tuple for bytes read from a path, adding a content type when one is found."""
+    ctype = _guess_content_type_from_filename(name) or _sniff_content_type_from_bytes(data)
+    if ctype is not None:
+        return (name, data, ctype)
+    return (name, data)
+
+
+def _anonymous_bytes_file(data: bytes) -> HttpxFileTypes:
+    """Build the tuple for raw bytes, synthesising a filename from the sniffed type."""
+    ctype = _sniff_content_type_from_bytes(data) or "application/octet-stream"
+    return (_default_filename_for_content_type(ctype), data, ctype)
+
+
+def _stream_basename(stream: object) -> Optional[str]:
+    """Best-effort basename of a stream's `name` attribute; None if absent or not a str."""
+    try:
+        name_attr = getattr(stream, "name", None)
+    except Exception:
+        # `name` may be a property that raises; treat that as "no name" rather than failing.
+        return None
+    return os.path.basename(name_attr) if isinstance(name_attr, str) else None