Skip to content
210 changes: 128 additions & 82 deletions src/openai/_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
import os
import pathlib
import mimetypes
from typing import overload
import asyncio
from typing import Optional, overload
from typing_extensions import TypeGuard

import anyio

from ._types import (
FileTypes,
FileContent,
Expand Down Expand Up @@ -39,40 +38,6 @@ def assert_is_file_content(obj: object, *, key: str | None = None) -> None:
) from None


def _guess_content_type_from_filename(filename: str | None) -> str | None:
    """Look up the MIME type for ``filename`` via the ``mimetypes`` module.

    Returns ``None`` when ``filename`` is empty or ``None``, or when
    ``mimetypes`` has no mapping for it.
    """
    if filename:
        return mimetypes.guess_type(filename)[0]
    return None


def _sniff_content_type_from_bytes(data: bytes) -> str | None:
    """Identify a handful of well-known formats from their leading magic bytes.

    Only PDF, PNG, JPEG and GIF signatures are recognised; any other
    input (including empty data) yields ``None``.
    """
    signatures = (
        (b"%PDF-", "application/pdf"),
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"\xff\xd8\xff", "image/jpeg"),
        (b"GIF87a", "image/gif"),
        (b"GIF89a", "image/gif"),
    )
    for magic, content_type in signatures:
        if data.startswith(magic):
            return content_type
    return None


def _ensure_tuple_with_content_type(
    filename: str | None, content: HttpxFileContent, inferred: str | None
) -> tuple[str | None, HttpxFileContent, str | None]:
    """Build the ``(filename, content, content_type)`` triple.

    The third slot carries ``inferred`` when it is truthy and ``None``
    otherwise, so the result always has exactly three elements.
    """
    content_type = inferred if inferred else None
    return (filename, content, content_type)


# Typing overload: when ``files`` is ``None`` the declared result is ``None`` as well.
@overload
def to_httpx_files(files: None) -> None: ...

Expand All @@ -96,36 +61,59 @@ def to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles | None:


def _transform_file(file: FileTypes) -> HttpxFileTypes:
    """Convert one user-supplied file value into the tuple form passed on to httpx.

    Tuple inputs are handled first. Their content is read via
    ``read_file_content`` and an explicitly supplied content type is kept
    as-is. When none is supplied, one is inferred from the filename, then
    from the leading bytes of the content.

    Bare file content is expanded into a tuple:

    * path-like objects are read from disk and named after the path;
    * ``bytes``/``bytearray`` are sniffed and given a default filename;
    * file-like objects are passed through unread, named after their
      ``name`` attribute when that is a string.

    Raises:
        TypeError: if ``file`` is neither a tuple nor a supported file
            content type.
    """
    if is_tuple_t(file):
        name = file[0]
        content = read_file_content(file[1])

        # A content type supplied by the caller always wins over inference.
        if len(file) >= 3 and file[2] is not None:
            if len(file) >= 4:
                return (name, content, file[2], file[3])
            return (name, content, file[2])

        inferred: Optional[str] = _guess_content_type_from_filename(name)
        if inferred is None:
            if isinstance(content, (bytes, bytearray)):
                inferred = _sniff_content_type_from_bytes(bytes(content)) or "application/octet-stream"
            elif isinstance(file[1], os.PathLike):
                # Only consulted when the content read back is not bytes.
                try:
                    inferred = _guess_content_type_from_filename(pathlib.Path(file[1]).name)  # type: ignore[arg-type]
                except Exception:
                    # Best effort: an unusable path just means "no guess".
                    inferred = None

        if len(file) >= 4:
            # The fourth slot is positional, so the content-type slot has to
            # be present even when nothing could be inferred.
            return (name, content, inferred, file[3])
        if inferred is not None:
            return (name, content, inferred)
        return (name, content)

    if is_file_content(file):
        if isinstance(file, os.PathLike):
            path = pathlib.Path(file)
            data = path.read_bytes()
            ctype = _guess_content_type_from_filename(path.name) or _sniff_content_type_from_bytes(data)
            if ctype is not None:
                return (path.name, data, ctype)
            return (path.name, data)

        if isinstance(file, (bytes, bytearray)):
            data = bytes(file)
            ctype = _sniff_content_type_from_bytes(data) or "application/octet-stream"
            name = _default_filename_for_content_type(ctype)
            return (name, data, ctype)

        if isinstance(file, io.IOBase):
            file_name = None
            try:
                name_attr = getattr(file, "name", None)
                if isinstance(name_attr, str):
                    file_name = os.path.basename(name_attr)
            except Exception:
                # Best effort: a misbehaving ``name`` attribute means "no filename".
                file_name = None

            # The stream itself is handed on unread; only its name is inspected.
            ctype = _guess_content_type_from_filename(file_name)
            if ctype is not None:
                return (file_name, file, ctype)
            return (file_name, file)

    raise TypeError(f"Expected file types input to be a FileContent type or to be a tuple")

Expand Down Expand Up @@ -159,42 +147,100 @@ async def async_to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles


