Skip to content

Commit 0ae044a

Browse files
committed
update queue: add priority, move reminders & notifications to their queues, define workers
1 parent bfcb425 commit 0ae044a

File tree

8 files changed

+635
-1
lines changed

8 files changed

+635
-1
lines changed

src/lib/queue/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,20 @@
11
// Export email queue functionality
22
export * from "./email";
33

4+
// Export reminder queue functionality
5+
export * from "./reminders";
6+
7+
// Export slack notification queue functionality
8+
export * from "./slack";
9+
10+
// Export priority email queue functionality
11+
export * from "./priority-email";
12+
413
// Export worker starter
514
export { startWorkers } from "./workers";
615

716
// Re-export types
817
export type { EmailJobData } from "./email";
18+
export type { ReminderJobData } from "./reminders";
19+
export type { SlackJobData } from "./slack";
20+
export type { PriorityEmailJobData } from "./priority-email";

src/lib/queue/priority-email.ts

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import Bull from "bull";
2+
import { queueOptions } from "./config";
3+
import type { ServerEvent } from "src/types";
4+
import type { EmailType } from "src/components/Email";
5+
6+
// Define the priority email job data type
7+
// Define type for email receiver to match the expected structure
8+
interface EmailReceiver {
9+
id: string;
10+
email: string | null;
11+
address: string | null;
12+
nickname: string;
13+
isBeta: boolean;
14+
profile?: {
15+
image?: string | null;
16+
} | null;
17+
}
18+
19+
// Define type for email data structure
20+
interface EmailData {
21+
event: ServerEvent;
22+
type: EmailType;
23+
receiver: EmailReceiver;
24+
}
25+
26+
export interface PriorityEmailJobData {
27+
event: ServerEvent;
28+
creatorId?: string;
29+
proposerEmails: EmailData[];
30+
attendeeEmails: EmailData[];
31+
}
32+
33+
// Create the priority email queue
34+
const priorityEmailQueue = new Bull<PriorityEmailJobData>(
35+
"priority-email-queue",
36+
queueOptions.redis as string,
37+
{
38+
defaultJobOptions: queueOptions.defaultJobOptions,
39+
settings: {
40+
stalledInterval: 30000, // 30 seconds
41+
lockDuration: 60000, // 60 seconds
42+
lockRenewTime: 15000, // 15 seconds
43+
maxStalledCount: 2,
44+
},
45+
}
46+
);
47+
48+
// Track active priority email jobs for monitoring
49+
let activePriorityEmailJobs = 0;
50+
let totalPriorityEmailJobs = 0;
51+
let completedPriorityEmailJobs = 0;
52+
let failedPriorityEmailJobs = 0;
53+
54+
// Add monitoring events
55+
priorityEmailQueue.on("active", () => {
56+
activePriorityEmailJobs = Math.max(0, activePriorityEmailJobs); // Reset if negative
57+
activePriorityEmailJobs++;
58+
console.log(
59+
`Priority email job started. Active jobs: ${activePriorityEmailJobs}`
60+
);
61+
});
62+
63+
priorityEmailQueue.on("completed", () => {
64+
activePriorityEmailJobs = Math.max(0, activePriorityEmailJobs - 1); // Never go below zero
65+
completedPriorityEmailJobs++;
66+
console.log(
67+
`Priority email job completed. Completed: ${completedPriorityEmailJobs}/${totalPriorityEmailJobs}`
68+
);
69+
});
70+
71+
priorityEmailQueue.on("failed", (job, err) => {
72+
activePriorityEmailJobs = Math.max(0, activePriorityEmailJobs - 1); // Never go below zero
73+
failedPriorityEmailJobs++;
74+
console.error(
75+
`Priority email job failed: ${err.message}. Failed: ${failedPriorityEmailJobs}/${totalPriorityEmailJobs}`
76+
);
77+
});
78+
79+
// Add priority email job to queue
80+
export const queuePriorityEmail = async (
81+
data: PriorityEmailJobData
82+
): Promise<string | number> => {
83+
const job = await priorityEmailQueue.add(data, {
84+
attempts: 3,
85+
backoff: {
86+
type: "exponential",
87+
delay: 10000,
88+
},
89+
removeOnComplete: true,
90+
});
91+
92+
totalPriorityEmailJobs++;
93+
console.log(
94+
`Priority email job for event ${data.event.id} queued with job ID: ${job.id}`
95+
);
96+
return job.id;
97+
};
98+
99+
// Get priority email queue statistics
100+
export const getPriorityEmailQueueStats = async () => {
101+
const [waiting, active, completed, failed, delayed] = await Promise.all([
102+
priorityEmailQueue.getWaitingCount(),
103+
priorityEmailQueue.getActiveCount(),
104+
priorityEmailQueue.getCompletedCount(),
105+
priorityEmailQueue.getFailedCount(),
106+
priorityEmailQueue.getDelayedCount(),
107+
]);
108+
109+
return {
110+
waiting,
111+
active,
112+
completed,
113+
failed,
114+
delayed,
115+
metrics: {
116+
activePriorityEmailJobs,
117+
totalPriorityEmailJobs,
118+
completedPriorityEmailJobs,
119+
failedPriorityEmailJobs,
120+
},
121+
};
122+
};
123+
124+
// Export the queue for worker processing
125+
export { priorityEmailQueue };

