Skip to content

Commit 1373adf

Browse files
authored
Merge pull request #252 from mchestr/feature/redis-job-queue
feat: add Redis-based job queue with BullMQ
2 parents bc44554 + 1406c50 commit 1373adf

File tree

24 files changed

+3014
-47
lines changed

24 files changed

+3014
-47
lines changed

actions/admin/queue.ts

Lines changed: 653 additions & 0 deletions
Large diffs are not rendered by default.

actions/admin/watchlist.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,10 @@ export async function getWatchlistSyncStats() {
146146

147147
/**
148148
* Force sync for a specific user (admin only)
149+
* Uses the job queue if Redis is configured, otherwise falls back to direct execution
149150
*/
150151
export async function forceUserWatchlistSync(userId: string) {
152+
const session = await getServerSession(authOptions)
151153
const isAdmin = await requireAdmin()
152154
if (!isAdmin) {
153155
return { success: false as const, error: "Unauthorized" }
@@ -164,7 +166,37 @@ export async function forceUserWatchlistSync(userId: string) {
164166
return { success: false as const, error: "User not found" }
165167
}
166168

167-
// Perform sync
169+
// Check if queue is available (Redis configured)
170+
const { isRedisConfigured } = await import("@/lib/queue/connection")
171+
172+
if (isRedisConfigured()) {
173+
// Use job queue
174+
const { addJob } = await import("@/lib/queue/client")
175+
const { JOB_TYPES } = await import("@/lib/queue/types")
176+
177+
const jobId = await addJob(JOB_TYPES.WATCHLIST_SYNC_USER, {
178+
userId,
179+
triggeredBy: "admin",
180+
triggeredByUserId: session?.user?.id,
181+
})
182+
183+
logger.info("Admin triggered watchlist sync job", {
184+
userId,
185+
adminUserId: session?.user?.id,
186+
jobId,
187+
})
188+
189+
return {
190+
success: true as const,
191+
data: {
192+
queued: true,
193+
jobId,
194+
message: "Sync job has been queued and will be processed shortly",
195+
},
196+
}
197+
}
198+
199+
// Fallback: Direct execution when Redis is not configured
168200
const result = await syncUserWatchlist(userId)
169201

170202
if (!result.success) {
@@ -173,7 +205,10 @@ export async function forceUserWatchlistSync(userId: string) {
173205

174206
return {
175207
success: true as const,
176-
data: result.data,
208+
data: {
209+
queued: false,
210+
...result.data,
211+
},
177212
}
178213
} catch (error) {
179214
logger.error("Error forcing user watchlist sync", error)

actions/watchlist.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ export async function updateWatchlistSyncSettings(data: unknown) {
169169

170170
/**
171171
* Trigger a manual sync for the current user
172+
* Uses the job queue if Redis is configured, otherwise falls back to direct execution
172173
*/
173174
export async function triggerWatchlistSync() {
174175
const session = await getServerSession(authOptions)
@@ -186,7 +187,33 @@ export async function triggerWatchlistSync() {
186187
return { success: false as const, error: "Watchlist sync is not enabled" }
187188
}
188189

189-
// Perform sync
190+
// Check if queue is available (Redis configured)
191+
const { isRedisConfigured } = await import("@/lib/queue/connection")
192+
193+
if (isRedisConfigured()) {
194+
// Use job queue
195+
const { addJob } = await import("@/lib/queue/client")
196+
const { JOB_TYPES } = await import("@/lib/queue/types")
197+
198+
const jobId = await addJob(JOB_TYPES.WATCHLIST_SYNC_USER, {
199+
userId: session.user.id,
200+
triggeredBy: "manual",
201+
triggeredByUserId: session.user.id,
202+
})
203+
204+
logger.info("Watchlist sync job queued", { userId: session.user.id, jobId })
205+
206+
return {
207+
success: true as const,
208+
data: {
209+
queued: true,
210+
jobId,
211+
message: "Sync job has been queued and will be processed shortly",
212+
},
213+
}
214+
}
215+
216+
// Fallback: Direct execution when Redis is not configured
190217
const result = await syncUserWatchlist(session.user.id)
191218

192219
if (!result.success) {
@@ -195,7 +222,10 @@ export async function triggerWatchlistSync() {
195222

196223
return {
197224
success: true as const,
198-
data: result.data,
225+
data: {
226+
queued: false,
227+
...result.data,
228+
},
199229
}
200230
} catch (error) {
201231
logger.error("Error triggering watchlist sync", error)

app/admin/queues/page.tsx

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import { Suspense } from "react"
2+
import { getQueueDashboardData, getQueueJobs } from "@/actions/admin/queue"
3+
import { QueueStatus } from "@/components/admin/queues/queue-status"
4+
import { QueueStats } from "@/components/admin/queues/queue-stats"
5+
import { QueueControls } from "@/components/admin/queues/queue-controls"
6+
import { QueueFilters } from "@/components/admin/queues/queue-filters"
7+
import { JobTable } from "@/components/admin/queues/job-table"
8+
import { SchedulerInfo } from "@/components/admin/queues/scheduler-info"
9+
10+
export const dynamic = "force-dynamic"
11+
12+
interface QueueDashboardPageProps {
13+
searchParams: Promise<{
14+
status?: string
15+
jobType?: string
16+
page?: string
17+
}>
18+
}
19+
20+
export default async function QueueDashboardPage({
21+
searchParams,
22+
}: QueueDashboardPageProps) {
23+
const params = await searchParams
24+
25+
// Fetch dashboard data and jobs in parallel
26+
const [dashboardResult, jobsResult] = await Promise.all([
27+
getQueueDashboardData(),
28+
getQueueJobs({
29+
status: params.status as "waiting" | "active" | "completed" | "failed" | "delayed" | undefined,
30+
jobType: params.jobType,
31+
page: parseInt(params.page ?? "1", 10),
32+
limit: 50,
33+
}),
34+
])
35+
36+
const dashboard = dashboardResult.success ? dashboardResult.data : null
37+
const jobs = jobsResult.success ? jobsResult.data : null
38+
const error = !dashboardResult.success ? dashboardResult.error : null
39+
40+
return (
41+
<div className="p-4 sm:p-6">
42+
<div className="max-w-7xl mx-auto">
43+
{/* Header */}
44+
<div className="mb-6">
45+
<h1 className="text-2xl sm:text-3xl font-bold text-white mb-2">
46+
Job Queue Dashboard
47+
</h1>
48+
<p className="text-sm text-slate-400">
49+
Monitor and manage background job processing
50+
</p>
51+
</div>
52+
53+
{/* Error State */}
54+
{error && (
55+
<div className="mb-6 bg-red-500/10 border border-red-500/30 rounded-lg p-4">
56+
<div className="flex items-start gap-3">
57+
<svg
58+
className="w-5 h-5 text-red-400 mt-0.5 flex-shrink-0"
59+
fill="none"
60+
stroke="currentColor"
61+
viewBox="0 0 24 24"
62+
>
63+
<path
64+
strokeLinecap="round"
65+
strokeLinejoin="round"
66+
strokeWidth={2}
67+
d="M12 8v4m0 4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"
68+
/>
69+
</svg>
70+
<div>
71+
<h3 className="text-red-400 font-medium">Queue Unavailable</h3>
72+
<p className="text-sm text-red-300 mt-1">{error}</p>
73+
</div>
74+
</div>
75+
</div>
76+
)}
77+
78+
{/* Queue Status */}
79+
<Suspense
80+
fallback={
81+
<div className="mb-6 h-20 bg-slate-800/50 rounded-lg animate-pulse" />
82+
}
83+
>
84+
<QueueStatus
85+
workerRunning={dashboard?.workerRunning ?? false}
86+
isPaused={dashboard?.isPaused ?? false}
87+
redisConnected={dashboard?.redisConnected ?? false}
88+
/>
89+
</Suspense>
90+
91+
{/* Statistics Cards */}
92+
<QueueStats stats={dashboard?.stats ?? null} />
93+
94+
{/* Controls and Scheduler Info */}
95+
<div className="grid grid-cols-1 lg:grid-cols-2 gap-6 mb-6">
96+
<QueueControls
97+
isPaused={dashboard?.isPaused ?? false}
98+
disabled={!dashboard?.redisConnected}
99+
/>
100+
<SchedulerInfo schedulers={dashboard?.schedulers ?? []} />
101+
</div>
102+
103+
{/* Filters */}
104+
<QueueFilters />
105+
106+
{/* Job Table */}
107+
<div className="bg-slate-800/50 backdrop-blur-sm border border-slate-700 rounded-lg overflow-hidden">
108+
<div className="p-4 border-b border-slate-700 flex items-center justify-between">
109+
<h2 className="text-lg font-semibold text-white">Jobs</h2>
110+
{jobs && (
111+
<span className="text-sm text-slate-400">
112+
Page {jobs.page}
113+
{jobs.hasMore && "+"}
114+
</span>
115+
)}
116+
</div>
117+
<JobTable
118+
jobs={jobs?.jobs ?? []}
119+
page={jobs?.page ?? 1}
120+
hasMore={jobs?.hasMore ?? false}
121+
disabled={!dashboard?.redisConnected}
122+
/>
123+
</div>
124+
</div>
125+
</div>
126+
)
127+
}

components/__tests__/admin-shared.test.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -800,8 +800,8 @@ describe('AdminNav', () => {
800800
expect(firstMenuItem).toHaveFocus()
801801
})
802802

803-
// Navigate to last item (Sign Out) - there are 8 items total (6 menu items + Home + Sign Out)
804-
for (let i = 0; i < 7; i++) {
803+
// Navigate to last item (Sign Out) - there are 9 items total (7 menu items + Home + Sign Out)
804+
for (let i = 0; i < 8; i++) {
805805
await user.keyboard('{ArrowDown}')
806806
}
807807

0 commit comments

Comments
 (0)