async def _async_transform_file(file: FileTypes) -> HttpxFileTypes:
    """Async counterpart of ``_transform_file``.

    Tuple inputs are handled first. Their content is read via
    ``async_read_file_content`` and an explicitly supplied content type is
    kept as-is. When none is supplied, one is inferred from the filename,
    then from the leading bytes of the content.

    Bare file content is expanded into a tuple:

    * path-like objects are read from disk asynchronously and named after
      the path;
    * ``bytes``/``bytearray`` are sniffed and given a default filename;
    * file-like objects are passed through unread, named after their
      ``name`` attribute when that is a string.

    Raises:
        TypeError: if ``file`` is neither a tuple nor a supported file
            content type.
    """
    if is_tuple_t(file):
        name = file[0]
        content = await async_read_file_content(file[1])

        # A content type supplied by the caller always wins over inference.
        if len(file) >= 3 and file[2] is not None:
            if len(file) >= 4:
                return (name, content, file[2], file[3])
            return (name, content, file[2])

        inferred: Optional[str] = _guess_content_type_from_filename(name)
        if inferred is None:
            if isinstance(content, (bytes, bytearray)):
                inferred = _sniff_content_type_from_bytes(bytes(content)) or "application/octet-stream"
            elif isinstance(file[1], os.PathLike):
                # NOTE(review): async_read_file_content returns bytes for
                # path-like inputs, so this branch looks unreachable; it is
                # kept for parity with the sync path. Confirm before removing.
                try:
                    inferred = _guess_content_type_from_filename(pathlib.Path(file[1]).name)  # type: ignore[arg-type]
                except Exception:
                    # Best effort: an unusable path just means "no guess".
                    inferred = None

        if len(file) >= 4:
            # The fourth slot is positional, so the content-type slot has to
            # be present even when nothing could be inferred.
            return (name, content, inferred, file[3])
        if inferred is not None:
            return (name, content, inferred)
        return (name, content)

    if is_file_content(file):
        if isinstance(file, os.PathLike):
            name = os.path.basename(os.fspath(file))
            # anyio.Path is awaited rather than calling pathlib directly, so
            # the read is not tied to one specific async framework.
            data = await anyio.Path(file).read_bytes()
            ctype = _guess_content_type_from_filename(name) or _sniff_content_type_from_bytes(data)
            if ctype is not None:
                return (name, data, ctype)
            return (name, data)

        if isinstance(file, (bytes, bytearray)):
            data = bytes(file)
            ctype = _sniff_content_type_from_bytes(data) or "application/octet-stream"
            name = _default_filename_for_content_type(ctype)
            return (name, data, ctype)

        if isinstance(file, io.IOBase):
            file_name = None
            try:
                name_attr = getattr(file, "name", None)
                if isinstance(name_attr, str):
                    file_name = os.path.basename(name_attr)
            except Exception:
                # Best effort: a misbehaving ``name`` attribute means "no filename".
                file_name = None

            # The stream itself is handed on unread; only its name is inspected.
            ctype = _guess_content_type_from_filename(file_name)
            if ctype is not None:
                return (file_name, file, ctype)
            return (file_name, file)

    raise TypeError(f"Expected file types input to be a FileContent type or to be a tuple")


async def async_read_file_content(file: FileContent) -> HttpxFileContent:
    """Return ``file`` in a form httpx can send, reading path-like inputs from disk.

    Path-like objects are read in full and returned as bytes. Every other
    value is returned unchanged.
    """
    if isinstance(file, os.PathLike):
        # Single read path: anyio.Path is awaited so the read is not tied to
        # one specific async framework.
        return await anyio.Path(file).read_bytes()

    return file


def _guess_content_type_from_filename(filename: Optional[str]) -> Optional[str]:
    """Return the MIME type ``mimetypes`` associates with ``filename``.

    An empty or ``None`` filename, or one ``mimetypes`` cannot map,
    yields ``None``.
    """
    if filename:
        content_type, _encoding = mimetypes.guess_type(filename)
        return content_type
    return None


def _sniff_content_type_from_bytes(data: bytes) -> Optional[str]:
    """Identify a few common formats from the magic bytes at the start of ``data``.

    Recognises PDF, PNG, JPEG and GIF.

    Args:
        data: The raw content to inspect; only its leading bytes are examined.

    Returns:
        The matching MIME type, or ``None`` when ``data`` is empty or starts
        with none of the known signatures.
    """
    # PDF: the header marker is "%PDF-" including the dash. Matching the full
    # marker avoids labelling arbitrary "%PDF..." text as a PDF.
    if data.startswith(b"%PDF-"):
        return "application/pdf"
    # PNG: 89 50 4E 47 0D 0A 1A 0A
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    # JPEG: FF D8 FF
    if data.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    # GIF: GIF87a or GIF89a
    if data.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    return None


def _default_filename_for_content_type(content_type: str) -> str:
    """Pick a placeholder upload filename whose extension matches ``content_type``.

    PDF, PNG, JPEG and GIF get a matching extension; every other content
    type falls back to ``upload.bin``.
    """
    known_names = (
        ("application/pdf", "upload.pdf"),
        ("image/png", "upload.png"),
        ("image/jpeg", "upload.jpg"),
        ("image/gif", "upload.gif"),
    )
    for known_type, filename in known_names:
        if content_type == known_type:
            return filename
    return "upload.bin"