Skip to content

Commit 750f8db

Browse files
committed
fix: job order
1 parent f1dd814 commit 750f8db

File tree

3 files changed

+71
-81
lines changed

3 files changed

+71
-81
lines changed

api/app/extensions/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
class RQSchedulerRegistry(Scheduler):
77
"""Custom Scheduler class for RQ Manager."""
88

9-
def get_job_ids(self, until=None, with_times=False, offset=None, length=None) -> list[str]:
9+
def get_job_ids(self, until=None, with_times=False, offset=None, length=None, desc=False, cleanup=False) -> list[str]:
1010
"""
1111
Returns a list of job ids that will be queued until the given
1212
time. If no 'until' argument is given all jobs are returned.

api/app/services/jobs.py

Lines changed: 26 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -85,32 +85,41 @@ def list_jobs(self, filters: JobListFilters | None = None) -> tuple[list[JobDeta
8585
continue
8686

8787
job_sources = [
88-
(queue.get_job_ids(), JobStatus.QUEUED),
89-
(StartedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.STARTED),
90-
(FinishedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.FINISHED),
91-
(FailedJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.FAILED),
92-
(DeferredJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.DEFERRED),
93-
(ScheduledJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.SCHEDULED),
94-
(CanceledJobRegistry(queue.name, connection=self.redis).get_job_ids(cleanup=False), JobStatus.CANCELED),
88+
(queue, JobStatus.QUEUED, True),
89+
(StartedJobRegistry(queue.name, connection=self.redis), JobStatus.STARTED, True),
90+
(FinishedJobRegistry(queue.name, connection=self.redis), JobStatus.FINISHED, True),
91+
(FailedJobRegistry(queue.name, connection=self.redis), JobStatus.FAILED, True),
92+
(DeferredJobRegistry(queue.name, connection=self.redis), JobStatus.DEFERRED, True),
93+
(ScheduledJobRegistry(queue.name, connection=self.redis), JobStatus.SCHEDULED, False),
94+
(CanceledJobRegistry(queue.name, connection=self.redis), JobStatus.CANCELED, False),
9595
]
9696

9797
if settings.APP_ENABLE_RQ_SCHEDULER:
98-
job_sources.append((RQSchedulerRegistry(queue.name, connection=self.redis).get_job_ids(), JobStatus.SCHEDULED))
98+
job_sources.append((RQSchedulerRegistry(queue.name, connection=self.redis), JobStatus.SCHEDULED, False))
99+
100+
jobs_to_fetch_per_registry = filters.limit * 3
101+
for registry, job_status, desc_order in job_sources:
102+
if job_status == JobStatus.QUEUED:
103+
job_ids = registry.get_job_ids() # type: ignore
104+
else:
105+
job_ids = registry.get_job_ids(cleanup=False, desc=desc_order) # type: ignore
99106

100-
for job_ids, job_status in job_sources:
101107
if filters.status and job_status != filters.status:
102108
continue
103109

104-
total_count += len(job_ids)
105110
if not job_ids:
106111
continue
107112

108-
page_ids = job_ids[filters.offset : filters.offset + filters.limit]
113+
limited_job_ids = job_ids[:jobs_to_fetch_per_registry]
114+
total_count += len(job_ids)
109115

110116
try:
111-
jobs = Job.fetch_many(page_ids, connection=self.redis)
117+
page_ids_str = [
118+
job_id.decode('utf-8') if isinstance(job_id, bytes) else str(job_id) for job_id in limited_job_ids
119+
]
120+
jobs = Job.fetch_many(page_ids_str, connection=self.redis)
112121
except Exception as e:
113-
logger.warning(f"Error fetching jobs {page_ids}: {e}")
122+
logger.warning(f"Error fetching jobs {limited_job_ids}: {e}")
114123
continue
115124

116125
for job in jobs:
@@ -146,7 +155,9 @@ def list_jobs(self, filters: JobListFilters | None = None) -> tuple[list[JobDeta
146155
key=lambda j: getattr(j, sort_by, dt.datetime.min),
147156
reverse=(sort_order == "desc"),
148157
)
149-
return collected, total_count
158+
159+
paginated_jobs = collected[filters.offset : filters.offset + filters.limit]
160+
return paginated_jobs, total_count
150161

151162
def get_jobs_for_worker(self, worker_name: str) -> list[JobDetails]:
152163
"""Get all jobs associated with a specific worker.
@@ -276,7 +287,7 @@ def create_job(self, job_data: JobCreate) -> Optional[BaseJob]:
276287
func=job_data.func_name,
277288
args=job_data.args,
278289
kwargs=job_data.kwargs,
279-
meta=job_data.meta or {},
290+
meta=dict(job_data.meta) if job_data.meta else None,
280291
connection=self.redis,
281292
)
282293
q = Queue(name=job_data.queue or "default", connection=self.redis)

front/src/services/jobsService.ts

Lines changed: 44 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -1,105 +1,82 @@
1+
import { useSettings } from "@/contexts/SettingsContext";
12
import { Job, JobsQueryParams, JobStatus } from "@/pages/Jobs/types";
23
import { api } from "@/utils/api";
34
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
4-
import { useSettings } from "@/contexts/SettingsContext";
55

