Skip to content

Commit 3aec14b

Browse files
committed
refactor: create shared stream deletion types
for Redis 8.2 commands
1 parent 1c1e1ca commit 3aec14b

File tree

5 files changed

+86
-82
lines changed

5 files changed

+86
-82
lines changed

packages/client/lib/commands/XACKDEL.spec.ts

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { strict as assert } from "node:assert";
2-
import XACKDEL, { XACKDEL_REPLY_CODES, XAckDelPolicy } from "./XACKDEL";
2+
import XACKDEL from "./XACKDEL";
33
import { parseArgs } from "./generic-transformers";
44
import testUtils, { GLOBAL } from "../test-utils";
5+
import {
6+
STREAM_DELETION_POLICY,
7+
STREAM_DELETION_REPLY_CODES,
8+
} from "./common-stream.types";
59

610
describe("XACKDEL", () => {
711
describe("transformArguments", () => {
@@ -18,7 +22,13 @@ describe("XACKDEL", () => {
1822

1923
it("string - with policy", () => {
2024
assert.deepEqual(
21-
parseArgs(XACKDEL, "key", "group", "0-0", XAckDelPolicy.KEEPREF),
25+
parseArgs(
26+
XACKDEL,
27+
"key",
28+
"group",
29+
"0-0",
30+
STREAM_DELETION_POLICY.KEEPREF
31+
),
2232
["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"]
2333
);
2434
});
@@ -42,7 +52,7 @@ describe("XACKDEL", () => {
4252
"key",
4353
"group",
4454
["0-0", "1-0"],
45-
XAckDelPolicy.DELREF
55+
STREAM_DELETION_POLICY.DELREF
4656
),
4757
["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"]
4858
);
@@ -53,7 +63,7 @@ describe("XACKDEL", () => {
5363
`XACKDEL non-existing key - without policy`,
5464
async (client) => {
5565
const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0");
56-
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.NOT_FOUND]);
66+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
5767
},
5868
{
5969
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -78,7 +88,7 @@ describe("XACKDEL", () => {
7888
});
7989

8090
const reply = await client.xAckDel(streamKey, groupName, messageId);
81-
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]);
91+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
8292
},
8393
{
8494
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -106,9 +116,9 @@ describe("XACKDEL", () => {
106116
streamKey,
107117
groupName,
108118
messageId,
109-
XAckDelPolicy.DELREF
119+
STREAM_DELETION_POLICY.DELREF
110120
);
111-
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED]);
121+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
112122
},
113123
{
114124
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -137,9 +147,9 @@ describe("XACKDEL", () => {
137147
streamKey,
138148
groupName,
139149
messageId,
140-
XAckDelPolicy.ACKED
150+
STREAM_DELETION_POLICY.ACKED
141151
);
142-
assert.deepEqual(reply, [XACKDEL_REPLY_CODES.ACKNOWLEDGED_DANGLING_REFS]);
152+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
143153
},
144154
{
145155
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -170,12 +180,12 @@ describe("XACKDEL", () => {
170180
streamKey,
171181
groupName,
172182
[...messageIds, "0-0"],
173-
XAckDelPolicy.DELREF
183+
STREAM_DELETION_POLICY.DELREF
174184
);
175185
assert.deepEqual(reply, [
176-
XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED,
177-
XACKDEL_REPLY_CODES.ACKNOWLEDGED_AND_DELETED,
178-
XACKDEL_REPLY_CODES.NOT_FOUND,
186+
STREAM_DELETION_REPLY_CODES.DELETED,
187+
STREAM_DELETION_REPLY_CODES.DELETED,
188+
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
179189
]);
180190
},
181191
{
Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,11 @@
11
import { CommandParser } from "../client/parser";
22
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
3+
import {
4+
StreamDeletionReplyCode,
5+
StreamDeletionPolicy,
6+
} from "./common-stream.types";
37
import { RedisVariadicArgument } from "./generic-transformers";
48

5-
/** XACKDEL deletion policies */
6-
export const XAckDelPolicy = {
7-
/** Preserve references (default) */
8-
KEEPREF: "KEEPREF",
9-
/** Delete all references */
10-
DELREF: "DELREF",
11-
/** Only acknowledged entries */
12-
ACKED: "ACKED",
13-
} as const;
14-
15-
/** XACKDEL reply codes */
16-
export const XACKDEL_REPLY_CODES = {
17-
/** ID not found */
18-
NOT_FOUND: -1,
19-
/** Entry acknowledged and deleted */
20-
ACKNOWLEDGED_AND_DELETED: 1,
21-
/** Entry acknowledged but dangling references remain */
22-
ACKNOWLEDGED_DANGLING_REFS: 2,
23-
} as const;
24-
259
/**
2610
* Acknowledges and deletes one or multiple messages for a stream consumer group
2711
*/
@@ -43,7 +27,7 @@ export default {
4327
key: RedisArgument,
4428
group: RedisArgument,
4529
id: RedisVariadicArgument,
46-
policy?: (typeof XAckDelPolicy)[keyof typeof XAckDelPolicy],
30+
policy?: StreamDeletionPolicy
4731
) {
4832
parser.push("XACKDEL");
4933
parser.pushKey(key);
@@ -56,7 +40,6 @@ export default {
5640
parser.push("IDS");
5741
parser.pushVariadicWithLength(id);
5842
},
59-
transformReply: undefined as unknown as () => ArrayReply<
60-
(typeof XACKDEL_REPLY_CODES)[keyof typeof XACKDEL_REPLY_CODES]
61-
>,
43+
transformReply:
44+
undefined as unknown as () => ArrayReply<StreamDeletionReplyCode>,
6245
} as const satisfies Command;

packages/client/lib/commands/XDELEX.spec.ts

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { strict as assert } from "node:assert";
2-
import XDELEX, { XDELEX_REPLY_CODES, XDelexPolicy } from "./XDELEX";
2+
import XDELEX from "./XDELEX";
33
import { parseArgs } from "./generic-transformers";
44
import testUtils, { GLOBAL } from "../test-utils";
5+
import {
6+
STREAM_DELETION_POLICY,
7+
STREAM_DELETION_REPLY_CODES,
8+
} from "./common-stream.types";
59

610
describe("XDELEX", () => {
711
describe("transformArguments", () => {
@@ -16,14 +20,10 @@ describe("XDELEX", () => {
1620
});
1721

1822
it("string - with policy", () => {
19-
assert.deepEqual(parseArgs(XDELEX, "key", "0-0", XDelexPolicy.KEEPREF), [
20-
"XDELEX",
21-
"key",
22-
"KEEPREF",
23-
"IDS",
24-
"1",
25-
"0-0",
26-
]);
23+
assert.deepEqual(
24+
parseArgs(XDELEX, "key", "0-0", STREAM_DELETION_POLICY.KEEPREF),
25+
["XDELEX", "key", "KEEPREF", "IDS", "1", "0-0"]
26+
);
2727
});
2828

2929
it("array - without policy", () => {
@@ -39,7 +39,7 @@ describe("XDELEX", () => {
3939

4040
it("array - with policy", () => {
4141
assert.deepEqual(
42-
parseArgs(XDELEX, "key", ["0-0", "1-0"], XDelexPolicy.DELREF),
42+
parseArgs(XDELEX, "key", ["0-0", "1-0"], STREAM_DELETION_POLICY.DELREF),
4343
["XDELEX", "key", "DELREF", "IDS", "2", "0-0", "1-0"]
4444
);
4545
});
@@ -49,7 +49,7 @@ describe("XDELEX", () => {
4949
`XDELEX non-existing key - without policy`,
5050
async (client) => {
5151
const reply = await client.xDelex("{tag}stream-key", "0-0");
52-
assert.deepEqual(reply, [XDELEX_REPLY_CODES.NOT_FOUND]);
52+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
5353
},
5454
{
5555
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -68,9 +68,9 @@ describe("XDELEX", () => {
6868
const reply = await client.xDelex(
6969
streamKey,
7070
messageId,
71-
XDelexPolicy.KEEPREF
71+
STREAM_DELETION_POLICY.KEEPREF
7272
);
73-
assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]);
73+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
7474
},
7575
{
7676
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -89,9 +89,9 @@ describe("XDELEX", () => {
8989
const reply = await client.xDelex(
9090
streamKey,
9191
messageId,
92-
XDelexPolicy.DELREF
92+
STREAM_DELETION_POLICY.DELREF
9393
);
94-
assert.deepEqual(reply, [XDELEX_REPLY_CODES.DELETED]);
94+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
9595
},
9696
{
9797
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -115,9 +115,9 @@ describe("XDELEX", () => {
115115
const reply = await client.xDelex(
116116
streamKey,
117117
messageId,
118-
XDelexPolicy.ACKED
118+
STREAM_DELETION_POLICY.ACKED
119119
);
120-
assert.deepEqual(reply, [XDELEX_REPLY_CODES.DANGLING_REFS]);
120+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
121121
},
122122
{
123123
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
@@ -141,12 +141,12 @@ describe("XDELEX", () => {
141141
const reply = await client.xDelex(
142142
streamKey,
143143
[...messageIds, "0-0"],
144-
XDelexPolicy.DELREF
144+
STREAM_DELETION_POLICY.DELREF
145145
);
146146
assert.deepEqual(reply, [
147-
XDELEX_REPLY_CODES.DELETED,
148-
XDELEX_REPLY_CODES.DELETED,
149-
XDELEX_REPLY_CODES.NOT_FOUND,
147+
STREAM_DELETION_REPLY_CODES.DELETED,
148+
STREAM_DELETION_REPLY_CODES.DELETED,
149+
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
150150
]);
151151
},
152152
{
Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,11 @@
11
import { CommandParser } from "../client/parser";
22
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
3+
import {
4+
StreamDeletionPolicy,
5+
StreamDeletionReplyCode,
6+
} from "./common-stream.types";
37
import { RedisVariadicArgument } from "./generic-transformers";
48

5-
/** XDELEX deletion policies */
6-
export const XDelexPolicy = {
7-
/** Preserve references (default) */
8-
KEEPREF: "KEEPREF",
9-
/** Delete all references */
10-
DELREF: "DELREF",
11-
/** Only acknowledged entries */
12-
ACKED: "ACKED",
13-
} as const;
14-
15-
/** XDELEX reply codes */
16-
export const XDELEX_REPLY_CODES = {
17-
/** ID not found */
18-
NOT_FOUND: -1,
19-
/** Entry deleted */
20-
DELETED: 1,
21-
/** Dangling references */
22-
DANGLING_REFS: 2,
23-
} as const;
24-
259
/**
2610
* Deletes one or multiple entries from the stream
2711
*/
@@ -41,7 +25,7 @@ export default {
4125
parser: CommandParser,
4226
key: RedisArgument,
4327
id: RedisVariadicArgument,
44-
policy?: (typeof XDelexPolicy)[keyof typeof XDelexPolicy],
28+
policy?: StreamDeletionPolicy
4529
) {
4630
parser.push("XDELEX");
4731
parser.pushKey(key);
@@ -53,7 +37,6 @@ export default {
5337
parser.push("IDS");
5438
parser.pushVariadicWithLength(id);
5539
},
56-
transformReply: undefined as unknown as () => ArrayReply<
57-
(typeof XDELEX_REPLY_CODES)[keyof typeof XDELEX_REPLY_CODES]
58-
>,
40+
transformReply:
41+
undefined as unknown as () => ArrayReply<StreamDeletionReplyCode>,
5942
} as const satisfies Command;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/** Common stream deletion policies
2+
*
3+
* Added in Redis 8.2
4+
*/
5+
export const STREAM_DELETION_POLICY = {
6+
/** Preserve references (default) */
7+
KEEPREF: "KEEPREF",
8+
/** Delete all references */
9+
DELREF: "DELREF",
10+
/** Only acknowledged entries */
11+
ACKED: "ACKED",
12+
} as const;
13+
14+
export type StreamDeletionPolicy =
15+
(typeof STREAM_DELETION_POLICY)[keyof typeof STREAM_DELETION_POLICY];
16+
17+
/** Common reply codes for stream deletion operations */
18+
export const STREAM_DELETION_REPLY_CODES = {
19+
/** ID not found */
20+
NOT_FOUND: -1,
21+
/** Entry deleted */
22+
DELETED: 1,
23+
/** Dangling references */
24+
DANGLING_REFS: 2,
25+
} as const;
26+
27+
export type StreamDeletionReplyCode =
28+
(typeof STREAM_DELETION_REPLY_CODES)[keyof typeof STREAM_DELETION_REPLY_CODES];

0 commit comments

Comments
 (0)