src/lib/queue/reminders.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import Bull from "bull";
2+
import { queueOptions } from "./config";
3+
4+
// Define the reminder job data type
5+
export interface ReminderJobData {
6+
eventId: string;
7+
recipientId: string;
8+
isProposer: boolean;
9+
isMaybe: boolean;
10+
}
11+
12+
// Create the reminder scheduling queue
13+
const reminderQueue = new Bull<ReminderJobData>(
14+
"reminder-queue",
15+
queueOptions.redis as string,
16+
{
17+
defaultJobOptions: queueOptions.defaultJobOptions,
18+
settings: {
19+
stalledInterval: 30000, // 30 seconds
20+
lockDuration: 60000, // 60 seconds
21+
lockRenewTime: 15000, // 15 seconds
22+
maxStalledCount: 2, // Allow a job to stall twice before marking as failed
23+
},
24+
}
25+
);
26+
27+
// Track active reminder scheduling jobs for monitoring
28+
let activeReminderJobs = 0;
29+
let totalReminderJobs = 0;
30+
let completedReminderJobs = 0;
31+
let failedReminderJobs = 0;
32+
33+
// Add monitoring events
34+
reminderQueue.on("active", () => {
35+
activeReminderJobs = Math.max(0, activeReminderJobs); // Reset if negative
36+
activeReminderJobs++;
37+
console.log(`Reminder job started. Active jobs: ${activeReminderJobs}`);
38+
});
39+
40+
reminderQueue.on("completed", () => {
41+
activeReminderJobs = Math.max(0, activeReminderJobs - 1); // Never go below zero
42+
completedReminderJobs++;
43+
console.log(
44+
`Reminder job completed. Completed: ${completedReminderJobs}/${totalReminderJobs}`
45+
);
46+
});
47+
48+
reminderQueue.on("failed", (job, err) => {
49+
activeReminderJobs = Math.max(0, activeReminderJobs - 1); // Never go below zero
50+
failedReminderJobs++;
51+
console.error(
52+
`Reminder job failed: ${err.message}. Failed: ${failedReminderJobs}/${totalReminderJobs}`
53+
);
54+
});
55+
56+
// Add single reminder job to queue
57+
export const queueReminderScheduling = async (
58+
data: ReminderJobData
59+
): Promise<string | number> => {
60+
const job = await reminderQueue.add(data, {
61+
attempts: 5,
62+
backoff: {
63+
type: "exponential",
64+
delay: 10000,
65+
},
66+
removeOnComplete: true,
67+
});
68+
69+
totalReminderJobs++;
70+
console.log(
71+
`Reminder scheduling for event ${data.eventId} and recipient ${data.recipientId} queued with job ID: ${job.id}`
72+
);
73+
return job.id;
74+
};
75+
76+
// Add multiple reminder jobs to queue
77+
export const queueReminderBatch = async (
78+
dataArray: ReminderJobData[]
79+
): Promise<(string | number)[]> => {
80+
const jobs = await Promise.all(
81+
dataArray.map((data) =>
82+
reminderQueue.add(data, {
83+
attempts: 5,
84+
backoff: {
85+
type: "exponential",
86+
delay: 10000,
87+
},
88+
removeOnComplete: true,
89+
})
90+
)
91+
);
92+
93+
totalReminderJobs += dataArray.length;
94+
console.log(`Batch of ${dataArray.length} reminder scheduling jobs queued`);
95+
return jobs.map((job) => job.id);
96+
};
97+
98+
// Get reminder queue statistics
99+
export const getReminderQueueStats = async () => {
100+
const [waiting, active, completed, failed, delayed] = await Promise.all([
101+
reminderQueue.getWaitingCount(),
102+
reminderQueue.getActiveCount(),
103+
reminderQueue.getCompletedCount(),
104+
reminderQueue.getFailedCount(),
105+
reminderQueue.getDelayedCount(),
106+
]);
107+
108+
return {
109+
waiting,
110+
active,
111+
completed,
112+
failed,
113+
delayed,
114+
metrics: {
115+
activeReminderJobs,
116+
totalReminderJobs,
117+
completedReminderJobs,
118+
failedReminderJobs,
119+
},
120+
};
121+
};
122+
123+
// Export the queue for worker processing
124+
export { reminderQueue };

