@@ -18,12 +18,51 @@ import {
1818} from "@/lib/queue/client"
1919import { isRedisConfigured , isRedisHealthy } from "@/lib/queue/connection"
2020import { JOB_TYPES , JobStatus , JobMetadata , QueueStats , QueueHealth } from "@/lib/queue/types"
21+ import { isRegisteredJobType } from "@/lib/queue/jobs"
2122import { isWorkerRunning } from "@/lib/queue/worker"
2223import { isWatchlistSyncEnabled , getWatchlistSyncInterval } from "@/lib/watchlist/lock"
24+ import { logAuditEvent , AuditEventType } from "@/lib/security/audit-log"
2325import { z } from "zod"
2426
2527const logger = createLogger ( "ADMIN_QUEUE_ACTIONS" )
2628
29+ // =============================================================================
30+ // Rate Limiting for Server Actions
31+ // =============================================================================
32+
33+ interface RateLimitEntry {
34+ count : number
35+ resetTime : number
36+ }
37+
38+ const actionRateLimitStore = new Map < string , RateLimitEntry > ( )
39+
40+ // Rate limit config: 30 actions per minute per admin user
41+ const RATE_LIMIT_WINDOW_MS = 60000
42+ const RATE_LIMIT_MAX = 30
43+
44+ /**
45+ * Check if an admin action is rate limited
46+ * Returns true if rate limit exceeded
47+ */
48+ function isRateLimited ( adminId : string ) : boolean {
49+ const now = Date . now ( )
50+ const key = `queue:${ adminId } `
51+ const entry = actionRateLimitStore . get ( key )
52+
53+ if ( ! entry || now > entry . resetTime ) {
54+ actionRateLimitStore . set ( key , { count : 1 , resetTime : now + RATE_LIMIT_WINDOW_MS } )
55+ return false
56+ }
57+
58+ entry . count ++
59+ if ( entry . count > RATE_LIMIT_MAX ) {
60+ return true
61+ }
62+
63+ return false
64+ }
65+
2766// =============================================================================
2867// Validation Schemas
2968// =============================================================================
@@ -125,7 +164,7 @@ export async function getQueueDashboardData(): Promise<
125164 logger . error ( "Error fetching queue dashboard data" , error )
126165 return {
127166 success : false ,
128- error : error instanceof Error ? error . message : "Failed to fetch queue data " ,
167+ error : "Failed to load queue dashboard. Please try again. " ,
129168 }
130169 }
131170}
@@ -183,7 +222,7 @@ export async function getQueueHealth(): Promise<
183222 logger . error ( "Error fetching queue health" , error )
184223 return {
185224 success : false ,
186- error : error instanceof Error ? error . message : "Failed to fetch health ",
225+ error : "Failed to load queue health. Please try again. ",
187226 }
188227 }
189228}
@@ -219,9 +258,12 @@ export async function getQueueJobs(
219258 const start = ( page - 1 ) * limit
220259 const end = start + limit - 1
221260
261+ // Validate jobType if provided
262+ const validJobType = jobType && isRegisteredJobType ( jobType ) ? jobType : undefined
263+
222264 const jobs = await getJobs ( {
223265 status : status as JobStatus | undefined ,
224- jobType : jobType as ( typeof JOB_TYPES ) [ keyof typeof JOB_TYPES ] | undefined ,
266+ jobType : validJobType as ( typeof JOB_TYPES ) [ keyof typeof JOB_TYPES ] | undefined ,
225267 start,
226268 end,
227269 } )
@@ -280,12 +322,18 @@ export async function getQueueJob(
280322export async function retryQueueJob (
281323 input : unknown
282324) : Promise < { success : true } | { success : false ; error : string } > {
325+ let session : Awaited < ReturnType < typeof requireAdmin > >
283326 try {
284- await requireAdmin ( )
327+ session = await requireAdmin ( )
285328 } catch {
286329 return { success : false , error : "Unauthorized" }
287330 }
288331
332+ const adminId = session . user . id
333+ if ( isRateLimited ( adminId ) ) {
334+ return { success : false , error : "Too many requests. Please try again later." }
335+ }
336+
289337 const validated = jobIdSchema . safeParse ( input )
290338 if ( ! validated . success ) {
291339 return { success : false , error : "Invalid job ID" }
@@ -297,11 +345,16 @@ export async function retryQueueJob(
297345
298346 try {
299347 await retryJob ( validated . data . jobId )
300- logger . info ( "Job retried by admin" , { jobId : validated . data . jobId } )
348+
349+ logAuditEvent ( AuditEventType . QUEUE_JOB_RETRIED , adminId , {
350+ jobId : validated . data . jobId ,
351+ } )
352+ logger . info ( "Job retried by admin" , { jobId : validated . data . jobId , adminId } )
353+
301354 return { success : true }
302355 } catch ( error ) {
303356 logger . error ( "Error retrying job" , error , { jobId : validated . data . jobId } )
304- return { success : false , error : "Failed to retry job" }
357+ return { success : false , error : "Failed to retry job. Please try again. " }
305358 }
306359}
307360
@@ -311,12 +364,18 @@ export async function retryQueueJob(
311364export async function removeQueueJob (
312365 input : unknown
313366) : Promise < { success : true } | { success : false ; error : string } > {
367+ let session : Awaited < ReturnType < typeof requireAdmin > >
314368 try {
315- await requireAdmin ( )
369+ session = await requireAdmin ( )
316370 } catch {
317371 return { success : false , error : "Unauthorized" }
318372 }
319373
374+ const adminId = session . user . id
375+ if ( isRateLimited ( adminId ) ) {
376+ return { success : false , error : "Too many requests. Please try again later." }
377+ }
378+
320379 const validated = jobIdSchema . safeParse ( input )
321380 if ( ! validated . success ) {
322381 return { success : false , error : "Invalid job ID" }
@@ -328,11 +387,16 @@ export async function removeQueueJob(
328387
329388 try {
330389 await removeJob ( validated . data . jobId )
331- logger . info ( "Job removed by admin" , { jobId : validated . data . jobId } )
390+
391+ logAuditEvent ( AuditEventType . QUEUE_JOB_REMOVED , adminId , {
392+ jobId : validated . data . jobId ,
393+ } )
394+ logger . info ( "Job removed by admin" , { jobId : validated . data . jobId , adminId } )
395+
332396 return { success : true }
333397 } catch ( error ) {
334398 logger . error ( "Error removing job" , error , { jobId : validated . data . jobId } )
335- return { success : false , error : "Failed to remove job" }
399+ return { success : false , error : "Failed to remove job. Please try again. " }
336400 }
337401}
338402
@@ -346,23 +410,32 @@ export async function removeQueueJob(
346410export async function pauseJobQueue ( ) : Promise <
347411 { success : true } | { success : false ; error : string }
348412> {
413+ let session : Awaited < ReturnType < typeof requireAdmin > >
349414 try {
350- await requireAdmin ( )
415+ session = await requireAdmin ( )
351416 } catch {
352417 return { success : false , error : "Unauthorized" }
353418 }
354419
420+ const adminId = session . user . id
421+ if ( isRateLimited ( adminId ) ) {
422+ return { success : false , error : "Too many requests. Please try again later." }
423+ }
424+
355425 if ( ! isRedisConfigured ( ) ) {
356426 return { success : false , error : "Redis is not configured" }
357427 }
358428
359429 try {
360430 await pauseQueue ( )
361- logger . info ( "Queue paused by admin" )
431+
432+ logAuditEvent ( AuditEventType . QUEUE_PAUSED , adminId )
433+ logger . info ( "Queue paused by admin" , { adminId } )
434+
362435 return { success : true }
363436 } catch ( error ) {
364437 logger . error ( "Error pausing queue" , error )
365- return { success : false , error : "Failed to pause queue" }
438+ return { success : false , error : "Failed to pause queue. Please try again. " }
366439 }
367440}
368441
@@ -372,23 +445,32 @@ export async function pauseJobQueue(): Promise<
372445export async function resumeJobQueue ( ) : Promise <
373446 { success : true } | { success : false ; error : string }
374447> {
448+ let session : Awaited < ReturnType < typeof requireAdmin > >
375449 try {
376- await requireAdmin ( )
450+ session = await requireAdmin ( )
377451 } catch {
378452 return { success : false , error : "Unauthorized" }
379453 }
380454
455+ const adminId = session . user . id
456+ if ( isRateLimited ( adminId ) ) {
457+ return { success : false , error : "Too many requests. Please try again later." }
458+ }
459+
381460 if ( ! isRedisConfigured ( ) ) {
382461 return { success : false , error : "Redis is not configured" }
383462 }
384463
385464 try {
386465 await resumeQueue ( )
387- logger . info ( "Queue resumed by admin" )
466+
467+ logAuditEvent ( AuditEventType . QUEUE_RESUMED , adminId )
468+ logger . info ( "Queue resumed by admin" , { adminId } )
469+
388470 return { success : true }
389471 } catch ( error ) {
390472 logger . error ( "Error resuming queue" , error )
391- return { success : false , error : "Failed to resume queue" }
473+ return { success : false , error : "Failed to resume queue. Please try again. " }
392474 }
393475}
394476
@@ -402,12 +484,18 @@ export async function resumeJobQueue(): Promise<
402484export async function triggerWatchlistSyncJob (
403485 input : unknown
404486) : Promise < { success : true ; data : { jobId : string } } | { success : false ; error : string } > {
487+ let session : Awaited < ReturnType < typeof requireAdmin > >
405488 try {
406- await requireAdmin ( )
489+ session = await requireAdmin ( )
407490 } catch {
408491 return { success : false , error : "Unauthorized" }
409492 }
410493
494+ const adminId = session . user . id
495+ if ( isRateLimited ( adminId ) ) {
496+ return { success : false , error : "Too many requests. Please try again later." }
497+ }
498+
411499 const validated = triggerSyncSchema . safeParse ( input )
412500 if ( ! validated . success ) {
413501 return { success : false , error : "Invalid input" }
@@ -426,19 +514,32 @@ export async function triggerWatchlistSyncJob(
426514 userId,
427515 triggeredBy : "admin" ,
428516 } )
429- logger . info ( "User watchlist sync job triggered by admin" , { userId, jobId } )
517+
518+ logAuditEvent ( AuditEventType . QUEUE_SYNC_TRIGGERED , adminId , {
519+ targetUserId : userId ,
520+ syncType : "user" ,
521+ jobId,
522+ } )
523+ logger . info ( "User watchlist sync job triggered by admin" , { userId, jobId, adminId } )
524+
430525 return { success : true , data : { jobId } }
431526 } else {
432527 // Sync all enabled users
433528 const jobId = await addJob ( JOB_TYPES . WATCHLIST_SYNC_ALL , {
434529 triggeredBy : "admin" ,
435530 } )
436- logger . info ( "Batch watchlist sync job triggered by admin" , { jobId } )
531+
532+ logAuditEvent ( AuditEventType . QUEUE_SYNC_TRIGGERED , adminId , {
533+ syncType : "all" ,
534+ jobId,
535+ } )
536+ logger . info ( "Batch watchlist sync job triggered by admin" , { jobId, adminId } )
537+
437538 return { success : true , data : { jobId } }
438539 }
439540 } catch ( error ) {
440541 logger . error ( "Error triggering sync job" , error )
441- return { success : false , error : "Failed to trigger sync" }
542+ return { success : false , error : "Failed to trigger sync. Please try again. " }
442543 }
443544}
444545
@@ -448,12 +549,18 @@ export async function triggerWatchlistSyncJob(
448549export async function updateWatchlistSyncSchedule (
449550 input : unknown
450551) : Promise < { success : true } | { success : false ; error : string } > {
552+ let session : Awaited < ReturnType < typeof requireAdmin > >
451553 try {
452- await requireAdmin ( )
554+ session = await requireAdmin ( )
453555 } catch {
454556 return { success : false , error : "Unauthorized" }
455557 }
456558
559+ const adminId = session . user . id
560+ if ( isRateLimited ( adminId ) ) {
561+ return { success : false , error : "Too many requests. Please try again later." }
562+ }
563+
457564 const validated = updateScheduleSchema . safeParse ( input )
458565 if ( ! validated . success ) {
459566 return { success : false , error : "Invalid interval" }
@@ -478,17 +585,27 @@ export async function updateWatchlistSyncSchedule(
478585 { triggeredBy : "scheduled" } ,
479586 intervalMs
480587 )
481- logger . info ( "Watchlist sync schedule updated" , { intervalMinutes } )
588+
589+ logAuditEvent ( AuditEventType . QUEUE_SCHEDULE_UPDATED , adminId , {
590+ intervalMinutes,
591+ action : "updated" ,
592+ } )
593+ logger . info ( "Watchlist sync schedule updated" , { intervalMinutes, adminId } )
482594 } else {
483595 // Remove the scheduler if sync is disabled
484596 await removeScheduledJob ( "watchlist-sync-scheduled" )
485- logger . info ( "Watchlist sync schedule removed (sync disabled)" )
597+
598+ logAuditEvent ( AuditEventType . QUEUE_SCHEDULE_UPDATED , adminId , {
599+ action : "removed" ,
600+ reason : "sync_disabled" ,
601+ } )
602+ logger . info ( "Watchlist sync schedule removed (sync disabled)" , { adminId } )
486603 }
487604
488605 return { success : true }
489606 } catch ( error ) {
490607 logger . error ( "Error updating sync schedule" , error )
491- return { success : false , error : "Failed to update schedule" }
608+ return { success : false , error : "Failed to update schedule. Please try again. " }
492609 }
493610}
494611
0 commit comments