Skip to content

Commit f1dd814

Browse files
committed
feat: improve job pagination
1 parent 9a4aff1 commit f1dd814

File tree

2 files changed

+62
-86
lines changed

2 files changed

+62
-86
lines changed

api/app/extensions/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
class RQSchedulerRegistry(Scheduler):
77
"""Custom Scheduler class for RQ Manager."""
88

9-
def get_jobs_ids(self, until=None, with_times=False, offset=None, length=None) -> list[str]:
9+
def get_job_ids(self, until=None, with_times=False, offset=None, length=None) -> list[str]:
1010
"""
1111
Returns a list of job ids that will be queued until the given
1212
time. If no 'until' argument is given all jobs are returned.

api/app/services/jobs.py

Lines changed: 61 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Job service that interacts with RQ to get job information."""
22

33
import logging
4+
import datetime as dt
45

56
from typing import Optional
67

@@ -70,107 +71,82 @@ def list_jobs(self, filters: JobListFilters | None = None) -> tuple[list[JobDeta
7071
if filters is None:
7172
filters = JobListFilters()
7273

73-
all_jobs: list[JobDetails] = []
74-
total_count = 0
75-
offset = filters.offset or 0
76-
limit = filters.limit or 50
77-
7874
try:
7975
queues = Queue.all(connection=self.redis)
76+
except Exception as e:
77+
logger.error(f"Error listing queues: {e}")
78+
return [], 0
8079

81-
# Collect jobs from all queues and registries
82-
for queue in queues:
83-
if filters.queue and queue.name != filters.queue:
80+
collected: list["JobDetails"] = []
81+
total_count = 0
82+
83+
for queue in queues:
84+
if filters.queue and queue.name != filters.queue:
85+
continue
86+
87+
job_sources = [
88+
(queue.get_job_ids(), JobStatus.QUEUED),
89+
(StartedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.STARTED),
90+
(FinishedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.FINISHED),
91+
(FailedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.FAILED),
92+
(DeferredJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.DEFERRED),
93+
(ScheduledJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.SCHEDULED),
94+
(CanceledJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.CANCELED),
95+
]
96+
97+
if settings.APP_ENABLE_RQ_SCHEDULER:
98+
job_sources.append((RQSchedulerRegistry(queue.name, connection=self.redis).get_job_ids(), JobStatus.SCHEDULED))
99+
100+
for job_ids, job_status in job_sources:
101+
if filters.status and job_status != filters.status:
84102
continue
85103

86-
job_sources = [
87-
(queue.get_job_ids(), JobStatus.QUEUED),
88-
(StartedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.STARTED),
89-
(FinishedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.FINISHED),
90-
(FailedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.FAILED),
91-
(DeferredJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.DEFERRED),
92-
(ScheduledJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.SCHEDULED),
93-
(CanceledJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.CANCELED),
94-
]
95-
96-
if settings.APP_ENABLE_RQ_SCHEDULER:
97-
job_sources.append(
98-
(RQSchedulerRegistry(queue.name, connection=self.redis).get_jobs_ids(), JobStatus.SCHEDULED)
99-
)
100-
101-
for job_ids, job_status in job_sources:
102-
# Pre-filter by status to avoid unnecessary job fetching
103-
if filters.status and job_status != filters.status:
104-
total_count += len(job_ids)
105-
continue
104+
total_count += len(job_ids)
105+
if not job_ids:
106+
continue
106107

107-
# Apply early limiting for finished/failed jobs to avoid fetching too many
108-
if job_status in [JobStatus.FINISHED, JobStatus.FAILED]:
109-
# For these registries, we get the most recent jobs first
110-
# Calculate how many we need based on remaining limit
111-
remaining_needed = limit - len(all_jobs) + offset
112-
if remaining_needed > 0:
113-
job_ids = job_ids[-min(remaining_needed, len(job_ids)) :]
114-
else:
115-
# We already have enough jobs, just count
116-
total_count += len(job_ids)
117-
continue
108+
page_ids = job_ids[filters.offset : filters.offset + filters.limit]
118109

119-
total_count += len(job_ids)
110+
try:
111+
jobs = Job.fetch_many(page_ids, connection=self.redis)
112+
except Exception as e:
113+
logger.warning(f"Error fetching jobs {page_ids}: {e}")
114+
continue
120115

121-
# Only fetch jobs if we haven't reached our pagination limit
122-
if len(all_jobs) >= (offset + limit):
116+
for job in jobs:
117+
if job is None:
123118
continue
124119

125-
for job_id in job_ids:
126-
try:
127-
if len(all_jobs) >= (offset + limit):
128-
break
129-
130-
job = Job.fetch(job_id, connection=self.redis)
131-
132-
if filters.function and job.func_name != filters.function:
133-
continue
134-
135-
if filters.worker and job.worker_name != filters.worker:
136-
continue
137-
138-
if filters.search:
139-
search_text = f"{job.func_name} {str(job.args)} {str(job.kwargs)}".lower()
140-
if filters.search.lower() not in search_text:
141-
continue
142-
143-
# Filter by tags (if we can extract them)
144-
if filters.tags and hasattr(job, 'meta') and job.meta:
145-
job_tags = job.meta.get('tags', [])
146-
if not any(tag in job_tags for tag in filters.tags):
147-
continue
148-
149-
job_detail = self._map_rq_job_to_schema(job, queue.name)
150-
job_detail.status = job_status # Override with registry status
151-
152-
if filters.created_after and job_detail.created_at < filters.created_after:
153-
continue
154-
if filters.created_before and job_detail.created_at > filters.created_before:
155-
continue
120+
if filters.function and job.func_name != filters.function:
121+
continue
122+
if filters.worker and job.worker_name != filters.worker:
123+
continue
124+
if filters.search:
125+
search_text = f"{job.func_name} {job.args} {job.kwargs}".lower()
126+
if filters.search.lower() not in search_text:
127+
continue
128+
if filters.tags and hasattr(job, "meta"):
129+
job_tags = job.meta.get("tags", [])
130+
if not any(tag in job_tags for tag in filters.tags):
131+
continue
156132

157-
all_jobs.append(job_detail)
133+
job_detail = self._map_rq_job_to_schema(job, queue.name)
134+
job_detail.status = job_status
158135

159-
except Exception as e:
160-
logger.warning(f"Error processing job {job_id}: {e}")
136+
if filters.created_after and job_detail.created_at < filters.created_after:
137+
continue
138+
if filters.created_before and job_detail.created_at > filters.created_before:
139+
continue
161140

162-
except Exception as e:
163-
logger.error(f"Error listing jobs: {e}")
141+
collected.append(job_detail)
164142

165143
sort_by = filters.sort_by or "created_at"
166144
sort_order = filters.sort_order or "desc"
167-
168-
def get_sort_key(job):
169-
value = getattr(job, sort_by, get_timezone_aware_min())
170-
return ensure_timezone_aware(value)
171-
172-
all_jobs.sort(key=get_sort_key, reverse=(sort_order == "desc"))
173-
return all_jobs, total_count
145+
collected.sort(
146+
key=lambda j: getattr(j, sort_by, dt.datetime.min),
147+
reverse=(sort_order == "desc"),
148+
)
149+
return collected, total_count
174150

175151
def get_jobs_for_worker(self, worker_name: str) -> list[JobDetails]:
176152
"""Get all jobs associated with a specific worker.

0 commit comments

Comments
 (0)