Skip to content

Commit dc6b5dd

Browse files
committed
Add Queues commands to pause-delivery and resume-delivery
1 parent d86e845 commit dc6b5dd

File tree

5 files changed

+310
-6
lines changed

5 files changed

+310
-6
lines changed

.changeset/shiny-zebras-switch.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"wrangler": minor
3+
---
4+
5+
Add two new Queues commands: pause-delivery and resume-delivery
6+
7+
These new commands allow users to pause and resume the delivery of messages to Queue Consumers

packages/wrangler/src/__tests__/queues.test.ts

Lines changed: 228 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@ describe("wrangler", () => {
2727
🇶 Manage Workers Queues
2828
2929
COMMANDS
30-
wrangler queues list List Queues
31-
wrangler queues create <name> Create a Queue
32-
wrangler queues update <name> Update a Queue
33-
wrangler queues delete <name> Delete a Queue
34-
wrangler queues info <name> Get Queue information
35-
wrangler queues consumer Configure Queue consumers
30+
wrangler queues list List Queues
31+
wrangler queues create <name> Create a Queue
32+
wrangler queues update <name> Update a Queue
33+
wrangler queues delete <name> Delete a Queue
34+
wrangler queues info <name> Get Queue information
35+
wrangler queues consumer Configure Queue consumers
36+
wrangler queues pause-delivery <name> Pause message delivery for a Queue
37+
wrangler queues resume-delivery <name> Resume message delivery for a Queue
3638
3739
GLOBAL FLAGS
3840
-c, --config Path to Wrangler configuration file [string]
@@ -1844,4 +1846,224 @@ describe("wrangler", () => {
18441846
});
18451847
});
18461848
});
1849+
1850+
describe("pause-delivery", () => {
1851+
function mockUpdateRequest(queueName: string) {
1852+
const requests = { count: 0 };
1853+
1854+
msw.use(
1855+
http.patch(
1856+
"*/accounts/:accountId/queues/:queueId",
1857+
async ({ request }) => {
1858+
requests.count += 1;
1859+
1860+
const body = (await request.json()) as {
1861+
queue_name: string;
1862+
settings: {
1863+
delivery_paused: boolean;
1864+
};
1865+
};
1866+
expect(body.queue_name).toEqual(queueName);
1867+
expect(body.settings.delivery_paused).toEqual(true);
1868+
return HttpResponse.json({
1869+
success: true,
1870+
errors: [],
1871+
messages: [],
1872+
result: {
1873+
queue_name: queueName,
1874+
created_on: "01-01-2001",
1875+
modified_on: "01-01-2001",
1876+
},
1877+
});
1878+
},
1879+
{ once: true }
1880+
)
1881+
);
1882+
return requests;
1883+
}
1884+
function mockGetQueueRequest(
1885+
queueName: string,
1886+
queueSettings: {
1887+
delivery_paused: boolean;
1888+
}
1889+
) {
1890+
const requests = { count: 0 };
1891+
msw.use(
1892+
http.get(
1893+
"*/accounts/:accountId/queues?*",
1894+
async () => {
1895+
requests.count += 1;
1896+
return HttpResponse.json({
1897+
success: true,
1898+
errors: [],
1899+
messages: [],
1900+
result: [
1901+
{
1902+
queue_name: queueName,
1903+
created_on: "",
1904+
producers: [],
1905+
consumers: [],
1906+
producers_total_count: 1,
1907+
consumers_total_count: 0,
1908+
modified_on: "",
1909+
queue_id: "queueId",
1910+
settings: {
1911+
delivery_paused: queueSettings.delivery_paused,
1912+
},
1913+
},
1914+
],
1915+
});
1916+
},
1917+
{ once: true }
1918+
)
1919+
);
1920+
return requests;
1921+
}
1922+
1923+
it("should show the correct help text", async () => {
1924+
await runWrangler("queues pause-delivery --help");
1925+
expect(std.err).toMatchInlineSnapshot(`""`);
1926+
expect(std.out).toMatchInlineSnapshot(`
1927+
"wrangler queues pause-delivery <name>
1928+
1929+
Pause message delivery for a Queue
1930+
1931+
POSITIONALS
1932+
name The name of the queue [string] [required]
1933+
1934+
GLOBAL FLAGS
1935+
-c, --config Path to Wrangler configuration file [string]
1936+
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
1937+
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
1938+
-h, --help Show help [boolean]
1939+
-v, --version Show version number [boolean]"
1940+
`);
1941+
});
1942+
1943+
it("should update the queue's delivery_paused setting", async () => {
1944+
const getrequests = mockGetQueueRequest("testQueue", {
1945+
delivery_paused: false,
1946+
});
1947+
const requests = mockUpdateRequest("testQueue");
1948+
await runWrangler("queues pause-delivery testQueue");
1949+
1950+
expect(requests.count).toEqual(1);
1951+
expect(getrequests.count).toEqual(1);
1952+
1953+
expect(std.out).toMatchInlineSnapshot(`
1954+
"Pausing message delivery for queue testQueue.
1955+
Paused message delivery for queue testQueue."
1956+
`);
1957+
});
1958+
});
1959+
1960+
describe("resume-delivery", () => {
1961+
function mockUpdateRequest(queueName: string) {
1962+
const requests = { count: 0 };
1963+
1964+
msw.use(
1965+
http.patch(
1966+
"*/accounts/:accountId/queues/:queueId",
1967+
async ({ request }) => {
1968+
requests.count += 1;
1969+
1970+
const body = (await request.json()) as {
1971+
queue_name: string;
1972+
settings: {
1973+
delivery_paused: boolean;
1974+
};
1975+
};
1976+
expect(body.queue_name).toEqual(queueName);
1977+
expect(body.settings.delivery_paused).toEqual(false);
1978+
return HttpResponse.json({
1979+
success: true,
1980+
errors: [],
1981+
messages: [],
1982+
result: {
1983+
queue_name: queueName,
1984+
created_on: "01-01-2001",
1985+
modified_on: "01-01-2001",
1986+
},
1987+
});
1988+
},
1989+
{ once: true }
1990+
)
1991+
);
1992+
return requests;
1993+
}
1994+
function mockGetQueueRequest(
1995+
queueName: string,
1996+
queueSettings: {
1997+
delivery_paused: boolean;
1998+
}
1999+
) {
2000+
const requests = { count: 0 };
2001+
msw.use(
2002+
http.get(
2003+
"*/accounts/:accountId/queues?*",
2004+
async () => {
2005+
requests.count += 1;
2006+
return HttpResponse.json({
2007+
success: true,
2008+
errors: [],
2009+
messages: [],
2010+
result: [
2011+
{
2012+
queue_name: queueName,
2013+
created_on: "",
2014+
producers: [],
2015+
consumers: [],
2016+
producers_total_count: 1,
2017+
consumers_total_count: 0,
2018+
modified_on: "",
2019+
queue_id: "queueId",
2020+
settings: {
2021+
delivery_paused: queueSettings.delivery_paused,
2022+
},
2023+
},
2024+
],
2025+
});
2026+
},
2027+
{ once: true }
2028+
)
2029+
);
2030+
return requests;
2031+
}
2032+
2033+
it("should show the correct help text", async () => {
2034+
await runWrangler("queues resume-delivery --help");
2035+
expect(std.err).toMatchInlineSnapshot(`""`);
2036+
expect(std.out).toMatchInlineSnapshot(`
2037+
"wrangler queues resume-delivery <name>
2038+
2039+
Resume message delivery for a Queue
2040+
2041+
POSITIONALS
2042+
name The name of the queue [string] [required]
2043+
2044+
GLOBAL FLAGS
2045+
-c, --config Path to Wrangler configuration file [string]
2046+
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
2047+
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
2048+
-h, --help Show help [boolean]
2049+
-v, --version Show version number [boolean]"
2050+
`);
2051+
});
2052+
2053+
it("should update the queue's delivery_paused setting to false", async () => {
2054+
const getrequests = mockGetQueueRequest("testQueue", {
2055+
delivery_paused: false,
2056+
});
2057+
const requests = mockUpdateRequest("testQueue");
2058+
await runWrangler("queues resume-delivery testQueue");
2059+
2060+
expect(requests.count).toEqual(1);
2061+
expect(getrequests.count).toEqual(1);
2062+
2063+
expect(std.out).toMatchInlineSnapshot(`
2064+
"Resuming message delivery for queue testQueue.
2065+
Resumed message delivery for queue testQueue."
2066+
`);
2067+
});
2068+
});
18472069
});

