From 30ec775338dc9242b7d6dca16b7c1d9e0eed13e0 Mon Sep 17 00:00:00 2001 From: Josh Wheeler Date: Tue, 4 Mar 2025 17:27:00 -0600 Subject: [PATCH 1/2] Add purge queue command --- .changeset/lemon-areas-tap.md | 7 ++ .../wrangler/src/__tests__/queues.test.ts | 94 +++++++++++++++++++ .../wrangler/src/queues/cli/commands/index.ts | 8 ++ .../wrangler/src/queues/cli/commands/purge.ts | 39 ++++++++ packages/wrangler/src/queues/client.ts | 23 +++++ 5 files changed, 171 insertions(+) create mode 100644 .changeset/lemon-areas-tap.md create mode 100644 packages/wrangler/src/queues/cli/commands/purge.ts diff --git a/.changeset/lemon-areas-tap.md b/.changeset/lemon-areas-tap.md new file mode 100644 index 000000000000..bc3500cabab6 --- /dev/null +++ b/.changeset/lemon-areas-tap.md @@ -0,0 +1,7 @@ +--- +"wrangler": minor +--- + +Add new command to purge a Queue + +This new command can be used to delete all existing messages in a Queue diff --git a/packages/wrangler/src/__tests__/queues.test.ts b/packages/wrangler/src/__tests__/queues.test.ts index 43c19fb4f9fe..b8c8ea8adc29 100644 --- a/packages/wrangler/src/__tests__/queues.test.ts +++ b/packages/wrangler/src/__tests__/queues.test.ts @@ -35,6 +35,7 @@ describe("wrangler", () => { wrangler queues consumer Configure Queue consumers wrangler queues pause-delivery Pause message delivery for a Queue wrangler queues resume-delivery Resume message delivery for a Queue + wrangler queues purge Purge messages from a Queue GLOBAL FLAGS -c, --config Path to Wrangler configuration file [string] @@ -2066,4 +2067,97 @@ describe("wrangler", () => { `); }); }); + + describe("purge", () => { + function mockPurgeRequest() { + const requests = { count: 0 }; + + msw.use( + http.post( + "*/accounts/:accountId/queues/:queueId/purge", + async ({ request }) => { + requests.count += 1; + + const body = (await request.json()) as { + delete_messages_permanently: boolean; + }; + expect(body.delete_messages_permanently).toEqual(true); + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + started_on: "01-01-2001", + complete: false, + }, + }); + }, + { once: true } + ) + ); + return requests; + } + function mockGetQueueRequest(queueName: string) { + const requests = { count: 0 }; + msw.use( + http.get( + "*/accounts/:accountId/queues?*", + async () => { + requests.count += 1; + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: [ + { + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + queue_id: "queueId", + }, + ], + }); + }, + { once: true } + ) + ); + return requests; + } + + it("should show the correct help text", async () => { + await runWrangler("queues purge --help"); + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler queues purge + + Purge messages from a Queue + + POSITIONALS + name The name of the queue [string] [required] + + GLOBAL FLAGS + -c, --config Path to Wrangler configuration file [string] + --cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string] + -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean]" + `); + }); + + it("should call the /purge API endpoint", async () => { + const getrequests = mockGetQueueRequest("testQueue"); + const requests = mockPurgeRequest(); + + await runWrangler("queues purge testQueue"); + + expect(requests.count).toEqual(1); + expect(getrequests.count).toEqual(1); + + expect(std.out).toMatchInlineSnapshot(`"Started a purge operation for Queue 'testQueue'"`); + }); + }); }); diff --git a/packages/wrangler/src/queues/cli/commands/index.ts b/packages/wrangler/src/queues/cli/commands/index.ts index e82c97b4f291..ebc113854a07 100644 --- a/packages/wrangler/src/queues/cli/commands/index.ts +++ b/packages/wrangler/src/queues/cli/commands/index.ts @@ -9,6 +9,7 @@ import { options as pauseResumeOptions, resumeHandler, } from "./pause-resume"; +import { handler as purgeHandler, options as purgeOptions } from "./purge"; import { handler as updateHandler, options as updateOptions } from "./update"; import type { CommonYargsArgv } from "../../../yargs-types"; @@ -65,5 +66,12 @@ export function queues(yargs: CommonYargsArgv) { resumeHandler ); + yargs.command( + "purge ", + "Purge messages from a Queue", + purgeOptions, + purgeHandler + ); + yargs.fail(HandleUnauthorizedError); } diff --git a/packages/wrangler/src/queues/cli/commands/purge.ts b/packages/wrangler/src/queues/cli/commands/purge.ts new file mode 100644 index 000000000000..adffcbcb3b3a --- /dev/null +++ b/packages/wrangler/src/queues/cli/commands/purge.ts @@ -0,0 +1,39 @@ +import { readConfig } from "../../../config"; +import { prompt } from "../../../dialogs"; +import { FatalError } from "../../../errors"; +import isInteractive from "../../../is-interactive"; +import { logger } from "../../../logger"; +import { purgeQueue } from "../../client"; +import type { + CommonYargsArgv, + StrictYargsOptionsToInterface, +} from "../../../yargs-types"; + +export function options(yargs: CommonYargsArgv) { + return yargs.positional("name", { + type: "string", + demandOption: true, + description: "The name of the queue", + }); +} + +export async function handler( + args: StrictYargsOptionsToInterface +) { + const config = readConfig(args); + + if (isInteractive()) { + const result = await prompt( + `This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.` + ); + if (result !== args.name) { + throw new FatalError( + "Incorrect queue name provided. Skipping purge operation" + ); + } + } + + await purgeQueue(config, args.name); + + logger.log(`Started a purge operation for Queue '${args.name}'`); +} diff --git a/packages/wrangler/src/queues/client.ts b/packages/wrangler/src/queues/client.ts index d58176bc8767..e47eab069fcd 100644 --- a/packages/wrangler/src/queues/client.ts +++ b/packages/wrangler/src/queues/client.ts @@ -85,6 +85,15 @@ export interface ConsumerSettings { retry_delay?: number; } +export interface PurgeQueueBody { + delete_messages_permanently: boolean; +} + +export interface PurgeQueueResponse { + started_at: string; + complete: boolean; +} + const queuesUrl = (accountId: string, queueId?: string): string => { let url = `/accounts/${accountId}/queues`; if (queueId) { @@ -406,3 +415,17 @@ export async function deleteWorkerConsumer( ); return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id); } + +export async function purgeQueue( + config: Config, + queueName: string +): Promise { + const accountId = await requireAuth(config); + const queue = await getQueue(config, queueName); + const purgeURL = `${queuesUrl(accountId, queue.queue_id)}/purge`; + const body: PurgeQueueBody = { delete_messages_permanently: true }; + return fetchResult(purgeURL, { + method: "POST", + body: JSON.stringify(body), + }); +} From 09a96c90198715712e85c04c683ae8a6a0a13c42 Mon Sep 17 00:00:00 2001 From: Josh Wheeler Date: Tue, 11 Mar 2025 12:14:53 -0500 Subject: [PATCH 2/2] fixup! Add purge queue command --- .../wrangler/src/__tests__/queues.test.ts | 92 ++++++++++++++++++- .../wrangler/src/queues/cli/commands/purge.ts | 27 ++++-- 2 files changed, 108 insertions(+), 11 deletions(-) diff --git a/packages/wrangler/src/__tests__/queues.test.ts b/packages/wrangler/src/__tests__/queues.test.ts index b8c8ea8adc29..c0ecaf278b20 100644 --- a/packages/wrangler/src/__tests__/queues.test.ts +++ b/packages/wrangler/src/__tests__/queues.test.ts @@ -1,6 +1,8 @@ import { http, HttpResponse } from "msw"; import { mockAccountId, mockApiToken } from "./helpers/mock-account-id"; import { mockConsoleMethods } from "./helpers/mock-console"; +import { mockPrompt } from "./helpers/mock-dialogs"; +import { useMockIsTTY } from "./helpers/mock-istty"; import { msw } from "./helpers/msw"; import { runInTempDir } from "./helpers/run-in-tmp"; import { runWrangler } from "./helpers/run-wrangler"; @@ -2069,6 +2071,11 @@ describe("wrangler", () => { }); describe("purge", () => { + const { setIsTTY } = useMockIsTTY(); + beforeEach(() => { + setIsTTY(false); + }); + function mockPurgeRequest() { const requests = { count: 0 }; @@ -2144,20 +2151,99 @@ describe("wrangler", () => { --cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string] -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] -h, --help Show help [boolean] - -v, --version Show version number [boolean]" + -v, --version Show version number [boolean] + + OPTIONS + --force Skip the confirmation dialog and forcefully purge the Queue [boolean]" `); }); - it("should call the /purge API endpoint", async () => { + it("rejects a missing --force flag in non-interactive mode", async () => { + const getrequests = mockGetQueueRequest("testQueue"); + const requests = mockPurgeRequest(); + + await expect( + runWrangler("queues purge testQueue") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: The --force flag is required to purge a Queue in non-interactive mode]` + ); + + expect(requests.count).toEqual(0); + expect(getrequests.count).toEqual(0); + + expect(std.out).toMatchInlineSnapshot(`""`); + }); + + it("allows purge with the --force flag in non-interactive mode", async () => { + const getrequests = mockGetQueueRequest("testQueue"); + const requests = mockPurgeRequest(); + + await runWrangler("queues purge testQueue --force"); + + expect(requests.count).toEqual(1); + expect(getrequests.count).toEqual(1); + + expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`); + }); + + it("allows purge with the --force flag in non-interactive mode", async () => { + const getrequests = mockGetQueueRequest("testQueue"); + const requests = mockPurgeRequest(); + + await runWrangler("queues purge testQueue --force"); + + expect(requests.count).toEqual(1); + expect(getrequests.count).toEqual(1); + + expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`); + }); + + it("allows purge with the --force flag in interactive mode", async () => { + setIsTTY(true); const getrequests = mockGetQueueRequest("testQueue"); const requests = mockPurgeRequest(); + await runWrangler("queues purge testQueue --force"); + + expect(requests.count).toEqual(1); + expect(getrequests.count).toEqual(1); + + expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`); + }); + it("rejects invalid confirmation in interactive mode", async () => { + setIsTTY(true); + const getrequests = mockGetQueueRequest("testQueue"); + const requests = mockPurgeRequest(); + mockPrompt({ + text: "This operation will permanently delete all the messages in Queue testQueue. Type testQueue to proceed.", + result: "wrong-name", + }); + await expect( + runWrangler("queues purge testQueue") + ).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Incorrect queue name provided. Skipping purge operation]` + ); + + expect(requests.count).toEqual(0); + expect(getrequests.count).toEqual(0); + + expect(std.out).toMatchInlineSnapshot(`""`); + }); + + it("allows purge with correct confirmation in interactive mode", async () => { + setIsTTY(true); + const getrequests = mockGetQueueRequest("testQueue"); + const requests = mockPurgeRequest(); + mockPrompt({ + text: "This operation will permanently delete all the messages in Queue testQueue. Type testQueue to proceed.", + result: "testQueue", + }); await runWrangler("queues purge testQueue"); expect(requests.count).toEqual(1); expect(getrequests.count).toEqual(1); - expect(std.out).toMatchInlineSnapshot(`"Started a purge operation for Queue 'testQueue'"`); + expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`); }); }); }); diff --git a/packages/wrangler/src/queues/cli/commands/purge.ts b/packages/wrangler/src/queues/cli/commands/purge.ts index adffcbcb3b3a..fb9cab33e0da 100644 --- a/packages/wrangler/src/queues/cli/commands/purge.ts +++ b/packages/wrangler/src/queues/cli/commands/purge.ts @@ -10,11 +10,16 @@ import type { } from "../../../yargs-types"; export function options(yargs: CommonYargsArgv) { - return yargs.positional("name", { - type: "string", - demandOption: true, - description: "The name of the queue", - }); + return yargs + .positional("name", { + type: "string", + demandOption: true, + description: "The name of the queue", + }) + .option("force", { + describe: "Skip the confirmation dialog and forcefully purge the Queue", + type: "boolean", + }); } export async function handler( @@ -22,18 +27,24 @@ export async function handler( ) { const config = readConfig(args); - if (isInteractive()) { + if (!args.force && !isInteractive()) { + throw new FatalError( + "The --force flag is required to purge a Queue in non-interactive mode" + ); + } + + if (!args.force && isInteractive()) { const result = await prompt( `This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.` ); + if (result !== args.name) { throw new FatalError( "Incorrect queue name provided. Skipping purge operation" ); } } - await purgeQueue(config, args.name); - logger.log(`Started a purge operation for Queue '${args.name}'`); + logger.log(`Purged Queue '${args.name}'`); }