Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/batch.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "facebook/opt-125m", "messages": [{"role": "system", "content": "You are an unhelpful assistant."},{"role": "user", "content": "Hello world!"}],"max_completion_tokens": 1000}}
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "facebook/opt-125m", "messages": [{"role": "system", "content": "You are an unhelpful assistant."},{"role": "user", "content": "Hello world!"}],"max_completion_tokens": 1000}}
42 changes: 42 additions & 0 deletions examples/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
This script uploads JSONL files to the server, which can be used to run
batch inference on the VLLM model.
"""

from pathlib import Path

import rich
from openai import OpenAI

# get the current directory
current_dir = Path(__file__).parent
# generate this file using `./generate_file.sh`
filepath = current_dir / "batch.jsonl"

# Modify OpenAI's API key and API base to use vLLM's API server.
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:8000/v1"

client = OpenAI(
api_key=openai_api_key,
base_url=openai_api_base,
)


def from_in_memory() -> None:
file = client.files.create(
file=filepath.read_bytes(),
purpose="batch",
)
return file


if __name__ == "__main__":
file = from_in_memory()

# get the file according to the file id
retrieved = client.files.retrieve(file.id)
rich.print(retrieved)

file_content = client.files.retrieve_content(file.id)
rich.print(file_content.encode("utf-8"))
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"kubernetes",
"prometheus_client",
"uhashring",
"aiofiles",
"python-multipart",
],
entry_points={
"console_scripts": [
Expand Down
10 changes: 10 additions & 0 deletions src/vllm_router/files/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from vllm_router.files.file_storage import FileStorage
from vllm_router.files.files import OpenAIFile
from vllm_router.files.storage import Storage, initialize_storage

__all__ = [
"OpenAIFile",
"Storage",
"FileStorage",
"initialize_storage",
]
123 changes: 123 additions & 0 deletions src/vllm_router/files/file_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
import uuid
from typing import List

import aiofiles

from vllm_router.files.files import OpenAIFile
from vllm_router.files.storage import Storage
from vllm_router.log import init_logger

logger = init_logger(__name__)


class FileStorage(Storage):
"""
File storage implementation using the local filesystem.

Files are stored in the following directory structure:
/tmp/vllm_files/<user_id>/<file_id>

user_id is not
"""

def __init__(self, base_path: str = "/tmp/vllm_files"):
self.base_path = base_path
logger.info(f"Using local file storage at {base_path}")
os.makedirs(base_path, exist_ok=True)

def _get_user_path(self, user_id: str) -> str:
"""Get user-specific directory path"""
user_path = os.path.join(self.base_path, user_id)
os.makedirs(user_path, exist_ok=True)
return user_path

async def save_file(
self,
file_id: str = None,
user_id: str = Storage.DEFAULT_USER_ID,
file_name: str = None,
content: bytes = None,
purpose: str = Storage.DEFAULT_PURPOSE,
) -> OpenAIFile:
"""Save file content to disk"""
if content is None:
raise ValueError("Content cannot be None")
if file_id is None:
file_id = f"file-{uuid.uuid4().hex[:6]}"

# Save file to disk. File name is the same as file_id.
user_path = self._get_user_path(user_id)
file_path = os.path.join(user_path, file_id)
async with aiofiles.open(file_path, "wb") as f:
await f.write(content)

# Create OpenAIFile object.
file_size = len(content)
created_at = int(os.path.getctime(file_path))
return OpenAIFile(
id=file_id,
object="file",
bytes=file_size,
created_at=created_at,
filename=file_name or file_id,
purpose=purpose,
)

async def save_file_chunk(
self,
file_id: str,
user_id: str = Storage.DEFAULT_USER_ID,
chunk: bytes = None,
purpose: str = Storage.DEFAULT_PURPOSE,
offset: int = 0,
) -> None:
"""Save file chunk to disk at specified offset"""
user_path = self._get_user_path(user_id)
file_path = os.path.join(user_path, file_id)
async with aiofiles.open(file_path, "r+b") as f:
await f.seek(offset)
await f.write(chunk)

async def get_file(
self, file_id: str, user_id: str = Storage.DEFAULT_USER_ID
) -> OpenAIFile:
"""Retrieve file metadata from disk"""
user_path = self._get_user_path(user_id)
file_path = os.path.join(user_path, file_id)
if not os.path.exists(file_path):
logger.error(f"File {file_id} not found, returning empty file")
raise FileNotFoundError(f"File {file_id} not found")
file_size = os.path.getsize(file_path)
created_at = int(os.path.getctime(file_path))
return OpenAIFile(
id=file_id,
object="file",
bytes=file_size,
created_at=created_at,
filename=file_id,
purpose=Storage.DEFAULT_PURPOSE,
)

async def get_file_content(
self, file_id: str, user_id: str = Storage.DEFAULT_USER_ID
) -> bytes:
"""Retrieve file content from disk"""
user_path = self._get_user_path(user_id)
file_path = os.path.join(user_path, file_id)
if not os.path.exists(file_path):
raise FileNotFoundError(f"File {file_id} not found")
async with aiofiles.open(file_path, "rb") as f:
return await f.read()

async def list_files(self, user_id: str = Storage.DEFAULT_USER_ID) -> List[str]:
"""List all files in storage"""
user_path = self._get_user_path(user_id)
return os.listdir(user_path)

async def delete_file(self, file_id: str, user_id: str = Storage.DEFAULT_USER_ID):
"""Delete file from disk"""
user_path = self._get_user_path(user_id)
file_path = os.path.join(user_path, file_id)
if os.path.exists(file_path):
os.remove(file_path)
48 changes: 48 additions & 0 deletions src/vllm_router/files/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from dataclasses import dataclass
from typing import Literal


@dataclass
class OpenAIFile:
"""
Represents a file object

