Skip to content

Commit 30ec775

Browse files
committed
Add purge queue command
1 parent da568e5 commit 30ec775

File tree

5 files changed

+171
-0
lines changed

5 files changed

+171
-0
lines changed

.changeset/lemon-areas-tap.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"wrangler": minor
3+
---
4+
5+
Add new command to purge a Queue
6+
7+
This new command can be used to delete all existing messages in a Queue

packages/wrangler/src/__tests__/queues.test.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ describe("wrangler", () => {
3535
wrangler queues consumer Configure Queue consumers
3636
wrangler queues pause-delivery <name> Pause message delivery for a Queue
3737
wrangler queues resume-delivery <name> Resume message delivery for a Queue
38+
wrangler queues purge <name> Purge messages from a Queue
3839
3940
GLOBAL FLAGS
4041
-c, --config Path to Wrangler configuration file [string]
@@ -2066,4 +2067,97 @@ describe("wrangler", () => {
20662067
`);
20672068
});
20682069
});
2070+
2071+
describe("purge", () => {
2072+
function mockPurgeRequest() {
2073+
const requests = { count: 0 };
2074+
2075+
msw.use(
2076+
http.post(
2077+
"*/accounts/:accountId/queues/:queueId/purge",
2078+
async ({ request }) => {
2079+
requests.count += 1;
2080+
2081+
const body = (await request.json()) as {
2082+
delete_messages_permanently: boolean;
2083+
};
2084+
expect(body.delete_messages_permanently).toEqual(true);
2085+
return HttpResponse.json({
2086+
success: true,
2087+
errors: [],
2088+
messages: [],
2089+
result: {
2090+
started_on: "01-01-2001",
2091+
complete: false,
2092+
},
2093+
});
2094+
},
2095+
{ once: true }
2096+
)
2097+
);
2098+
return requests;
2099+
}
2100+
function mockGetQueueRequest(queueName: string) {
2101+
const requests = { count: 0 };
2102+
msw.use(
2103+
http.get(
2104+
"*/accounts/:accountId/queues?*",
2105+
async () => {
2106+
requests.count += 1;
2107+
return HttpResponse.json({
2108+
success: true,
2109+
errors: [],
2110+
messages: [],
2111+
result: [
2112+
{
2113+
queue_name: queueName,
2114+
created_on: "",
2115+
producers: [],
2116+
consumers: [],
2117+
producers_total_count: 1,
2118+
consumers_total_count: 0,
2119+
modified_on: "",
2120+
queue_id: "queueId",
2121+
},
2122+
],
2123+
});
2124+
},
2125+
{ once: true }
2126+
)
2127+
);
2128+
return requests;
2129+
}
2130+
2131+
it("should show the correct help text", async () => {
2132+
await runWrangler("queues purge --help");
2133+
expect(std.err).toMatchInlineSnapshot(`""`);
2134+
expect(std.out).toMatchInlineSnapshot(`
2135+
"wrangler queues purge <name>
2136+
2137+
Purge messages from a Queue
2138+
2139+
POSITIONALS
2140+
name The name of the queue [string] [required]
2141+
2142+
GLOBAL FLAGS
2143+
-c, --config Path to Wrangler configuration file [string]
2144+
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
2145+
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
2146+
-h, --help Show help [boolean]
2147+
-v, --version Show version number [boolean]"
2148+
`);
2149+
});
2150+
2151+
it("should call the /purge API endpoint", async () => {
2152+
const getrequests = mockGetQueueRequest("testQueue");
2153+
const requests = mockPurgeRequest();
2154+
2155+
await runWrangler("queues purge testQueue");
2156+
2157+
expect(requests.count).toEqual(1);
2158+
expect(getrequests.count).toEqual(1);
2159+
2160+
expect(std.out).toMatchInlineSnapshot(`"Started a purge operation for Queue 'testQueue'"`);
2161+
});
2162+
});
20692163
});

packages/wrangler/src/queues/cli/commands/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
options as pauseResumeOptions,
1010
resumeHandler,
1111
} from "./pause-resume";
12+
import { handler as purgeHandler, options as purgeOptions } from "./purge";
1213
import { handler as updateHandler, options as updateOptions } from "./update";
1314
import type { CommonYargsArgv } from "../../../yargs-types";
1415

@@ -65,5 +66,12 @@ export function queues(yargs: CommonYargsArgv) {
6566
resumeHandler
6667
);
6768

69+
yargs.command(
70+
"purge <name>",
71+
"Purge messages from a Queue",
72+
purgeOptions,
73+
purgeHandler
74+
);
75+
6876
yargs.fail(HandleUnauthorizedError);
6977
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { readConfig } from "../../../config";
2+
import { prompt } from "../../../dialogs";
3+
import { FatalError } from "../../../errors";
4+
import isInteractive from "../../../is-interactive";
5+
import { logger } from "../../../logger";
6+
import { purgeQueue } from "../../client";
7+
import type {
8+
CommonYargsArgv,
9+
StrictYargsOptionsToInterface,
10+
} from "../../../yargs-types";
11+
12+
export function options(yargs: CommonYargsArgv) {
13+
return yargs.positional("name", {
14+
type: "string",
15+
demandOption: true,
16+
description: "The name of the queue",
17+
});
18+
}
19+
20+
export async function handler(
21+
args: StrictYargsOptionsToInterface<typeof options>
22+
) {
23+
const config = readConfig(args);
24+
25+
if (isInteractive()) {
26+
const result = await prompt(
27+
`This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.`
28+
);
29+
if (result !== args.name) {
30+
throw new FatalError(
31+
"Incorrect queue name provided. Skipping purge operation"
32+
);
33+
}
34+
}
35+
36+
await purgeQueue(config, args.name);
37+
38+
logger.log(`Started a purge operation for Queue '${args.name}'`);
39+
}

packages/wrangler/src/queues/client.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ export interface ConsumerSettings {
8585
retry_delay?: number;
8686
}
8787

88+
export interface PurgeQueueBody {
89+
delete_messages_permanently: boolean;
90+
}
91+
92+
export interface PurgeQueueResponse {
93+
started_at: string;
94+
complete: boolean;
95+
}
96+
8897
const queuesUrl = (accountId: string, queueId?: string): string => {
8998
let url = `/accounts/${accountId}/queues`;
9099
if (queueId) {
@@ -406,3 +415,17 @@ export async function deleteWorkerConsumer(
406415
);
407416
return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id);
408417
}
418+
419+
export async function purgeQueue(
420+
config: Config,
421+
queueName: string
422+
): Promise<void> {
423+
const accountId = await requireAuth(config);
424+
const queue = await getQueue(config, queueName);
425+
const purgeURL = `${queuesUrl(accountId, queue.queue_id)}/purge`;
426+
const body: PurgeQueueBody = { delete_messages_permanently: true };
427+
return fetchResult(purgeURL, {
428+
method: "POST",
429+
body: JSON.stringify(body),
430+
});
431+
}

0 commit comments

Comments
 (0)