Skip to content

Commit 3e043ee

Browse files
committed
Add purge queue command
1 parent da568e5 commit 3e043ee

File tree

5 files changed

+169
-0
lines changed

5 files changed

+169
-0
lines changed

.changeset/lemon-areas-tap.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"wrangler": minor
3+
---
4+
5+
Add new command to purge a Queue
6+
7+
This new command can be used to delete all existing messages in a Queue

packages/wrangler/src/__tests__/queues.test.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { http, HttpResponse } from "msw";
22
import { mockAccountId, mockApiToken } from "./helpers/mock-account-id";
33
import { mockConsoleMethods } from "./helpers/mock-console";
4+
import { mockPrompt } from "./helpers/mock-dialogs";
45
import { msw } from "./helpers/msw";
56
import { runInTempDir } from "./helpers/run-in-tmp";
67
import { runWrangler } from "./helpers/run-wrangler";
@@ -35,6 +36,7 @@ describe("wrangler", () => {
3536
wrangler queues consumer Configure Queue consumers
3637
wrangler queues pause-delivery <name> Pause message delivery for a Queue
3738
wrangler queues resume-delivery <name> Resume message delivery for a Queue
39+
wrangler queues purge <name> Purge messages from a Queue
3840
3941
GLOBAL FLAGS
4042
-c, --config Path to Wrangler configuration file [string]
@@ -2066,4 +2068,97 @@ describe("wrangler", () => {
20662068
`);
20672069
});
20682070
});
2071+
2072+
describe("purge", () => {
2073+
function mockPurgeRequest() {
2074+
const requests = { count: 0 };
2075+
2076+
msw.use(
2077+
http.post(
2078+
"*/accounts/:accountId/queues/:queueId/purge",
2079+
async ({ request }) => {
2080+
requests.count += 1;
2081+
2082+
const body = (await request.json()) as {
2083+
delete_messages_permanently: boolean;
2084+
};
2085+
expect(body.delete_messages_permanently).toEqual(true);
2086+
return HttpResponse.json({
2087+
success: true,
2088+
errors: [],
2089+
messages: [],
2090+
result: {
2091+
started_on: "01-01-2001",
2092+
complete: false,
2093+
},
2094+
});
2095+
},
2096+
{ once: true }
2097+
)
2098+
);
2099+
return requests;
2100+
}
2101+
function mockGetQueueRequest(queueName: string) {
2102+
const requests = { count: 0 };
2103+
msw.use(
2104+
http.get(
2105+
"*/accounts/:accountId/queues?*",
2106+
async () => {
2107+
requests.count += 1;
2108+
return HttpResponse.json({
2109+
success: true,
2110+
errors: [],
2111+
messages: [],
2112+
result: [
2113+
{
2114+
queue_name: queueName,
2115+
created_on: "",
2116+
producers: [],
2117+
consumers: [],
2118+
producers_total_count: 1,
2119+
consumers_total_count: 0,
2120+
modified_on: "",
2121+
queue_id: "queueId",
2122+
},
2123+
],
2124+
});
2125+
},
2126+
{ once: true }
2127+
)
2128+
);
2129+
return requests;
2130+
}
2131+
2132+
it("should show the correct help text", async () => {
2133+
await runWrangler("queues purge --help");
2134+
expect(std.err).toMatchInlineSnapshot(`""`);
2135+
expect(std.out).toMatchInlineSnapshot(`
2136+
"wrangler queues purge <name>
2137+
2138+
Purge messages from a Queue
2139+
2140+
POSITIONALS
2141+
name The name of the queue [string] [required]
2142+
2143+
GLOBAL FLAGS
2144+
-c, --config Path to Wrangler configuration file [string]
2145+
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
2146+
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
2147+
-h, --help Show help [boolean]
2148+
-v, --version Show version number [boolean]"
2149+
`);
2150+
});
2151+
2152+
it("should call the /purge API endpoint", async () => {
2153+
const getrequests = mockGetQueueRequest("testQueue");
2154+
const requests = mockPurgeRequest();
2155+
2156+
await runWrangler("queues purge testQueue");
2157+
2158+
expect(requests.count).toEqual(1);
2159+
expect(getrequests.count).toEqual(1);
2160+
2161+
expect(std.out).toMatchInlineSnapshot(`""`);
2162+
});
2163+
});
20692164
});

packages/wrangler/src/queues/cli/commands/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
options as pauseResumeOptions,
1010
resumeHandler,
1111
} from "./pause-resume";
12+
import { handler as purgeHandler, options as purgeOptions } from "./purge";
1213
import { handler as updateHandler, options as updateOptions } from "./update";
1314
import type { CommonYargsArgv } from "../../../yargs-types";
1415

@@ -65,5 +66,12 @@ export function queues(yargs: CommonYargsArgv) {
6566
resumeHandler
6667
);
6768

69+
yargs.command(
70+
"purge <name>",
71+
"Purge messages from a Queue",
72+
purgeOptions,
73+
purgeHandler
74+
);
75+
6876
yargs.fail(HandleUnauthorizedError);
6977
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { readConfig } from "../../../config";
2+
import { prompt } from "../../../dialogs";
3+
import { FatalError } from "../../../errors";
4+
import isInteractive from "../../../is-interactive";
5+
import { purgeQueue } from "../../client";
6+
import type {
7+
CommonYargsArgv,
8+
StrictYargsOptionsToInterface,
9+
} from "../../../yargs-types";
10+
11+
export function options(yargs: CommonYargsArgv) {
12+
return yargs.positional("name", {
13+
type: "string",
14+
demandOption: true,
15+
description: "The name of the queue",
16+
});
17+
}
18+
19+
export async function handler(
20+
args: StrictYargsOptionsToInterface<typeof options>
21+
) {
22+
const config = readConfig(args);
23+
24+
if (isInteractive()) {
25+
const result = await prompt(
26+
`This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.`
27+
);
28+
if (result !== args.name) {
29+
throw new FatalError(
30+
"Incorrect queue name provided. Skipping purge operation"
31+
);
32+
}
33+
}
34+
35+
await purgeQueue(config, args.name);
36+
}

packages/wrangler/src/queues/client.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ export interface ConsumerSettings {
8585
retry_delay?: number;
8686
}
8787

88+
export interface PurgeQueueBody {
89+
delete_messages_permanently: boolean;
90+
}
91+
92+
export interface PurgeQueueResponse {
93+
started_at: string;
94+
complete: boolean;
95+
}
96+
8897
const queuesUrl = (accountId: string, queueId?: string): string => {
8998
let url = `/accounts/${accountId}/queues`;
9099
if (queueId) {
@@ -406,3 +415,17 @@ export async function deleteWorkerConsumer(
406415
);
407416
return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id);
408417
}
418+
419+
export async function purgeQueue(
420+
config: Config,
421+
queueName: string
422+
): Promise<void> {
423+
const accountId = await requireAuth(config);
424+
const queue = await getQueue(config, queueName);
425+
const purgeURL = `${queuesUrl(accountId, queue.queue_id)}/purge`;
426+
const body: PurgeQueueBody = { delete_messages_permanently: true };
427+
return fetchResult(purgeURL, {
428+
method: "POST",
429+
body: JSON.stringify(body),
430+
});
431+
}

0 commit comments

Comments
 (0)