Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/lemon-areas-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": minor
---

Add new command to purge a Queue

This new command can be used to delete all existing messages in a Queue
94 changes: 94 additions & 0 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ describe("wrangler", () => {
wrangler queues consumer Configure Queue consumers
wrangler queues pause-delivery <name> Pause message delivery for a Queue
wrangler queues resume-delivery <name> Resume message delivery for a Queue
wrangler queues purge <name> Purge messages from a Queue

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
Expand Down Expand Up @@ -2066,4 +2067,97 @@ describe("wrangler", () => {
`);
});
});

describe("purge", () => {
function mockPurgeRequest() {
const requests = { count: 0 };

msw.use(
http.post(
"*/accounts/:accountId/queues/:queueId/purge",
async ({ request }) => {
requests.count += 1;

const body = (await request.json()) as {
delete_messages_permanently: boolean;
};
expect(body.delete_messages_permanently).toEqual(true);
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: {
started_on: "01-01-2001",
complete: false,
},
});
},
{ once: true }
)
);
return requests;
}
function mockGetQueueRequest(queueName: string) {
const requests = { count: 0 };
msw.use(
http.get(
"*/accounts/:accountId/queues?*",
async () => {
requests.count += 1;
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: [
{
queue_name: queueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 1,
consumers_total_count: 0,
modified_on: "",
queue_id: "queueId",
},
],
});
},
{ once: true }
)
);
return requests;
}

it("should show the correct help text", async () => {
await runWrangler("queues purge --help");
expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"wrangler queues purge <name>

Purge messages from a Queue

POSITIONALS
name The name of the queue [string] [required]

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]"
`);
});

it("should call the /purge API endpoint", async () => {
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();

await runWrangler("queues purge testQueue");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`"Started a purge operation for Queue 'testQueue'"`);
});
});
});
8 changes: 8 additions & 0 deletions packages/wrangler/src/queues/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
options as pauseResumeOptions,
resumeHandler,
} from "./pause-resume";
import { handler as purgeHandler, options as purgeOptions } from "./purge";
import { handler as updateHandler, options as updateOptions } from "./update";
import type { CommonYargsArgv } from "../../../yargs-types";

Expand Down Expand Up @@ -65,5 +66,12 @@ export function queues(yargs: CommonYargsArgv) {
resumeHandler
);

yargs.command(
"purge <name>",
"Purge messages from a Queue",
purgeOptions,
purgeHandler
);

yargs.fail(HandleUnauthorizedError);
}
39 changes: 39 additions & 0 deletions packages/wrangler/src/queues/cli/commands/purge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { readConfig } from "../../../config";
import { prompt } from "../../../dialogs";
import { FatalError } from "../../../errors";
import isInteractive from "../../../is-interactive";
import { logger } from "../../../logger";
import { purgeQueue } from "../../client";
import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../../../yargs-types";

export function options(yargs: CommonYargsArgv) {
return yargs.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
});
}

export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
) {
const config = readConfig(args);

if (isInteractive()) {
Copy link
Contributor

@emily-shen emily-shen Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to add a --force flag so there's a similar check in non-interactive mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call! I added the flag along with some better testing

const result = await prompt(
`This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.`
);
if (result !== args.name) {
throw new FatalError(
"Incorrect queue name provided. Skipping purge operation"
);
}
}

await purgeQueue(config, args.name);

logger.log(`Started a purge operation for Queue '${args.name}'`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: with that message I sort of expect there to be a Completed purge operation log eventually.
is the purge expected to take a while? how can the user confirm that this has succeeded/finished?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reworded this to avoid the confusion here!

}
23 changes: 23 additions & 0 deletions packages/wrangler/src/queues/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ export interface ConsumerSettings {
retry_delay?: number;
}

export interface PurgeQueueBody {
delete_messages_permanently: boolean;
}

export interface PurgeQueueResponse {
started_at: string;
complete: boolean;
}

const queuesUrl = (accountId: string, queueId?: string): string => {
let url = `/accounts/${accountId}/queues`;
if (queueId) {
Expand Down Expand Up @@ -406,3 +415,17 @@ export async function deleteWorkerConsumer(
);
return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id);
}

export async function purgeQueue(
config: Config,
queueName: string
): Promise<void> {
const accountId = await requireAuth(config);
const queue = await getQueue(config, queueName);
const purgeURL = `${queuesUrl(accountId, queue.queue_id)}/purge`;
const body: PurgeQueueBody = { delete_messages_permanently: true };
return fetchResult(purgeURL, {
method: "POST",
body: JSON.stringify(body),
});
}
Loading