src/lib/queue/slack.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import Bull from "bull";
2+
import { queueOptions } from "./config";
3+
4+
// Define the slack notification job data type
5+
export interface SlackJobData {
6+
eventId: string;
7+
host: string;
8+
type: "new" | "reminder" | "updated";
9+
}
10+
11+
// Create the slack notification queue
12+
const slackQueue = new Bull<SlackJobData>(
13+
"slack-queue",
14+
queueOptions.redis as string,
15+
{
16+
defaultJobOptions: queueOptions.defaultJobOptions,
17+
settings: {
18+
stalledInterval: 30000, // 30 seconds
19+
lockDuration: 60000, // 60 seconds
20+
lockRenewTime: 15000, // 15 seconds
21+
maxStalledCount: 2,
22+
},
23+
}
24+
);
25+
26+
// Track active slack notification jobs for monitoring
27+
let activeSlackJobs = 0;
28+
let totalSlackJobs = 0;
29+
let completedSlackJobs = 0;
30+
let failedSlackJobs = 0;
31+
32+
// Add monitoring events
33+
slackQueue.on("active", () => {
34+
activeSlackJobs = Math.max(0, activeSlackJobs); // Reset if negative
35+
activeSlackJobs++;
36+
console.log(
37+
`Slack notification job started. Active jobs: ${activeSlackJobs}`
38+
);
39+
});
40+
41+
slackQueue.on("completed", () => {
42+
activeSlackJobs = Math.max(0, activeSlackJobs - 1); // Never go below zero
43+
completedSlackJobs++;
44+
console.log(
45+
`Slack notification job completed. Completed: ${completedSlackJobs}/${totalSlackJobs}`
46+
);
47+
});
48+
49+
slackQueue.on("failed", (job, err) => {
50+
activeSlackJobs = Math.max(0, activeSlackJobs - 1); // Never go below zero
51+
failedSlackJobs++;
52+
console.error(
53+
`Slack notification job failed: ${err.message}. Failed: ${failedSlackJobs}/${totalSlackJobs}`
54+
);
55+
});
56+
57+
// Add slack notification job to queue
58+
export const queueSlackNotification = async (
59+
data: SlackJobData
60+
): Promise<string | number> => {
61+
const job = await slackQueue.add(data, {
62+
attempts: 3,
63+
backoff: {
64+
type: "exponential",
65+
delay: 10000,
66+
},
67+
removeOnComplete: true,
68+
});
69+
70+
totalSlackJobs++;
71+
console.log(
72+
`Slack notification for event ${data.eventId} queued with job ID: ${job.id}`
73+
);
74+
return job.id;
75+
};
76+
77+
// Get slack queue statistics
78+
export const getSlackQueueStats = async () => {
79+
const [waiting, active, completed, failed, delayed] = await Promise.all([
80+
slackQueue.getWaitingCount(),
81+
slackQueue.getActiveCount(),
82+
slackQueue.getCompletedCount(),
83+
slackQueue.getFailedCount(),
84+
slackQueue.getDelayedCount(),
85+
]);
86+
87+
return {
88+
waiting,
89+
active,
90+
completed,
91+
failed,
92+
delayed,
93+
metrics: {
94+
activeSlackJobs,
95+
totalSlackJobs,
96+
completedSlackJobs,
97+
failedSlackJobs,
98+
},
99+
};
100+
};
101+
102+
// Export the queue for worker processing
103+
export { slackQueue };

src/lib/queue/workers/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import { startEmailWorker } from "./email";
2+
import { startReminderWorker } from "./reminder";
3+
import { startSlackWorker } from "./slack";
4+
import { startPriorityEmailWorker } from "./priority-email";
25
import * as dotenv from "dotenv";
36

47
// Load environment variables from .env file
@@ -8,7 +11,9 @@ dotenv.config();
811
export const startWorkers = () => {
912
console.log("Starting queue workers...");
1013
startEmailWorker();
11-
// Add other workers as needed
14+
startReminderWorker();
15+
startSlackWorker();
16+
startPriorityEmailWorker();
1217
console.log("All workers started");
1318
};
1419

0 commit comments

Comments
 (0)