Skip to content

Commit fbdf1a1

Browse files
Refactor auto-update script: log exceptions, simplify channel discovery
- Log exceptions from asyncio.gather in job processing and deduplication - Move auto-update channel filtering from ChannelService to the script - Remove unused get_auto_update_channels and selectinload import - Use most_common() for deterministic result summary ordering - Batch commit instead of per-iteration flush when creating jobs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a60bf13 commit fbdf1a1

File tree

3 files changed

+23
-28
lines changed

3 files changed

+23
-28
lines changed

statgpt/admin/auto_update.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@ async def _discover_and_create_jobs() -> list[schemas.AutoUpdateJob]:
2424
"""Find auto-update channels and create jobs for their datasets."""
2525
_log.info(_SEPARATOR)
2626
async with get_session_contex_manager() as session:
27-
channels = await AdminPortalChannelService(session).get_auto_update_channels()
28-
_log.info(f"Found {len(channels)} channel(s) with auto-update enabled")
29-
30-
if not channels:
27+
channel_service = AdminPortalChannelService(session)
28+
all_channels = await channel_service.get_channels_schemas(limit=None, offset=0)
29+
channel_ids = [
30+
ch.id
31+
for ch in all_channels
32+
if (dq := ch.details.data_query) is not None and dq.details.allow_auto_update
33+
]
34+
_log.info(f"Found {len(channel_ids)} channel(s) with auto-update enabled")
35+
36+
if not channel_ids:
3137
return []
3238

33-
channel_ids = [ch.id for ch in channels]
3439
return await AdminPortalDataSetService(session).create_auto_update_jobs(channel_ids)
3540

3641

@@ -39,13 +44,16 @@ async def _process_jobs(jobs: list[schemas.AutoUpdateJob], auth_context: AuthCon
3944
_log.info(_SEPARATOR)
4045
_log.info(f"Created {len(jobs)} auto-update job(s), starting processing...")
4146

42-
await asyncio.gather(
47+
results = await asyncio.gather(
4348
*(
4449
auto_update_in_background_task(auto_update_job_id=job.id, auth_context=auth_context)
4550
for job in jobs
4651
),
4752
return_exceptions=True,
4853
)
54+
for job, result in zip(jobs, results):
55+
if isinstance(result, Exception):
56+
_log.error(f"Auto-update job {job.id} failed with exception:", exc_info=result)
4957

5058

5159
async def _get_reindex_channel_ids(job_ids: list[int]) -> set[int]:
@@ -81,7 +89,7 @@ async def _deduplicate_channels(channel_ids: set[int], auth_context: AuthContext
8189
f"Running deduplication for {len(channel_ids)} channel(s) "
8290
f"with reindex: {sorted(channel_ids)}"
8391
)
84-
await asyncio.gather(
92+
results = await asyncio.gather(
8593
*(
8694
deduplicate_dimensions_in_background_task(
8795
channel_id=channel_id, auth_context=auth_context
@@ -90,6 +98,11 @@ async def _deduplicate_channels(channel_ids: set[int], auth_context: AuthContext
9098
),
9199
return_exceptions=True,
92100
)
101+
for channel_id, result in zip(channel_ids, results):
102+
if isinstance(result, Exception):
103+
_log.error(
104+
f"Deduplication for channel {channel_id} failed with exception:", exc_info=result
105+
)
93106
_log.info("Deduplication complete")
94107

95108

statgpt/admin/services/dataset.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2020,8 +2020,8 @@ async def create_auto_update_jobs(self, channel_ids: list[int]) -> list[schemas.
20202020
status=StatusEnum.QUEUED,
20212021
)
20222022
self._session.add(job)
2023-
await self._session.flush()
20242023
jobs.append(job)
2024+
await self._session.commit()
20252025

20262026
# Log per-channel summary
20272027
channels_by_id = {cd.channel.id: cd.channel for cd in channel_datasets}
@@ -2033,7 +2033,6 @@ async def create_auto_update_jobs(self, channel_ids: list[int]) -> list[schemas.
20332033
f"'{ch.deployment_id}' (id={ch_id})"
20342034
)
20352035

2036-
await self._session.commit()
20372036
return [schemas.AutoUpdateJob.model_validate(job, from_attributes=True) for job in jobs]
20382037

20392038
async def get_reindex_channel_ids(self, job_ids: list[int]) -> set[int]:
@@ -2097,7 +2096,7 @@ def _format_result_summary(jobs: list[models.AutoUpdateJob]) -> str:
20972096
result_counts = Counter(job_statuses)
20982097

20992098
parts: list[str] = []
2100-
for job_status, count in result_counts.items():
2099+
for job_status, count in result_counts.most_common():
21012100
part = f"{count} {job_status}"
21022101
if job_status in reindex_statuses:
21032102
breakdown = ", ".join(f"{c} {s}" for s, c in reindex_statuses[job_status].items())

statgpt/common/services/channel.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from fastapi import HTTPException, status
22
from sqlalchemy import select
33
from sqlalchemy.ext.asyncio import AsyncSession
4-
from sqlalchemy.orm import selectinload
54
from sqlalchemy.sql.expression import func
65

76
import statgpt.common.models as models
@@ -28,7 +27,7 @@ async def get_channels_db(self, limit: int | None, offset: int) -> list[models.C
2827
q_result = await self._session.execute(query)
2928
return [item for item in q_result.scalars().all()]
3029

31-
async def get_channels_schemas(self, limit: int, offset: int) -> list[schemas.Channel]:
30+
async def get_channels_schemas(self, limit: int | None, offset: int) -> list[schemas.Channel]:
3231
channels = await self.get_channels_db(limit, offset)
3332
return [ChannelSerializer.db_to_schema(item) for item in channels]
3433

@@ -60,19 +59,3 @@ def is_channel_hybrid(channel: models.Channel) -> bool:
6059
return False
6160
indexer_version = channel_config.data_query.details.indexer_version
6261
return indexer_version == schemas.IndexerVersion.hybrid
63-
64-
@staticmethod
65-
def _is_auto_update_enabled(channel: models.Channel) -> bool:
66-
"""Returns `True` if the channel has auto-update enabled in data_query config."""
67-
config = schemas.ChannelConfig.model_validate(channel.details)
68-
if config.data_query is None:
69-
return False
70-
return config.data_query.details.allow_auto_update
71-
72-
async def get_auto_update_channels(self) -> list[models.Channel]:
73-
"""Get all channels with auto-update enabled, with mapped_datasets eager-loaded."""
74-
result = await self._session.execute(
75-
select(models.Channel).options(selectinload(models.Channel.mapped_datasets))
76-
)
77-
channels = list(result.scalars().all())
78-
return [ch for ch in channels if self._is_auto_update_enabled(ch)]

0 commit comments

Comments
 (0)