Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""add_auto_update_jobs_table

Revision ID: 75cf43005ed1
Revises: 331d3d5b3b30
Create Date: 2026-02-03 16:08:14.339217
Updated Date: 2026-03-02 15:02:00

"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration it
# builds on; both match the "Revision ID" / "Revises" entries in the module
# docstring above.
revision: str = '75cf43005ed1'
down_revision: str | None = '331d3d5b3b30'
# No branch labels or extra dependencies are declared for this revision.
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Create the ``autoupdateresult`` enum, the ``auto_update_jobs`` table and its index."""
    jobs_table = 'auto_update_jobs'

    # create_type=False stops the table DDL from emitting CREATE TYPE on its
    # own; the type is created explicitly right after, and checkfirst=True
    # makes that a no-op when the type already exists.
    result_type = postgresql.ENUM(
        'NO_COMPLETED_VERSION',
        'NO_CHANGES',
        'CONFIG_INCOMPATIBLE',
        'CONFIG_UPDATED',
        'REINDEX_TRIGGERED',
        name='autoupdateresult',
        create_type=False,
    )
    result_type.create(op.get_bind(), checkfirst=True)

    # Referenced by name only (no values, no CREATE TYPE here) — presumably
    # created by an earlier migration; verify against the revision history.
    status_type = postgresql.ENUM(name='preprocessingstatusenum', create_type=False)

    def _timestamp(column_name: str) -> sa.Column:
        # Non-null, timezone-aware timestamp defaulted by the database.
        return sa.Column(
            column_name,
            sa.DateTime(timezone=True),
            server_default=sa.text('now()'),
            nullable=False,
        )

    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('channel_dataset_id', sa.Integer(), nullable=False),
        sa.Column('base_version_id', sa.Integer(), nullable=True),
        sa.Column('created_version_id', sa.Integer(), nullable=True),
        sa.Column('status', status_type, nullable=False),
        sa.Column('result', result_type, nullable=True),
        sa.Column('details', sa.String(), nullable=True),
        sa.Column('reason_for_failure', sa.String(), nullable=True),
        _timestamp('created_at'),
        _timestamp('updated_at'),
    ]

    # (local column, referenced column) pairs, in the original declaration order.
    foreign_keys = [
        sa.ForeignKeyConstraint([local_column], [remote_column])
        for local_column, remote_column in (
            ('base_version_id', 'channel_dataset_versions.id'),
            ('channel_dataset_id', 'channel_datasets.id'),
            ('created_version_id', 'channel_dataset_versions.id'),
        )
    ]

    op.create_table(
        jobs_table,
        *columns,
        *foreign_keys,
        sa.PrimaryKeyConstraint('id'),
    )

    op.create_index(
        'ix_auto_update_jobs_channel_dataset_id',
        jobs_table,
        ['channel_dataset_id'],
    )


def downgrade() -> None:
    """Drop the ``auto_update_jobs`` index and table, then the ``autoupdateresult`` type."""
    jobs_table = 'auto_update_jobs'

    op.drop_index('ix_auto_update_jobs_channel_dataset_id', table_name=jobs_table)
    op.drop_table(jobs_table)

    # The enum goes last: the table's `result` column uses it, so the table
    # has to be gone first. `preprocessingstatusenum` is not touched here —
    # upgrade() only references it and never creates it.
    op.execute("DROP TYPE IF EXISTS autoupdateresult")
62 changes: 62 additions & 0 deletions statgpt/admin/routers/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,65 @@ async def clear_channel_dataset_versions_data(
await DataSetService(session).clear_channel_dataset_versions_data(
channel_id=channel_id, dataset_id=dataset_id, auth_context=SystemUserAuthContext()
)


@router.post(
    path="/{channel_id}/datasets/{dataset_id}/versions/auto-update-jobs",
    status_code=status.HTTP_202_ACCEPTED,
)
async def trigger_auto_update(
    background_tasks: BackgroundTasks,
    channel_id: int,
    dataset_id: int,
    session: AsyncSession = Depends(models.get_session),
) -> schemas.AutoUpdateJob:
    """Trigger an auto-update check for a channel dataset.

    Creates an auto-update job that checks for changes and reindexes if needed.
    The job runs in the background. Poll the job status to track progress.
    """
    # The endpoint answers 202 with the job returned by the service; the
    # service receives `background_tasks` so it can schedule the actual work.
    dataset_service = DataSetService(session)
    auto_update_job = await dataset_service.trigger_auto_update(
        background_tasks=background_tasks,
        channel_id=channel_id,
        dataset_id=dataset_id,
        # Runs under the system user context rather than a caller identity.
        auth_context=SystemUserAuthContext(),
    )
    return auto_update_job


@router.get(path="/{channel_id}/datasets/{dataset_id}/versions/auto-update-jobs")
async def get_auto_update_jobs(
    channel_id: int,
    dataset_id: int,
    limit: int = 100,
    offset: int = 0,
    session: AsyncSession = Depends(models.get_session),
    _=Depends(cancel_on_disconnect),
) -> schemas.ListResponse[schemas.AutoUpdateJob]:
    """Get a paginated list of auto-update jobs for a channel dataset."""
    dataset_service = DataSetService(session)

    # Jobs are keyed by the channel-dataset link id, so resolve the
    # (channel_id, dataset_id) pair to that link first.
    link = await dataset_service.get_channel_dataset_model_or_raise(
        channel_id=channel_id, dataset_id=dataset_id
    )

    # NOTE(review): limit/offset are forwarded exactly as received; no bounds
    # are enforced in this handler — confirm the service layer validates them.
    page = await dataset_service.get_auto_update_jobs(
        channel_dataset_id=link.id,
        limit=limit,
        offset=offset,
    )
    overall = await dataset_service.get_auto_update_jobs_count(link.id)

    # `count` is the size of this page; `total` is the service's overall count.
    response_type = schemas.ListResponse[schemas.AutoUpdateJob]
    return response_type(
        data=page,
        limit=limit,
        offset=offset,
        count=len(page),
        total=overall,
    )


@router.get(path="/auto-update-jobs/{job_id}")
async def get_auto_update_job_by_id(
    job_id: int,
    session: AsyncSession = Depends(models.get_session),
    _=Depends(cancel_on_disconnect),
) -> schemas.AutoUpdateJob:
    """Get an auto-update job by ID for polling status."""
    # Pure pass-through: lookup (and any not-found handling) lives in the service.
    dataset_service = DataSetService(session)
    job = await dataset_service.get_auto_update_job_by_id(job_id)
    return job
Loading
Loading