Skip to content

Commit d39148c

Browse files
committed
feat: implement XDELEX command for Redis 8.2
1 parent 8ab2709 commit d39148c

File tree

3 files changed

+219
-0
lines changed

3 files changed

+219
-0
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import { strict as assert } from "node:assert";
2+
import XDELEX, { XDELEX_REPLY_CODES, XDelexPolicy } from "./XDELEX";
3+
import { parseArgs } from "./generic-transformers";
4+
import testUtils, { GLOBAL } from "../test-utils";
5+
6+
describe("XDELEX", () => {
7+
describe("transformArguments", () => {
8+
it("string - without policy", () => {
9+
assert.deepEqual(parseArgs(XDELEX, "key", "0-0"), [
10+
"XDELEX",
11+
"key",
12+
"IDS",
13+
"1",
14+
"0-0",
15+
]);
16+
});
17+
18+
it("string - with policy", () => {
19+
assert.deepEqual(parseArgs(XDELEX, "key", "0-0", XDelexPolicy.KEEPREF), [
20+
"XDELEX",
21+
"key",
22+
"KEEPREF",
23+
"IDS",
24+
"1",
25+
"0-0",
26+
]);
27+
});
28+
29+
it("array - without policy", () => {
30+
assert.deepEqual(parseArgs(XDELEX, "key", ["0-0", "1-0"]), [
31+
"XDELEX",
32+
"key",
33+
"IDS",
34+
"2",
35+
"0-0",
36+
"1-0",
37+
]);
38+
});
39+
40+
it("array - with policy", () => {
41+
assert.deepEqual(
42+
parseArgs(XDELEX, "key", ["0-0", "1-0"], XDelexPolicy.DELREF),
43+
["XDELEX", "key", "DELREF", "IDS", "2", "0-0", "1-0"]
44+
);
45+
});
46+
});
47+
48+
testUtils.testAll(
49+
`XDELEX non-existing key - without policy`,
50+
async (client) => {
51+
const reply = await client.xDelex("{tag}stream-key", "0-0");
52+
assert.deepEqual(reply, [XDELEX_REPLY_CODES.NOT_FOUND]);
53+
},
54+
{
55+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
56+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
57+
}
58+
);
59+
60+
testUtils.testAll(
61+
`XDELEX existing key - without policy`,
62+
async (client) => {
63+
const streamKey = "{tag}stream-key";
64+
const messageId = await client.xAdd(streamKey, "*", {
65+
field: "value",
66+
});
67+
68+
const reply = await client.xDelex(
69+
streamKey,
70+
messageId,
71+
XDelexPolicy.KEEPREF
72+
);
73+
assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]);
74+
},
75+
{
76+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
77+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
78+
}
79+
);
80+
81+
testUtils.testAll(
82+
`XDELEX existing key - with policy`,
83+
async (client) => {
84+
const streamKey = "{tag}stream-key";
85+
const messageId = await client.xAdd(streamKey, "*", {
86+
field: "value",
87+
});
88+
89+
const reply = await client.xDelex(
90+
streamKey,
91+
messageId,
92+
XDelexPolicy.DELREF
93+
);
94+
assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]);
95+
},
96+
{
97+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
98+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
99+
}
100+
);
101+
102+
testUtils.testAll(
103+
`XDELEX acknowledge policy - with consumer group`,
104+
async (client) => {
105+
const streamKey = "{tag}stream-key";
106+
107+
// Add a message to the stream
108+
const messageId = await client.xAdd(streamKey, "*", {
109+
field: "value",
110+
});
111+
112+
// Create consumer group
113+
await client.xGroupCreate(streamKey, "testgroup", "0");
114+
115+
const reply = await client.xDelex(
116+
streamKey,
117+
messageId,
118+
XDelexPolicy.ACKED
119+
);
120+
assert.deepEqual(reply, [XDELEX_REPLY_CODES.DANGLING_REFS]);
121+
},
122+
{
123+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
124+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
125+
}
126+
);
127+
128+
testUtils.testAll(
129+
`XDELEX multiple keys`,
130+
async (client) => {
131+
const streamKey = "{tag}stream-key";
132+
const messageIds = await Promise.all([
133+
client.xAdd(streamKey, "*", {
134+
field: "value1",
135+
}),
136+
client.xAdd(streamKey, "*", {
137+
field: "value2",
138+
}),
139+
]);
140+
141+
const reply = await client.xDelex(
142+
streamKey,
143+
[...messageIds, "0-0"],
144+
XDelexPolicy.DELREF
145+
);
146+
assert.deepEqual(reply, [
147+
XDELEX_REPLY_CODES.DELETED,
148+
XDELEX_REPLY_CODES.DELETED,
149+
XDELEX_REPLY_CODES.NOT_FOUND,
150+
]);
151+
},
152+
{
153+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
154+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
155+
}
156+
);
157+
});
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { CommandParser } from "../client/parser";
2+
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
3+
import { RedisVariadicArgument } from "./generic-transformers";
4+
5+
/** XDELEX deletion policies */
6+
export const XDelexPolicy = {
7+
/** Preserve references (default) */
8+
KEEPREF: "KEEPREF",
9+
/** Delete all references */
10+
DELREF: "DELREF",
11+
/** Only acknowledged entries */
12+
ACKED: "ACKED",
13+
} as const;
14+
15+
/** XDELEX reply codes */
16+
export const XDELEX_REPLY_CODES = {
17+
/** ID not found */
18+
NOT_FOUND: -1,
19+
/** Entry deleted */
20+
DELETED: 1,
21+
/** Dangling references */
22+
DANGLING_REFS: 2,
23+
} as const;
24+
25+
/**
26+
* Deletes one or multiple entries from the stream
27+
*/
28+
export default {
29+
IS_READ_ONLY: false,
30+
/**
31+
* Constructs the XDELEX command to delete one or multiple entries from the stream
32+
*
33+
* @param parser - The command parser
34+
* @param key - The stream key
35+
* @param id - One or more message IDs to delete
36+
* @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
37+
* @returns Array of integers: -1 (not found), 1 (deleted), 2 (dangling refs)
38+
* @see https://redis.io/commands/xdelex/
39+
*/
40+
parseCommand(
41+
parser: CommandParser,
42+
key: RedisArgument,
43+
id: RedisVariadicArgument,
44+
policy?: (typeof XDelexPolicy)[keyof typeof XDelexPolicy],
45+
) {
46+
parser.push("XDELEX");
47+
parser.pushKey(key);
48+
49+
if (policy) {
50+
parser.push(policy);
51+
}
52+
53+
parser.push("IDS");
54+
parser.pushVariadicWithLength(id);
55+
},
56+
transformReply: undefined as unknown as () => ArrayReply<
57+
(typeof XDELEX_REPLY_CODES)[keyof typeof XDELEX_REPLY_CODES]
58+
>,
59+
} as const satisfies Command;

packages/client/lib/commands/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ import XAUTOCLAIM from './XAUTOCLAIM';
287287
import XCLAIM_JUSTID from './XCLAIM_JUSTID';
288288
import XCLAIM from './XCLAIM';
289289
import XDEL from './XDEL';
290+
import XDELEX from './XDELEX';
290291
import XGROUP_CREATE from './XGROUP_CREATE';
291292
import XGROUP_CREATECONSUMER from './XGROUP_CREATECONSUMER';
292293
import XGROUP_DELCONSUMER from './XGROUP_DELCONSUMER';
@@ -938,6 +939,8 @@ export default {
938939
xClaim: XCLAIM,
939940
XDEL,
940941
xDel: XDEL,
942+
XDELEX,
943+
xDelex: XDELEX,
941944
XGROUP_CREATE,
942945
xGroupCreate: XGROUP_CREATE,
943946
XGROUP_CREATECONSUMER,

0 commit comments

Comments
 (0)