https://platform.openai.com/docs/api-reference/files/object
"""

id: str
object: Literal["file"]
bytes: int
created_at: int
filename: str
purpose: str

@classmethod
def from_dict(cls, data: dict) -> "OpenAIFile":
return cls(
id=data["id"],
object=data["object"],
bytes=data["bytes"],
created_at=data["created_at"],
filename=data["filename"],
purpose=data["purpose"],
)

def to_dict(self) -> dict:
return {
"id": self.id,
"object": self.object,
"bytes": self.bytes,
"created_at": self.created_at,
"filename": self.filename,
"purpose": self.purpose,
}

def metadata(self) -> dict:
return {
"id": self.id,
"bytes": self.bytes,
"created_at": self.created_at,
"filename": self.filename,
"purpose": self.purpose,
}
59 changes: 59 additions & 0 deletions src/vllm_router/files/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from abc import ABC, abstractmethod
from typing import List

from vllm_router.files.files import OpenAIFile


class Storage(ABC):
DEFAULT_USER_ID = "uid_default"
DEFAULT_PURPOSE = "batch"

@abstractmethod
async def save_file(
self,
file_id: str = None,
user_id: str = DEFAULT_USER_ID,
file_name: str = None,
content: bytes = None,
purpose: str = DEFAULT_PURPOSE,
) -> OpenAIFile:
pass

@abstractmethod
async def save_file_chunk(
self,
file_id: str,
user_id: str = DEFAULT_USER_ID,
chunk: bytes = None,
purpose: str = DEFAULT_PURPOSE,
offset: int = 0,
) -> None:
pass

async def get_file(
self, file_id: str, user_id: str = DEFAULT_USER_ID
) -> OpenAIFile:
pass

@abstractmethod
async def get_file_content(
self, file_id: str, user_id: str = DEFAULT_USER_ID
) -> bytes:
pass

@abstractmethod
async def list_files(self, user_id: str = DEFAULT_USER_ID) -> List[str]:
pass

@abstractmethod
async def delete_file(self, file_id: str, user_id: str = DEFAULT_USER_ID):
pass


def initialize_storage(storage_type: str, base_path: str = None) -> Storage:
if storage_type == "local_file":
from vllm_router.files.file_storage import FileStorage

return FileStorage(base_path)
else:
raise ValueError(f"Unsupported storage type: {storage_type}")
Loading