Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions statgpt/admin/fix_statuses.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
Script to fix statuses for channel dataset versions after migrations.
Sets failed status for any channel dataset versions that were left in processing state.
Script to fix statuses after migrations or crashes.
Sets failed status for any records that were left in a non-final state.
"""

import asyncio
Expand All @@ -12,12 +12,38 @@
_log = logging.getLogger(__name__)


async def fix_statuses() -> None:
"""Fix statuses from previous runs by setting failed status for stuck channel dataset versions.

Opens a session through ``get_session_contex_manager``, wraps it in an
``AdminPortalDataSetService`` and awaits
``set_failed_status_for_channel_dataset_version``. An info message is logged
afterwards. Nothing is caught here, so any exception propagates to the caller.
"""
# NOTE(review): this listing also contains a later ``async def fix_statuses``
# with the same name; in a single module the later definition would win.
async with get_session_contex_manager() as session:
service = AdminPortalDataSetService(session)
await service.set_failed_status_for_channel_dataset_version()
# NOTE(review): indentation is lost in this listing, so it cannot be told
# whether this log call runs inside or after the ``async with`` — confirm.
_log.info("Successfully fixed statuses for channel dataset versions")
async def _fix_channel_dataset_versions() -> None:
    """Run the channel-dataset-version status fix, logging any failure.

    Obtains a session from ``get_session_contex_manager`` and awaits
    ``AdminPortalDataSetService.set_failed_status_for_channel_dataset_version``
    on it. Any ``Exception`` raised along the way is logged with its
    traceback and is not re-raised.
    """
    try:
        async with get_session_contex_manager() as db_session:
            await AdminPortalDataSetService(
                db_session
            ).set_failed_status_for_channel_dataset_version()
    except Exception:
        # Best-effort step: record the error and return normally.
        _log.exception("Error fixing channel dataset version statuses:")


async def _fix_jobs() -> None:
    """Run the stuck-job status fix, logging any failure.

    Obtains a session from ``get_session_contex_manager`` and awaits
    ``AdminPortalDataSetService.set_failed_status_for_stuck_jobs`` on it.
    Any ``Exception`` raised along the way is logged with its traceback and
    is not re-raised.
    """
    try:
        async with get_session_contex_manager() as db_session:
            await AdminPortalDataSetService(
                db_session
            ).set_failed_status_for_stuck_jobs()
    except Exception:
        # Best-effort step: record the error and return normally.
        _log.exception("Error fixing job statuses:")


async def _fix_auto_update_jobs() -> None:
    """Run the stuck auto-update-job status fix, logging any failure.

    Obtains a session from ``get_session_contex_manager`` and awaits
    ``AdminPortalDataSetService.set_failed_status_for_stuck_auto_update_jobs``
    on it. Any ``Exception`` raised along the way is logged with its
    traceback and is not re-raised.
    """
    try:
        async with get_session_contex_manager() as db_session:
            await AdminPortalDataSetService(
                db_session
            ).set_failed_status_for_stuck_auto_update_jobs()
    except Exception:
        # Best-effort step: record the error and return normally.
        _log.exception("Error fixing auto-update job statuses:")


async def fix_statuses() -> None:
    """Fix statuses from previous runs by marking stuck records as failed.

    Awaits the three fix steps one after another, in this order: channel
    dataset versions, jobs, auto-update jobs.
    """
    fix_steps = (
        _fix_channel_dataset_versions,
        _fix_jobs,
        _fix_auto_update_jobs,
    )
    # Sequential on purpose: each step is awaited before the next starts.
    for run_step in fix_steps:
        await run_step()


async def main():
Expand Down
61 changes: 49 additions & 12 deletions statgpt/admin/services/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,33 +1209,70 @@ async def clear_channel_dataset_versions_data(
else:
_log.info("No versions to clear data for.")

async def _set_failed_status(
    self,
    model: Any,
    status_column: Any,
    status_field_name: str,
) -> int:
    """Set the status of all stuck records of ``model`` to FAILED.

    A row is updated when ``status_column`` is not one of
    ``StatusEnum.final_statuses()`` and its ``updated_at`` is older than
    ``NOW() - INTERVAL '12 hours'`` as evaluated by the database.

    For every matched row the method:

    * sets the column named ``status_field_name`` to ``StatusEnum.FAILED``;
    * keeps an existing ``reason_for_failure`` and fills in a default
      message only when that column is NULL (``COALESCE``);
    * sets ``updated_at`` to the database's current time.

    The statement is executed on ``self._session`` but is NOT committed
    here; committing is left to the caller.

    Args:
        model: Mapped class to update. It must expose ``__tablename__``,
            ``updated_at`` and ``reason_for_failure``.
        status_column: Column attribute of ``model`` holding the status,
            used in the ``WHERE`` clause.
        status_field_name: Name of that same column as a keyword for
            ``.values()``.

    Returns:
        The number of updated rows.
    """
    table_name = model.__tablename__

    _log.info(f"Setting FAILED status for all non-completed {table_name}...")

    query = (
        update(model)
        .where(
            status_column.notin_(StatusEnum.final_statuses()),
            # Only rows untouched for 12+ hours are treated as stuck.
            model.updated_at < text("NOW() - INTERVAL '12 hours'"),
        )
        .values(
            # The status column name differs per model, hence the dict unpack.
            **{status_field_name: StatusEnum.FAILED},
            reason_for_failure=func.coalesce(
                model.reason_for_failure,  # type: ignore[attr-defined]
                "Stuck in a non-final status with no recorded failure reason."
                " Marked as FAILED by fix_statuses script.",
            ),
            updated_at=func.now(),
        )
    )

    result = await self._session.execute(query)
    row_count: int = result.rowcount  # type: ignore[attr-defined]

    _log.info(f"Updated {row_count} {table_name} record(s) to FAILED status")
    return row_count

async def set_failed_status_for_channel_dataset_version(self) -> None:
    """Set the status of all not-completed channel dataset versions to FAILED.

    Delegates to ``_set_failed_status`` for ``models.ChannelDatasetVersion``
    using its ``preprocessing_status`` column, then commits the session.
    """
    await self._set_failed_status(
        models.ChannelDatasetVersion,
        models.ChannelDatasetVersion.preprocessing_status,
        "preprocessing_status",
    )
    # No extra logging here: the previous trailing log line read
    # ``result.rowcount`` although ``result`` is not bound in this method.
    await self._session.commit()
async def set_failed_status_for_stuck_jobs(self) -> None:
    """Mark every stuck ``Job`` record as FAILED and commit.

    Delegates to ``_set_failed_status`` for ``models.Job`` using its
    ``status`` column, then commits the session.
    """
    job_model = models.Job
    await self._set_failed_status(
        model=job_model,
        status_column=job_model.status,
        status_field_name="status",
    )
    await self._session.commit()

async def set_failed_status_for_stuck_auto_update_jobs(self) -> None:
    """Mark every stuck ``AutoUpdateJob`` record as FAILED and commit.

    Delegates to ``_set_failed_status`` for ``models.AutoUpdateJob`` using
    its ``status`` column, then commits the session.
    """
    auto_update_model = models.AutoUpdateJob
    await self._set_failed_status(
        model=auto_update_model,
        status_column=auto_update_model.status,
        status_field_name="status",
    )
    await self._session.commit()

async def reload_all_indicators(
self,
Expand Down
Loading