Skip to content

Commit 8244e51

Browse files
zhangjyr (Jingyuan Zhang)
and co-author authored
[Feat] Support OpenAI Files API and refactor storage library to support local, s3, tos, and redis (for metadata) (#1583)
* Basic metadata server with new batchjob work with legacy local batch driver. * Connect job_manager with batch API * Finish e2e batch server test. * lint-fix * Review fix. * Refactored storage by add general storage adapters optimized for batch job. Original storage in batch folder now behavior as a group of helper that bridge raw data and batch request. Local driver (request_proxy) has more logic to work with storage adapters. * Fix local storage errors to make e2e tests pass. * Provide a special implementation for small file multipart upload. * Support redis as background and add pagination to storage list. * Conditionally enabling redis tests. * Adjust unit tests' folder. * Lint fix * bug fix * Support metadata file API, including OpenAI files compatible get file and using Head method. * Refactor multi-upload logic to unify small and large file multi-upload * Format * Fix test_path_traversal_protection in test_local_storage.py * Fix generate_filename bug. * Lint fix Signed-off-by: Jingyuan Zhang <[email protected]> Co-authored-by: Jingyuan Zhang <[email protected]>
1 parent 0991af1 commit 8244e51

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

41 files changed

+8514
-905
lines changed

python/aibrix/aibrix/batch/driver.py

Lines changed: 31 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -13,30 +13,38 @@
1313
# limitations under the License.
1414

1515
import asyncio
16-
from typing import Optional
16+
from typing import Any, Dict, List, Optional
1717

1818
import aibrix.batch.storage as _storage
1919
from aibrix.batch.constant import DEFAULT_JOB_POOL_SIZE
2020
from aibrix.batch.job_entity import JobEntityManager
2121
from aibrix.batch.job_manager import JobManager
2222
from aibrix.batch.request_proxy import RequestProxy
2323
from aibrix.batch.scheduler import JobScheduler
24+
from aibrix.batch.storage.batch_metastore import initialize_batch_metastore
2425
from aibrix.metadata.logger import init_logger
26+
from aibrix.storage import StorageType
2527

2628
logger = init_logger(__name__)
2729

2830

2931
class BatchDriver:
30-
def __init__(self, job_entity_manager: Optional[JobEntityManager] = None):
32+
def __init__(
33+
self,
34+
job_entity_manager: Optional[JobEntityManager] = None,
35+
storage_type: StorageType = StorageType.AUTO,
36+
metastore_type: StorageType = StorageType.AUTO,
37+
):
3138
"""
3239
This is main entrance to bind all components to serve job requests.
3340
"""
34-
_storage.initialize_storage()
41+
_storage.initialize_storage(storage_type)
42+
initialize_batch_metastore(metastore_type)
3543
self._storage = _storage
3644
self._job_manager: JobManager = JobManager(job_entity_manager)
3745
self._scheduler: Optional[JobScheduler] = None
3846
self._scheduling_task: Optional[asyncio.Task] = None
39-
self._proxy: RequestProxy = RequestProxy(self._storage, self._job_manager)
47+
self._proxy: RequestProxy = RequestProxy(self._job_manager)
4048
# Only create jobs_running_loop if JobEntityManager does not have its own sched
4149
if not job_entity_manager or not job_entity_manager.is_scheduler_enabled():
4250
self._scheduler = JobScheduler(self._job_manager, DEFAULT_JOB_POOL_SIZE)
@@ -47,14 +55,11 @@ def __init__(self, job_entity_manager: Optional[JobEntityManager] = None):
4755
def job_manager(self) -> JobManager:
4856
return self._job_manager
4957

50-
def upload_batch_data(self, input_file_name):
51-
file_id = self._storage.submit_job_input(input_file_name)
52-
return file_id
58+
async def upload_job_data(self, input_file_name) -> str:
59+
return await self._storage.upload_input_data(input_file_name)
5360

54-
def retrieve_job_result(self, file_id):
55-
num_requests = _storage.get_job_num_request(file_id)
56-
req_results = _storage.get_job_results(file_id, 0, num_requests)
57-
return req_results
61+
async def retrieve_job_result(self, file_id) -> List[Dict[str, Any]]:
62+
return await self._storage.download_output_data(file_id)
5863

5964
async def jobs_running_loop(self):
6065
"""
@@ -66,7 +71,17 @@ async def jobs_running_loop(self):
6671
while True:
6772
one_job = await self._scheduler.round_robin_get_job()
6873
if one_job:
69-
await self._proxy.execute_queries(one_job)
74+
try:
75+
await self._proxy.execute_queries(one_job)
76+
except Exception as e:
77+
job = self._job_manager.mark_job_failed(one_job)
78+
logger.error(
79+
"Failed to execute job",
80+
job_id=one_job,
81+
status=job.status.state.value,
82+
error=e,
83+
)
84+
raise
7085
await asyncio.sleep(0)
7186

7287
async def close(self):
@@ -84,15 +99,15 @@ async def close(self):
8499
if self._scheduler:
85100
await self._scheduler.close()
86101

87-
def clear_job(self, job_id):
102+
async def clear_job(self, job_id):
88103
job = self._job_manager.get_job(job_id)
89104
if job is None:
90105
return
91106

92107
self._job_manager.job_deleted_handler(job)
93108
if self._job_manager.get_job(job_id) is None:
94-
self._storage.delete_job(job.spec.input_file_id)
109+
await self._storage.remove_job_data(job.spec.input_file_id)
95110
if job.status.output_file_id is not None:
96-
self._storage.delete_job(job.status.output_file_id)
111+
await self._storage.remove_job_data(job.status.output_file_id)
97112
if job.status.error_file_id is not None:
98-
self._storage.delete_job(job.status.error_file_id)
113+
await self._storage.remove_job_data(job.status.error_file_id)

python/aibrix/aibrix/batch/job_entity/batch_job.py

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,14 @@ class BatchJobState(str, Enum):
5858
CANCELLING = "cancelling"
5959
CANCELED = "canceled"
6060

61+
def is_finished(self):
62+
return self in [
63+
BatchJobState.COMPLETED,
64+
BatchJobState.FAILED,
65+
BatchJobState.CANCELED,
66+
BatchJobState.EXPIRED,
67+
]
68+
6169

6270
class BatchJobErrorCode(str, Enum):
6371
"""Error codes for batch job."""
@@ -228,6 +236,9 @@ class RequestCountStats(NoExtraBaseModel):
228236
"""Holds the statistics on the processing of the batch."""
229237

230238
total: int = Field(default=0, description="Total number of requests in the batch")
239+
launched: int = Field(
240+
default=0, description="Number of requests that have been launched"
241+
)
231242
completed: int = Field(
232243
default=0,
233244
description="Number of requests that have been successfully completed",
@@ -299,6 +310,17 @@ class BatchJobStatus(NoExtraBaseModel):
299310
description="List of errors that occurred during the batch job processing",
300311
)
301312

313+
temp_output_file_id: Optional[str] = Field(
314+
default=None,
315+
alias="tempOutputFileID",
316+
description="The ID of the file containing the results of successfully completed requests",
317+
)
318+
temp_error_file_id: Optional[str] = Field(
319+
default=None,
320+
alias="tempErrorFileID",
321+
description="The ID of the file containing details for any failed requests",
322+
)
323+
302324
output_file_id: Optional[str] = Field(
303325
default=None,
304326
alias="outputFileID",
@@ -310,8 +332,8 @@ class BatchJobStatus(NoExtraBaseModel):
310332
description="The ID of the file containing details for any failed requests",
311333
)
312334

313-
request_counts: Optional[RequestCountStats] = Field(
314-
default=None,
335+
request_counts: RequestCountStats = Field(
336+
default_factory=RequestCountStats,
315337
alias="requestCounts",
316338
description="Statistics on the processing of the batch",
317339
)

0 commit comments

Comments (0)