Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/lemon-areas-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": minor
---

Add new command to purge a Queue

This new command can be used to delete all existing messages in a Queue
180 changes: 180 additions & 0 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { http, HttpResponse } from "msw";
import { mockAccountId, mockApiToken } from "./helpers/mock-account-id";
import { mockConsoleMethods } from "./helpers/mock-console";
import { mockPrompt } from "./helpers/mock-dialogs";
import { useMockIsTTY } from "./helpers/mock-istty";
import { msw } from "./helpers/msw";
import { runInTempDir } from "./helpers/run-in-tmp";
import { runWrangler } from "./helpers/run-wrangler";
Expand Down Expand Up @@ -35,6 +37,7 @@ describe("wrangler", () => {
wrangler queues consumer Configure Queue consumers
wrangler queues pause-delivery <name> Pause message delivery for a Queue
wrangler queues resume-delivery <name> Resume message delivery for a Queue
wrangler queues purge <name> Purge messages from a Queue

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
Expand Down Expand Up @@ -2066,4 +2069,181 @@ describe("wrangler", () => {
`);
});
});

describe("purge", () => {
const { setIsTTY } = useMockIsTTY();
beforeEach(() => {
setIsTTY(false);
});

function mockPurgeRequest() {
const requests = { count: 0 };

msw.use(
http.post(
"*/accounts/:accountId/queues/:queueId/purge",
async ({ request }) => {
requests.count += 1;

const body = (await request.json()) as {
delete_messages_permanently: boolean;
};
expect(body.delete_messages_permanently).toEqual(true);
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: {
started_on: "01-01-2001",
complete: false,
},
});
},
{ once: true }
)
);
return requests;
}
function mockGetQueueRequest(queueName: string) {
const requests = { count: 0 };
msw.use(
http.get(
"*/accounts/:accountId/queues?*",
async () => {
requests.count += 1;
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: [
{
queue_name: queueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 1,
consumers_total_count: 0,
modified_on: "",
queue_id: "queueId",
},
],
});
},
{ once: true }
)
);
return requests;
}

it("should show the correct help text", async () => {
await runWrangler("queues purge --help");
expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"wrangler queues purge <name>

Purge messages from a Queue

POSITIONALS
name The name of the queue [string] [required]

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]

OPTIONS
--force Skip the confirmation dialog and forcefully purge the Queue [boolean]"
`);
});

it("rejects a missing --force flag in non-interactive mode", async () => {
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();

await expect(
runWrangler("queues purge testQueue")
).rejects.toThrowErrorMatchingInlineSnapshot(
`[Error: The --force flag is required to purge a Queue in non-interactive mode]`
);

expect(requests.count).toEqual(0);
expect(getrequests.count).toEqual(0);

expect(std.out).toMatchInlineSnapshot(`""`);
});

it("allows purge with the --force flag in non-interactive mode", async () => {
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();

await runWrangler("queues purge testQueue --force");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
});

it("allows purge with the --force flag in non-interactive mode", async () => {
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();

await runWrangler("queues purge testQueue --force");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
});

it("allows purge with the --force flag in interactive mode", async () => {
setIsTTY(true);
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();
await runWrangler("queues purge testQueue --force");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
});

it("rejects invalid confirmation in interactive mode", async () => {
setIsTTY(true);
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();
mockPrompt({
text: "This operation will permanently delete all the messages in Queue testQueue. Type testQueue to proceed.",
result: "wrong-name",
});
await expect(
runWrangler("queues purge testQueue")
).rejects.toThrowErrorMatchingInlineSnapshot(
`[Error: Incorrect queue name provided. Skipping purge operation]`
);

expect(requests.count).toEqual(0);
expect(getrequests.count).toEqual(0);

expect(std.out).toMatchInlineSnapshot(`""`);
});

it("allows purge with correct confirmation in interactive mode", async () => {
setIsTTY(true);
const getrequests = mockGetQueueRequest("testQueue");
const requests = mockPurgeRequest();
mockPrompt({
text: "This operation will permanently delete all the messages in Queue testQueue. Type testQueue to proceed.",
result: "testQueue",
});
await runWrangler("queues purge testQueue");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
});
});
});
8 changes: 8 additions & 0 deletions packages/wrangler/src/queues/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
options as pauseResumeOptions,
resumeHandler,
} from "./pause-resume";
import { handler as purgeHandler, options as purgeOptions } from "./purge";
import { handler as updateHandler, options as updateOptions } from "./update";
import type { CommonYargsArgv } from "../../../yargs-types";

Expand Down Expand Up @@ -65,5 +66,12 @@ export function queues(yargs: CommonYargsArgv) {
resumeHandler
);

yargs.command(
"purge <name>",
"Purge messages from a Queue",
purgeOptions,
purgeHandler
);

yargs.fail(HandleUnauthorizedError);
}
50 changes: 50 additions & 0 deletions packages/wrangler/src/queues/cli/commands/purge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { readConfig } from "../../../config";
import { prompt } from "../../../dialogs";
import { FatalError } from "../../../errors";
import isInteractive from "../../../is-interactive";
import { logger } from "../../../logger";
import { purgeQueue } from "../../client";
import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../../../yargs-types";

export function options(yargs: CommonYargsArgv) {
return yargs
.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
})
.option("force", {
describe: "Skip the confirmation dialog and forcefully purge the Queue",
type: "boolean",
});
}

export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
) {
const config = readConfig(args);

if (!args.force && !isInteractive()) {
throw new FatalError(
"The --force flag is required to purge a Queue in non-interactive mode"
);
}

if (!args.force && isInteractive()) {
const result = await prompt(
`This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.`
);

if (result !== args.name) {
throw new FatalError(
"Incorrect queue name provided. Skipping purge operation"
);
}
}
await purgeQueue(config, args.name);

logger.log(`Purged Queue '${args.name}'`);
}
23 changes: 23 additions & 0 deletions packages/wrangler/src/queues/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ export interface ConsumerSettings {
retry_delay?: number;
}

export interface PurgeQueueBody {
delete_messages_permanently: boolean;
}

export interface PurgeQueueResponse {
started_at: string;
complete: boolean;
}

const queuesUrl = (accountId: string, queueId?: string): string => {
let url = `/accounts/${accountId}/queues`;
if (queueId) {
Expand Down Expand Up @@ -406,3 +415,17 @@ export async function deleteWorkerConsumer(
);
return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id);
}

export async function purgeQueue(
config: Config,
queueName: string
): Promise<void> {
const accountId = await requireAuth(config);
const queue = await getQueue(config, queueName);
const purgeURL = `${queuesUrl(accountId, queue.queue_id)}/purge`;
const body: PurgeQueueBody = { delete_messages_permanently: true };
return fetchResult(purgeURL, {
method: "POST",
body: JSON.stringify(body),
});
}
Loading