diff --git a/.changeset/shiny-zebras-switch.md b/.changeset/shiny-zebras-switch.md new file mode 100644 index 000000000000..d7f084303dbd --- /dev/null +++ b/.changeset/shiny-zebras-switch.md @@ -0,0 +1,7 @@ +--- +"wrangler": minor +--- + +Add two new Queues commands: pause-delivery and resume-delivery + +These new commands allow users to pause and resume the delivery of messages to Queue Consumers diff --git a/packages/wrangler/src/__tests__/queues.test.ts b/packages/wrangler/src/__tests__/queues.test.ts index 1bc43d89e49d..43c19fb4f9fe 100644 --- a/packages/wrangler/src/__tests__/queues.test.ts +++ b/packages/wrangler/src/__tests__/queues.test.ts @@ -27,12 +27,14 @@ describe("wrangler", () => { 🇶 Manage Workers Queues COMMANDS - wrangler queues list List Queues - wrangler queues create Create a Queue - wrangler queues update Update a Queue - wrangler queues delete Delete a Queue - wrangler queues info Get Queue information - wrangler queues consumer Configure Queue consumers + wrangler queues list List Queues + wrangler queues create Create a Queue + wrangler queues update Update a Queue + wrangler queues delete Delete a Queue + wrangler queues info Get Queue information + wrangler queues consumer Configure Queue consumers + wrangler queues pause-delivery Pause message delivery for a Queue + wrangler queues resume-delivery Resume message delivery for a Queue GLOBAL FLAGS -c, --config Path to Wrangler configuration file [string] @@ -1844,4 +1846,224 @@ describe("wrangler", () => { }); }); }); + + describe("pause-delivery", () => { + function mockUpdateRequest(queueName: string) { + const requests = { count: 0 }; + + msw.use( + http.patch( + "*/accounts/:accountId/queues/:queueId", + async ({ request }) => { + requests.count += 1; + + const body = (await request.json()) as { + queue_name: string; + settings: { + delivery_paused: boolean; + }; + }; + expect(body.queue_name).toEqual(queueName); + expect(body.settings.delivery_paused).toEqual(true); + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + queue_name: queueName, + created_on: "01-01-2001", + modified_on: "01-01-2001", + }, + }); + }, + { once: true } + ) + ); + return requests; + } + function mockGetQueueRequest( + queueName: string, + queueSettings: { + delivery_paused: boolean; + } + ) { + const requests = { count: 0 }; + msw.use( + http.get( + "*/accounts/:accountId/queues?*", + async () => { + requests.count += 1; + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: [ + { + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + queue_id: "queueId", + settings: { + delivery_paused: queueSettings.delivery_paused, + }, + }, + ], + }); + }, + { once: true } + ) + ); + return requests; + } + + it("should show the correct help text", async () => { + await runWrangler("queues pause-delivery --help"); + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler queues pause-delivery + + Pause message delivery for a Queue + + POSITIONALS + name The name of the queue [string] [required] + + GLOBAL FLAGS + -c, --config Path to Wrangler configuration file [string] + --cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string] + -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean]" + `); + }); + + it("should update the queue's delivery_paused setting", async () => { + const getrequests = mockGetQueueRequest("testQueue", { + delivery_paused: false, + }); + const requests = mockUpdateRequest("testQueue"); + await runWrangler("queues pause-delivery testQueue"); + + expect(requests.count).toEqual(1); + expect(getrequests.count).toEqual(1); + + expect(std.out).toMatchInlineSnapshot(` + "Pausing message delivery for queue testQueue. + Paused message delivery for queue testQueue." + `); + }); + }); + + describe("resume-delivery", () => { + function mockUpdateRequest(queueName: string) { + const requests = { count: 0 }; + + msw.use( + http.patch( + "*/accounts/:accountId/queues/:queueId", + async ({ request }) => { + requests.count += 1; + + const body = (await request.json()) as { + queue_name: string; + settings: { + delivery_paused: boolean; + }; + }; + expect(body.queue_name).toEqual(queueName); + expect(body.settings.delivery_paused).toEqual(false); + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { + queue_name: queueName, + created_on: "01-01-2001", + modified_on: "01-01-2001", + }, + }); + }, + { once: true } + ) + ); + return requests; + } + function mockGetQueueRequest( + queueName: string, + queueSettings: { + delivery_paused: boolean; + } + ) { + const requests = { count: 0 }; + msw.use( + http.get( + "*/accounts/:accountId/queues?*", + async () => { + requests.count += 1; + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: [ + { + queue_name: queueName, + created_on: "", + producers: [], + consumers: [], + producers_total_count: 1, + consumers_total_count: 0, + modified_on: "", + queue_id: "queueId", + settings: { + delivery_paused: queueSettings.delivery_paused, + }, + }, + ], + }); + }, + { once: true } + ) + ); + return requests; + } + + it("should show the correct help text", async () => { + await runWrangler("queues resume-delivery --help"); + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler queues resume-delivery + + Resume message delivery for a Queue + + POSITIONALS + name The name of the queue [string] [required] + + GLOBAL FLAGS + -c, --config Path to Wrangler configuration file [string] + --cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string] + -e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean]" + `); + }); + + it("should update the queue's delivery_paused setting to false", async () => { + const getrequests = mockGetQueueRequest("testQueue", { + delivery_paused: false, + }); + const requests = mockUpdateRequest("testQueue"); + await runWrangler("queues resume-delivery testQueue"); + + expect(requests.count).toEqual(1); + expect(getrequests.count).toEqual(1); + + expect(std.out).toMatchInlineSnapshot(` + "Resuming message delivery for queue testQueue. + Resumed message delivery for queue testQueue." + `); + }); + }); }); diff --git a/packages/wrangler/src/queues/cli/commands/index.ts b/packages/wrangler/src/queues/cli/commands/index.ts index 5d84c45528ed..e82c97b4f291 100644 --- a/packages/wrangler/src/queues/cli/commands/index.ts +++ b/packages/wrangler/src/queues/cli/commands/index.ts @@ -4,6 +4,11 @@ import { handler as createHandler, options as createOptions } from "./create"; import { handler as deleteHandler, options as deleteOptions } from "./delete"; import { handler as infoHandler, options as infoOptions } from "./info"; import { handler as listHandler, options as listOptions } from "./list"; +import { + pauseHandler, + options as pauseResumeOptions, + resumeHandler, +} from "./pause-resume"; import { handler as updateHandler, options as updateOptions } from "./update"; import type { CommonYargsArgv } from "../../../yargs-types"; @@ -46,5 +51,19 @@ export function queues(yargs: CommonYargsArgv) { } ); + yargs.command( + "pause-delivery ", + "Pause message delivery for a Queue", + pauseResumeOptions, + pauseHandler + ); + + yargs.command( + "resume-delivery ", + "Resume message delivery for a Queue", + pauseResumeOptions, + resumeHandler + ); + yargs.fail(HandleUnauthorizedError); } diff --git a/packages/wrangler/src/queues/cli/commands/pause-resume.ts b/packages/wrangler/src/queues/cli/commands/pause-resume.ts new file mode 100644 index 000000000000..96a35d111338 --- /dev/null +++ b/packages/wrangler/src/queues/cli/commands/pause-resume.ts @@ -0,0 +1,55 @@ +import { readConfig } from "../../../config"; +import { logger } from "../../../logger"; +import { getQueue, updateQueue } from "../../client"; +import { handleFetchError } from "../../utils"; +import type { + CommonYargsArgv, + StrictYargsOptionsToInterface, +} from "../../../yargs-types"; +import type { PostQueueBody } from "../../client"; + +export function options(yargs: CommonYargsArgv) { + return yargs.positional("name", { + type: "string", + demandOption: true, + description: "The name of the queue", + }); +} + +export async function pauseHandler( + args: StrictYargsOptionsToInterface +) { + return toggleDeliveryPaused(args, true); +} + +export async function resumeHandler( + args: StrictYargsOptionsToInterface +) { + return toggleDeliveryPaused(args, false); +} + +async function toggleDeliveryPaused( + args: StrictYargsOptionsToInterface, + paused: boolean +) { + const config = readConfig(args); + const body: PostQueueBody = { + queue_name: args.name, + settings: { + delivery_paused: paused, + }, + }; + try { + const currentQueue = await getQueue(config, args.name); + + let msg = paused ? "Pausing" : "Resuming"; + logger.log(`${msg} message delivery for queue ${args.name}.`); + + await updateQueue(config, body, currentQueue.queue_id); + + msg = paused ? "Paused" : "Resumed"; + logger.log(`${msg} message delivery for queue ${args.name}.`); + } catch (e) { + handleFetchError(e as { code?: number }); + } +} diff --git a/packages/wrangler/src/queues/client.ts b/packages/wrangler/src/queues/client.ts index 0608953895d0..d58176bc8767 100644 --- a/packages/wrangler/src/queues/client.ts +++ b/packages/wrangler/src/queues/client.ts @@ -19,6 +19,7 @@ interface WorkerService { export interface QueueSettings { delivery_delay?: number; + delivery_paused?: boolean; message_retention_period?: number; }