Skip to content

Commit 46998a1

Browse files
authored
Merge pull request quirrel-dev#771 from quirrel-dev/remove-all-jobs-on-queue
New endpoint to remove all jobs on a queue
2 parents dfe9462 + 37c005a commit 46998a1

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

src/api/scheduler/jobs-repo.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,23 @@ export class JobsRepo implements Closable {
145145
);
146146
}
147147

148+
public async emptyQueue(tokenId: string, endpoint: string) {
149+
let cursor = 0;
150+
const allPromises: Promise<any>[] = [];
151+
do {
152+
const { cursor: newCursor, jobs } = await this.find(tokenId, endpoint, {
153+
cursor,
154+
});
155+
cursor = newCursor;
156+
157+
for (const job of jobs) {
158+
allPromises.push(this.delete(tokenId, endpoint, job.id));
159+
}
160+
} while (cursor !== 0);
161+
162+
await Promise.all(allPromises);
163+
}
164+
148165
public async delete(tokenId: string, endpoint: string, id: string) {
149166
return await this.producer.delete(
150167
encodeQueueDescriptor(tokenId, endpoint),

src/api/scheduler/routes/queues.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,27 @@ const jobs: FastifyPluginCallback = (fastify, opts, done) => {
114114
}
115115
);
116116

117+
fastify.delete<{ Body: EnqueueJob; Params: QueuesEndpointParams }>(
118+
"/:endpoint",
119+
{
120+
schema: {
121+
...baseSchema,
122+
params: EndpointParamsSchema,
123+
summary: "Empty a Queue",
124+
},
125+
},
126+
async (request, reply) => {
127+
fastify.telemetrist?.dispatch("empty_queue");
128+
129+
const { tokenId } = request;
130+
const { endpoint } = request.params;
131+
132+
await jobsRepo.emptyQueue(tokenId, endpoint);
133+
134+
reply.status(204).send();
135+
}
136+
);
137+
117138
fastify.post<{
118139
Body: EnqueueJob[];
119140
Params: QueuesEndpointParams;

src/api/test/jobs.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,26 @@ describeAcrossBackends("Jobs", (backend) => {
338338
expect(lastBody).not.toEqual('{"iWill":"beDeleted"}');
339339
});
340340

341+
test("delete all jobs on a queue", async () => {
342+
await request(quirrel)
343+
.post("/queues/" + endpoint + "/batch")
344+
.send([
345+
{
346+
body: JSON.stringify({ iWill: "beDeleted_1" }),
347+
runAt: new Date(Date.now() + 300).toISOString(),
348+
},
349+
{
350+
body: JSON.stringify({ iWill: "beDeleted_2" }),
351+
runAt: new Date(Date.now() + 300).toISOString(),
352+
},
353+
])
354+
.expect(201);
355+
356+
await request(quirrel).delete(`/queues/${endpoint}`).expect(204);
357+
358+
await request(quirrel).get(`/queues/${endpoint}`).expect(200, { jobs: [], cursor: null });
359+
});
360+
341361
test("idempotent jobs", async () => {
342362
const id = "sameIdAcrossBothJobs";
343363

0 commit comments

Comments
 (0)