packages/wrangler/src/queues/cli/commands/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ import { handler as createHandler, options as createOptions } from "./create";
44
import { handler as deleteHandler, options as deleteOptions } from "./delete";
55
import { handler as infoHandler, options as infoOptions } from "./info";
66
import { handler as listHandler, options as listOptions } from "./list";
7+
import {
8+
pauseHandler,
9+
options as pauseResumeOptions,
10+
resumeHandler,
11+
} from "./pause-resume";
712
import { handler as updateHandler, options as updateOptions } from "./update";
813
import type { CommonYargsArgv } from "../../../yargs-types";
914

@@ -46,5 +51,19 @@ export function queues(yargs: CommonYargsArgv) {
4651
}
4752
);
4853

54+
yargs.command(
55+
"pause-delivery <name>",
56+
"Pause message delivery for a Queue",
57+
pauseResumeOptions,
58+
pauseHandler
59+
);
60+
61+
yargs.command(
62+
"resume-delivery <name>",
63+
"Resume message delivery for a Queue",
64+
pauseResumeOptions,
65+
resumeHandler
66+
);
67+
4968
yargs.fail(HandleUnauthorizedError);
5069
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { readConfig } from "../../../config";
2+
import { logger } from "../../../logger";
3+
import { getQueue, updateQueue } from "../../client";
4+
import { handleFetchError } from "../../utils";
5+
import type {
6+
CommonYargsArgv,
7+
StrictYargsOptionsToInterface,
8+
} from "../../../yargs-types";
9+
import type { PostQueueBody } from "../../client";
10+
11+
export function options(yargs: CommonYargsArgv) {
12+
return yargs.positional("name", {
13+
type: "string",
14+
demandOption: true,
15+
description: "The name of the queue",
16+
});
17+
}
18+
19+
export async function pauseHandler(
20+
args: StrictYargsOptionsToInterface<typeof options>
21+
) {
22+
return toggleDeliveryPaused(args, true);
23+
}
24+
25+
export async function resumeHandler(
26+
args: StrictYargsOptionsToInterface<typeof options>
27+
) {
28+
return toggleDeliveryPaused(args, false);
29+
}
30+
31+
async function toggleDeliveryPaused(
32+
args: StrictYargsOptionsToInterface<typeof options>,
33+
paused: boolean
34+
) {
35+
const config = readConfig(args);
36+
const body: PostQueueBody = {
37+
queue_name: args.name,
38+
settings: {
39+
delivery_paused: paused,
40+
},
41+
};
42+
try {
43+
const currentQueue = await getQueue(config, args.name);
44+
45+
let msg = paused ? "Pausing" : "Resuming";
46+
logger.log(`${msg} message delivery for queue ${args.name}.`);
47+
48+
await updateQueue(config, body, currentQueue.queue_id);
49+
50+
msg = paused ? "Paused" : "Resumed";
51+
logger.log(`${msg} message delivery for queue ${args.name}.`);
52+
} catch (e) {
53+
handleFetchError(e as { code?: number });
54+
}
55+
}

packages/wrangler/src/queues/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ interface WorkerService {
1919

2020
export interface QueueSettings {
2121
delivery_delay?: number;
22+
delivery_paused?: boolean;
2223
message_retention_period?: number;
2324
}
2425

0 commit comments

Comments
 (0)