Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/shiny-zebras-switch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": minor
---

Add two new Queues commands: pause-delivery and resume-delivery

These new commands allow users to pause and resume the delivery of messages to Queue Consumers
234 changes: 228 additions & 6 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ describe("wrangler", () => {
🇶 Manage Workers Queues

COMMANDS
wrangler queues list List Queues
wrangler queues create <name> Create a Queue
wrangler queues update <name> Update a Queue
wrangler queues delete <name> Delete a Queue
wrangler queues info <name> Get Queue information
wrangler queues consumer Configure Queue consumers
wrangler queues list List Queues
wrangler queues create <name> Create a Queue
wrangler queues update <name> Update a Queue
wrangler queues delete <name> Delete a Queue
wrangler queues info <name> Get Queue information
wrangler queues consumer Configure Queue consumers
wrangler queues pause-delivery <name> Pause message delivery for a Queue
wrangler queues resume-delivery <name> Resume message delivery for a Queue

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
Expand Down Expand Up @@ -1844,4 +1846,224 @@ describe("wrangler", () => {
});
});
});

describe("pause-delivery", () => {
function mockUpdateRequest(queueName: string) {
const requests = { count: 0 };

msw.use(
http.patch(
"*/accounts/:accountId/queues/:queueId",
async ({ request }) => {
requests.count += 1;

const body = (await request.json()) as {
queue_name: string;
settings: {
delivery_paused: boolean;
};
};
expect(body.queue_name).toEqual(queueName);
expect(body.settings.delivery_paused).toEqual(true);
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: {
queue_name: queueName,
created_on: "01-01-2001",
modified_on: "01-01-2001",
},
});
},
{ once: true }
)
);
return requests;
}
function mockGetQueueRequest(
queueName: string,
queueSettings: {
delivery_paused: boolean;
}
) {
const requests = { count: 0 };
msw.use(
http.get(
"*/accounts/:accountId/queues?*",
async () => {
requests.count += 1;
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: [
{
queue_name: queueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 1,
consumers_total_count: 0,
modified_on: "",
queue_id: "queueId",
settings: {
delivery_paused: queueSettings.delivery_paused,
},
},
],
});
},
{ once: true }
)
);
return requests;
}

it("should show the correct help text", async () => {
await runWrangler("queues pause-delivery --help");
expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"wrangler queues pause-delivery <name>

Pause message delivery for a Queue

POSITIONALS
name The name of the queue [string] [required]

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]"
`);
});

it("should update the queue's delivery_paused setting", async () => {
const getrequests = mockGetQueueRequest("testQueue", {
delivery_paused: false,
});
const requests = mockUpdateRequest("testQueue");
await runWrangler("queues pause-delivery testQueue");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`
"Pausing message delivery for queue testQueue.
Paused message delivery for queue testQueue."
`);
});
});

describe("resume-delivery", () => {
function mockUpdateRequest(queueName: string) {
const requests = { count: 0 };

msw.use(
http.patch(
"*/accounts/:accountId/queues/:queueId",
async ({ request }) => {
requests.count += 1;

const body = (await request.json()) as {
queue_name: string;
settings: {
delivery_paused: boolean;
};
};
expect(body.queue_name).toEqual(queueName);
expect(body.settings.delivery_paused).toEqual(false);
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: {
queue_name: queueName,
created_on: "01-01-2001",
modified_on: "01-01-2001",
},
});
},
{ once: true }
)
);
return requests;
}
function mockGetQueueRequest(
queueName: string,
queueSettings: {
delivery_paused: boolean;
}
) {
const requests = { count: 0 };
msw.use(
http.get(
"*/accounts/:accountId/queues?*",
async () => {
requests.count += 1;
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: [
{
queue_name: queueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 1,
consumers_total_count: 0,
modified_on: "",
queue_id: "queueId",
settings: {
delivery_paused: queueSettings.delivery_paused,
},
},
],
});
},
{ once: true }
)
);
return requests;
}

it("should show the correct help text", async () => {
await runWrangler("queues resume-delivery --help");
expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"wrangler queues resume-delivery <name>

Resume message delivery for a Queue

POSITIONALS
name The name of the queue [string] [required]

GLOBAL FLAGS
-c, --config Path to Wrangler configuration file [string]
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]"
`);
});

it("should update the queue's delivery_paused setting to false", async () => {
const getrequests = mockGetQueueRequest("testQueue", {
delivery_paused: false,
});
const requests = mockUpdateRequest("testQueue");
await runWrangler("queues resume-delivery testQueue");

expect(requests.count).toEqual(1);
expect(getrequests.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`
"Resuming message delivery for queue testQueue.
Resumed message delivery for queue testQueue."
`);
});
});
});
19 changes: 19 additions & 0 deletions packages/wrangler/src/queues/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import { handler as createHandler, options as createOptions } from "./create";
import { handler as deleteHandler, options as deleteOptions } from "./delete";
import { handler as infoHandler, options as infoOptions } from "./info";
import { handler as listHandler, options as listOptions } from "./list";
import {
pauseHandler,
options as pauseResumeOptions,
resumeHandler,
} from "./pause-resume";
import { handler as updateHandler, options as updateOptions } from "./update";
import type { CommonYargsArgv } from "../../../yargs-types";

Expand Down Expand Up @@ -46,5 +51,19 @@ export function queues(yargs: CommonYargsArgv) {
}
);

yargs.command(
"pause-delivery <name>",
"Pause message delivery for a Queue",
pauseResumeOptions,
pauseHandler
);

yargs.command(
"resume-delivery <name>",
"Resume message delivery for a Queue",
pauseResumeOptions,
resumeHandler
);

yargs.fail(HandleUnauthorizedError);
}
55 changes: 55 additions & 0 deletions packages/wrangler/src/queues/cli/commands/pause-resume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { readConfig } from "../../../config";
import { logger } from "../../../logger";
import { getQueue, updateQueue } from "../../client";
import { handleFetchError } from "../../utils";
import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../../../yargs-types";
import type { PostQueueBody } from "../../client";

export function options(yargs: CommonYargsArgv) {
return yargs.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
});
}

export async function pauseHandler(
args: StrictYargsOptionsToInterface<typeof options>
) {
return toggleDeliveryPaused(args, true);
}

export async function resumeHandler(
args: StrictYargsOptionsToInterface<typeof options>
) {
return toggleDeliveryPaused(args, false);
}

async function toggleDeliveryPaused(
args: StrictYargsOptionsToInterface<typeof options>,
paused: boolean
) {
const config = readConfig(args);
const body: PostQueueBody = {
queue_name: args.name,
settings: {
delivery_paused: paused,
},
};
try {
const currentQueue = await getQueue(config, args.name);

let msg = paused ? "Pausing" : "Resuming";
logger.log(`${msg} message delivery for queue ${args.name}.`);

await updateQueue(config, body, currentQueue.queue_id);

msg = paused ? "Paused" : "Resumed";
logger.log(`${msg} message delivery for queue ${args.name}.`);
} catch (e) {
handleFetchError(e as { code?: number });
}
}
1 change: 1 addition & 0 deletions packages/wrangler/src/queues/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ interface WorkerService {

export interface QueueSettings {
delivery_delay?: number;
delivery_paused?: boolean;
message_retention_period?: number;
}

Expand Down
Loading