|
2 | 2 | import json |
3 | 3 | from typing import Dict, List, Optional |
4 | 4 |
|
5 | | -from sqlalchemy import delete, select |
| 5 | +from sqlalchemy import Delete, delete, select |
6 | 6 | from sqlalchemy.orm import joinedload |
7 | 7 |
|
8 | 8 | from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT |
@@ -49,27 +49,29 @@ async def delete_metrics(): |
49 | 49 | finished_timestamp_micro_cutoff = ( |
50 | 50 | now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000 |
51 | 51 | ) |
| 52 | + await asyncio.gather( |
| 53 | + _execute_delete_statement( |
| 54 | + delete(JobMetricsPoint).where( |
| 55 | + JobMetricsPoint.job_id.in_( |
| 56 | + select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING])) |
| 57 | + ), |
| 58 | + JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff, |
| 59 | + ) |
| 60 | + ), |
| 61 | + _execute_delete_statement( |
| 62 | + delete(JobMetricsPoint).where( |
| 63 | + JobMetricsPoint.job_id.in_( |
| 64 | + select(JobModel.id).where(JobModel.status.in_(JobStatus.finished_statuses())) |
| 65 | + ), |
| 66 | + JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff, |
| 67 | + ) |
| 68 | + ), |
| 69 | + ) |
| 70 | + |
| 71 | + |
| 72 | +async def _execute_delete_statement(stmt: Delete) -> None: |
52 | 73 | async with get_session_ctx() as session: |
53 | | - await asyncio.gather( |
54 | | - session.execute( |
55 | | - delete(JobMetricsPoint).where( |
56 | | - JobMetricsPoint.job_id.in_( |
57 | | - select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING])) |
58 | | - ), |
59 | | - JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff, |
60 | | - ) |
61 | | - ), |
62 | | - session.execute( |
63 | | - delete(JobMetricsPoint).where( |
64 | | - JobMetricsPoint.job_id.in_( |
65 | | - select(JobModel.id).where( |
66 | | - JobModel.status.in_(JobStatus.finished_statuses()) |
67 | | - ) |
68 | | - ), |
69 | | - JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff, |
70 | | - ) |
71 | | - ), |
72 | | - ) |
| 74 | + await session.execute(stmt) |
73 | 75 | await session.commit() |
74 | 76 |
|
75 | 77 |
|
|
0 commit comments