66
const JOBS_QUERY_KEY = "jobs";
77
const JOB_FUNCTIONS_QUERY_KEY = "job-functions";
88
const JOB_COUNTS_QUERY_KEY = "job-counts";
99

1010
// Transform frontend params to API params
1111
const transformQueryParams = (params: JobsQueryParams, queue?: string) => {
12-
const apiParams: any = {
12+
const apiParams: Record<string, unknown> = {
1313
limit: params.limit,
1414
offset: params.offset,
1515
status: params.status || undefined,
1616
worker: params.worker || undefined,
1717
function: params.function || undefined,
1818
search: params.search || undefined,
19+
sort_by: params.sort_by || "created_at",
20+
sort_order: params.sort_order || "desc",
1921
};
20-
22+
2123
// Only include queue if it's defined
2224
if (queue) {
2325
apiParams.queue = queue;
2426
}
25-
27+
2628
if (params.created_after || params.created_before) {
2729
apiParams.created_after = params.created_after;
2830
apiParams.created_before = params.created_before;
2931
}
30-
32+
3133
if (params.sort_by || params.sort_order) {
3234
apiParams.sort_by = params.sort_by;
3335
apiParams.sort_order = params.sort_order;
3436
}
35-
37+
3638
return apiParams;
3739
};
3840

3941
export const useJobs = (params: JobsQueryParams) => {
4042
return useQuery({
4143
queryKey: [JOBS_QUERY_KEY, params],
4244
queryFn: async () => {
43-
const queues = Array.isArray(params.queue) ? params.queue.filter(Boolean) : params.queue ? [params.queue] : [];
44-
45-
if (queues.length === 0) {
46-
// No queue filter - get all jobs
47-
const apiParams = transformQueryParams(params, undefined);
48-
const response = await api.get("/jobs", {
49-
params: apiParams,
50-
});
51-
const data = response.data;
52-
return {
53-
data: data.data as Job[],
54-
total: data.total,
55-
offset: data.offset,
56-
limit: data.limit,
57-
has_more: data.has_more,
58-
};
59-
} else if (queues.length === 1) {
60-
// Single queue filter
61-
const apiParams = transformQueryParams(params, queues[0]);
62-
const response = await api.get("/jobs", {
63-
params: apiParams,
64-
});
65-
const data = response.data;
66-
return {
67-
data: data.data as Job[],
68-
total: data.total,
69-
offset: data.offset,
70-
limit: data.limit,
71-
has_more: data.has_more,
72-
};
73-
} else {
74-
// Multiple queues - fetch from each and combine
75-
const responses = await Promise.all(
76-
queues.map(queue =>
77-
api.get("/jobs", {
78-
params: transformQueryParams({ ...params, limit: Math.ceil(params.limit / queues.length) }, queue)
79-
})
80-
)
81-
);
82-
83-
const allJobs = responses.flatMap(response => response.data.data as Job[]);
84-
const totalJobs = responses.reduce((sum, response) => sum + response.data.total, 0);
85-
86-
// Sort and paginate combined results
87-
const sortedJobs = allJobs.sort((a, b) =>
88-
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
45+
const queues = Array.isArray(params.queue)
46+
? params.queue.filter(Boolean)
47+
: params.queue
48+
? [params.queue]
49+
: [];
50+
51+
// Build API params with all filters including sorting and pagination
52+
const apiParams = transformQueryParams(params, undefined);
53+
54+
const response = await api.get("/jobs", {
55+
params: apiParams,
56+
});
57+
const data = response.data;
58+
59+
let filteredJobs = data.data as Job[];
60+
let totalJobs = data.total;
61+
62+
// Apply queue filtering on client side if needed
63+
if (queues.length > 0) {
64+
filteredJobs = filteredJobs.filter(
65+
(job) => job.queue && queues.includes(job.queue)
8966
);
90-
91-
const startIndex = params.offset;
92-
const endIndex = startIndex + params.limit;
93-
const paginatedJobs = sortedJobs.slice(startIndex, endIndex);
94-
95-
return {
96-
data: paginatedJobs,
97-
total: totalJobs,
98-
offset: params.offset,
99-
limit: params.limit,
100-
has_more: endIndex < sortedJobs.length,
101-
};
67+
// For accurate total count when filtering, we need to estimate
68+
// This is an approximation since the API doesn't support multi-queue filtering
69+
totalJobs = filteredJobs.length;
10270
}
71+
72+
// The backend now handles sorting and pagination, so we just return the data as is
73+
return {
74+
data: filteredJobs,
75+
total: totalJobs,
76+
offset: params.offset,
77+
limit: params.limit,
78+
has_more: filteredJobs.length === params.limit,
79+
};
10380
},
10481
staleTime: 5000,
10582
});
@@ -136,7 +113,9 @@ export const useJobCounts = () => {
136113
const response = await api.get("/jobs/counts");
137114
return response.data as Record<JobStatus, number>;
138115
},
139-
refetchInterval: settings.autoRefresh ? settings.jobsRefreshInterval : false,
116+
refetchInterval: settings.autoRefresh
117+
? settings.jobsRefreshInterval
118+
: false,
140119
});
141120
};
142121

0 commit comments

Comments (0)