Skip to content

Commit 1c1e1ca

Browse files
committed
feat: implement XACKDEL command for Redis 8.2
1 parent d39148c commit 1c1e1ca

File tree

3 files changed

+251
-0
lines changed

3 files changed

+251
-0
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import { strict as assert } from "node:assert";
2+
import XACKDEL, { XACKDEL_REPLY_CODES, XAckDelPolicy } from "./XACKDEL";
3+
import { parseArgs } from "./generic-transformers";
4+
import testUtils, { GLOBAL } from "../test-utils";
5+
6+
describe("XACKDEL", () => {
7+
describe("transformArguments", () => {
8+
it("string - without policy", () => {
9+
assert.deepEqual(parseArgs(XACKDEL, "key", "group", "0-0"), [
10+
"XACKDEL",
11+
"key",
12+
"group",
13+
"IDS",
14+
"1",
15+
"0-0",
16+
]);
17+
});
18+
19+
it("string - with policy", () => {
20+
assert.deepEqual(
21+
parseArgs(XACKDEL, "key", "group", "0-0", XAckDelPolicy.KEEPREF),
22+
["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"]
23+
);
24+
});
25+
26+
it("array - without policy", () => {
27+
assert.deepEqual(parseArgs(XACKDEL, "key", "group", ["0-0", "1-0"]), [
28+
"XACKDEL",
29+
"key",
30+
"group",
31+
"IDS",
32+
"2",
33+
"0-0",
34+
"1-0",
35+
]);
36+
});
37+
38+
it("array - with policy", () => {
39+
assert.deepEqual(
40+
parseArgs(
41+
XACKDEL,
42+
"key",
43+
"group",
44+
["0-0", "1-0"],
45+
XAckDelPolicy.DELREF
46+
),
47+
["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"]
48+
);
49+
});
50+
});
51+
52+
testUtils.testAll(
53+
`XACKDEL non-existing key - without policy`,
54+
async (client) => {
55+
const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0");
56+
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.NOT_FOUND]);
57+
},
58+
{
59+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
60+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
61+
}
62+
);
63+
64+
testUtils.testAll(
65+
`XACKDEL existing key - without policy`,
66+
async (client) => {
67+
const streamKey = "{tag}stream-key";
68+
const groupName = "testgroup";
69+
70+
// create consumer group, stream and message
71+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
72+
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
73+
74+
// read message
75+
await client.xReadGroup(groupName, "testconsumer", {
76+
key: streamKey,
77+
id: ">",
78+
});
79+
80+
const reply = await client.xAckDel(streamKey, groupName, messageId);
81+
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]);
82+
},
83+
{
84+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
85+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
86+
}
87+
);
88+
89+
testUtils.testAll(
90+
`XACKDEL existing key - with policy`,
91+
async (client) => {
92+
const streamKey = "{tag}stream-key";
93+
const groupName = "testgroup";
94+
95+
// create consumer group, stream and message
96+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
97+
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
98+
99+
// read message
100+
await client.xReadGroup(groupName, "testconsumer", {
101+
key: streamKey,
102+
id: ">",
103+
});
104+
105+
const reply = await client.xAckDel(
106+
streamKey,
107+
groupName,
108+
messageId,
109+
XAckDelPolicy.DELREF
110+
);
111+
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]);
112+
},
113+
{
114+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
115+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
116+
}
117+
);
118+
119+
testUtils.testAll(
120+
`XACKDEL acknowledge policy - with consumer group`,
121+
async (client) => {
122+
const streamKey = "{tag}stream-key";
123+
const groupName = "testgroup";
124+
125+
// create consumer groups, stream and message
126+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
127+
await client.xGroupCreate(streamKey, "some-other-group", "0");
128+
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
129+
130+
// read message
131+
await client.xReadGroup(groupName, "testconsumer", {
132+
key: streamKey,
133+
id: ">",
134+
});
135+
136+
const reply = await client.xAckDel(
137+
streamKey,
138+
groupName,
139+
messageId,
140+
XAckDelPolicy.ACKED
141+
);
142+
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_DANGLING_REFS]);
143+
},
144+
{
145+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
146+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
147+
}
148+
);
149+
150+
testUtils.testAll(
151+
`XACKDEL multiple keys`,
152+
async (client) => {
153+
const streamKey = "{tag}stream-key";
154+
const groupName = "testgroup";
155+
156+
// create consumer groups, stream and add messages
157+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
158+
const messageIds = await Promise.all([
159+
client.xAdd(streamKey, "*", { field: "value1" }),
160+
client.xAdd(streamKey, "*", { field: "value2" }),
161+
]);
162+
163+
// read messages
164+
await client.xReadGroup(groupName, "testconsumer", {
165+
key: streamKey,
166+
id: ">",
167+
});
168+
169+
const reply = await client.xAckDel(
170+
streamKey,
171+
groupName,
172+
[...messageIds, "0-0"],
173+
XAckDelPolicy.DELREF
174+
);
175+
assert.deepEqual(reply, [
176+
XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED,
177+
XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED,
178+
XACKDEL_REPLY_CODES.NOT_FOUND,
179+
]);
180+
},
181+
{
182+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
183+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
184+
}
185+
);
186+
});
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { CommandParser } from "../client/parser";
2+
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
3+
import { RedisVariadicArgument } from "./generic-transformers";
4+
5+
/** XACKDEL deletion policies */
6+
export const XAckDelPolicy = {
7+
/** Preserve references (default) */
8+
KEEPREF: "KEEPREF",
9+
/** Delete all references */
10+
DELREF: "DELREF",
11+
/** Only acknowledged entries */
12+
ACKED: "ACKED",
13+
} as const;
14+
15+
/** XACKDEL reply codes */
16+
export const XACKDEL_REPLY_CODES = {
17+
/** ID not found */
18+
NOT_FOUND: -1,
19+
/** Entry acknowledged and deleted */
20+
ACKNOWLEDGED_AND_DELETED: 1,
21+
/** Entry acknowledged but dangling references remain */
22+
ACKNOWLEDGED_DANGLING_REFS: 2,
23+
} as const;
24+
25+
/**
26+
* Acknowledges and deletes one or multiple messages for a stream consumer group
27+
*/
28+
export default {
29+
IS_READ_ONLY: false,
30+
/**
31+
* Constructs the XACKDEL command to acknowledge and delete one or multiple messages for a stream consumer group
32+
*
33+
* @param parser - The command parser
34+
* @param key - The stream key
35+
* @param group - The consumer group name
36+
* @param id - One or more message IDs to acknowledge and delete
37+
* @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
38+
* @returns Array of integers: -1 (not found), 1 (acknowledged and deleted), 2 (acknowledged with dangling refs)
39+
* @see https://redis.io/commands/xackdel/
40+
*/
41+
parseCommand(
42+
parser: CommandParser,
43+
key: RedisArgument,
44+
group: RedisArgument,
45+
id: RedisVariadicArgument,
46+
policy?: (typeof XAckDelPolicy)[keyof typeof XAckDelPolicy],
47+
) {
48+
parser.push("XACKDEL");
49+
parser.pushKey(key);
50+
parser.push(group);
51+
52+
if (policy) {
53+
parser.push(policy);
54+
}
55+
56+
parser.push("IDS");
57+
parser.pushVariadicWithLength(id);
58+
},
59+
transformReply: undefined as unknown as () => ArrayReply<
60+
(typeof XACKDEL_REPLY_CODES)[keyof typeof XACKDEL_REPLY_CODES]
61+
>,
62+
} as const satisfies Command;

packages/client/lib/commands/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ import TYPE from './TYPE';
280280
import UNLINK from './UNLINK';
281281
import WAIT from './WAIT';
282282
import XACK from './XACK';
283+
import XACKDEL from './XACKDEL';
283284
import XADD_NOMKSTREAM from './XADD_NOMKSTREAM';
284285
import XADD from './XADD';
285286
import XAUTOCLAIM_JUSTID from './XAUTOCLAIM_JUSTID';
@@ -925,6 +926,8 @@ export default {
925926
wait: WAIT,
926927
XACK,
927928
xAck: XACK,
929+
XACKDEL,
930+
xAckDel: XACKDEL,
928931
XADD_NOMKSTREAM,
929932
xAddNoMkStream: XADD_NOMKSTREAM,
930933
XADD,

0 commit comments

Comments
 (0)