Skip to content

Add Redis 8.2 New Stream Commands #3029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
fail-fast: false
matrix:
node-version: ["18", "20", "22"]
redis-version: ["rs-7.2.0-v13", "rs-7.4.0-v1", "8.0.2", "8.2-M01-pre"]
redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2-rc1"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion packages/bloom/lib/test-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import RedisBloomModules from '.';
export default TestUtils.createFromConfig({
dockerImageName: 'redislabs/client-libs-test',
dockerImageVersionArgument: 'redis-version',
defaultDockerVersion: '8.2-M01-pre'
defaultDockerVersion: '8.2-rc1'
});

export const GLOBAL = {
Expand Down
196 changes: 196 additions & 0 deletions packages/client/lib/commands/XACKDEL.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { strict as assert } from "node:assert";
import XACKDEL from "./XACKDEL";
import { parseArgs } from "./generic-transformers";
import testUtils, { GLOBAL } from "../test-utils";
import {
STREAM_DELETION_POLICY,
STREAM_DELETION_REPLY_CODES,
} from "./common-stream.types";

describe("XACKDEL", () => {
describe("transformArguments", () => {
it("string - without policy", () => {
assert.deepEqual(parseArgs(XACKDEL, "key", "group", "0-0"), [
"XACKDEL",
"key",
"group",
"IDS",
"1",
"0-0",
]);
});

it("string - with policy", () => {
assert.deepEqual(
parseArgs(
XACKDEL,
"key",
"group",
"0-0",
STREAM_DELETION_POLICY.KEEPREF
),
["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"]
);
});

it("array - without policy", () => {
assert.deepEqual(parseArgs(XACKDEL, "key", "group", ["0-0", "1-0"]), [
"XACKDEL",
"key",
"group",
"IDS",
"2",
"0-0",
"1-0",
]);
});

it("array - with policy", () => {
assert.deepEqual(
parseArgs(
XACKDEL,
"key",
"group",
["0-0", "1-0"],
STREAM_DELETION_POLICY.DELREF
),
["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"]
);
});
});

testUtils.testAll(
`XACKDEL non-existing key - without policy`,
async (client) => {
const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0");
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);

testUtils.testAll(
`XACKDEL existing key - without policy`,
async (client) => {
const streamKey = "{tag}stream-key";
const groupName = "testgroup";

// create consumer group, stream and message
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
const messageId = await client.xAdd(streamKey, "*", { field: "value" });

// read message
await client.xReadGroup(groupName, "testconsumer", {
key: streamKey,
id: ">",
});

const reply = await client.xAckDel(streamKey, groupName, messageId);
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);

testUtils.testAll(
`XACKDEL existing key - with policy`,
async (client) => {
const streamKey = "{tag}stream-key";
const groupName = "testgroup";

// create consumer group, stream and message
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
const messageId = await client.xAdd(streamKey, "*", { field: "value" });

// read message
await client.xReadGroup(groupName, "testconsumer", {
key: streamKey,
id: ">",
});

const reply = await client.xAckDel(
streamKey,
groupName,
messageId,
STREAM_DELETION_POLICY.DELREF
);
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);

testUtils.testAll(
`XACKDEL acknowledge policy - with consumer group`,
async (client) => {
const streamKey = "{tag}stream-key";
const groupName = "testgroup";

// create consumer groups, stream and message
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
await client.xGroupCreate(streamKey, "some-other-group", "0");
const messageId = await client.xAdd(streamKey, "*", { field: "value" });

// read message
await client.xReadGroup(groupName, "testconsumer", {
key: streamKey,
id: ">",
});

const reply = await client.xAckDel(
streamKey,
groupName,
messageId,
STREAM_DELETION_POLICY.ACKED
);
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);

testUtils.testAll(
`XACKDEL multiple keys`,
async (client) => {
const streamKey = "{tag}stream-key";
const groupName = "testgroup";

// create consumer groups, stream and add messages
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
const messageIds = await Promise.all([
client.xAdd(streamKey, "*", { field: "value1" }),
client.xAdd(streamKey, "*", { field: "value2" }),
]);

// read messages
await client.xReadGroup(groupName, "testconsumer", {
key: streamKey,
id: ">",
});

const reply = await client.xAckDel(
streamKey,
groupName,
[...messageIds, "0-0"],
STREAM_DELETION_POLICY.DELREF
);
assert.deepEqual(reply, [
STREAM_DELETION_REPLY_CODES.DELETED,
STREAM_DELETION_REPLY_CODES.DELETED,
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
]);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);
});
45 changes: 45 additions & 0 deletions packages/client/lib/commands/XACKDEL.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { CommandParser } from "../client/parser";
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
import {
StreamDeletionReplyCode,
StreamDeletionPolicy,
} from "./common-stream.types";
import { RedisVariadicArgument } from "./generic-transformers";

/**
* Acknowledges and deletes one or multiple messages for a stream consumer group
*/
export default {
IS_READ_ONLY: false,
/**
* Constructs the XACKDEL command to acknowledge and delete one or multiple messages for a stream consumer group
*
* @param parser - The command parser
* @param key - The stream key
* @param group - The consumer group name
* @param id - One or more message IDs to acknowledge and delete
* @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
* @returns Array of integers: -1 (not found), 1 (acknowledged and deleted), 2 (acknowledged with dangling refs)
* @see https://redis.io/commands/xackdel/
*/
parseCommand(
parser: CommandParser,
key: RedisArgument,
group: RedisArgument,
id: RedisVariadicArgument,
policy?: StreamDeletionPolicy
) {
parser.push("XACKDEL");
parser.pushKey(key);
parser.push(group);

if (policy) {
parser.push(policy);
}

parser.push("IDS");
parser.pushVariadicWithLength(id);
},
transformReply:
undefined as unknown as () => ArrayReply<StreamDeletionReplyCode>,
} as const satisfies Command;
80 changes: 80 additions & 0 deletions packages/client/lib/commands/XADD.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL } from '../test-utils';
import XADD from './XADD';
import { parseArgs } from './generic-transformers';
import { STREAM_DELETION_POLICY } from './common-stream.types';

describe('XADD', () => {
describe('transformArguments', () => {
Expand Down Expand Up @@ -78,6 +79,37 @@ describe('XADD', () => {
['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value']
);
});

it('with TRIM.policy', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
TRIM: {
threshold: 1000,
policy: STREAM_DELETION_POLICY.DELREF
}
}),
['XADD', 'key', '1000', 'DELREF', '*', 'field', 'value']
);
});

it('with all TRIM options', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '~',
threshold: 1000,
limit: 100,
policy: STREAM_DELETION_POLICY.ACKED
}
}),
['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
);
});
});

testUtils.testAll('xAdd', async client => {
Expand All @@ -91,4 +123,52 @@ describe('XADD', () => {
client: GLOBAL.SERVERS.OPEN,
cluster: GLOBAL.CLUSTERS.OPEN
});

testUtils.testAll(
'xAdd with TRIM policy',
async (client) => {
assert.equal(
typeof await client.xAdd('{tag}key', '*',
{ field: 'value' },
{
TRIM: {
strategy: 'MAXLEN',
threshold: 1000,
policy: STREAM_DELETION_POLICY.KEEPREF
}
}
),
'string'
);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);

testUtils.testAll(
'xAdd with all TRIM options',
async (client) => {
assert.equal(
typeof await client.xAdd('{tag}key2', '*',
{ field: 'value' },
{
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '~',
threshold: 1000,
limit: 10,
policy: STREAM_DELETION_POLICY.DELREF
}
}
),
'string'
);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);
});
8 changes: 8 additions & 0 deletions packages/client/lib/commands/XADD.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CommandParser } from '../client/parser';
import { RedisArgument, BlobStringReply, Command } from '../RESP/types';
import { StreamDeletionPolicy } from './common-stream.types';
import { Tail } from './generic-transformers';

/**
Expand All @@ -10,13 +11,16 @@ import { Tail } from './generic-transformers';
* @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming
* @property TRIM.threshold - Maximum stream length or minimum ID to retain
* @property TRIM.limit - Maximum number of entries to trim in one call
* @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF)
*/
export interface XAddOptions {
TRIM?: {
strategy?: 'MAXLEN' | 'MINID';
strategyModifier?: '=' | '~';
threshold: number;
limit?: number;
/** added in 8.2 */
policy?: StreamDeletionPolicy;
};
}

Expand Down Expand Up @@ -58,6 +62,10 @@ export function parseXAddArguments(
if (options.TRIM.limit) {
parser.push('LIMIT', options.TRIM.limit.toString());
}

if (options.TRIM.policy) {
parser.push(options.TRIM.policy);
}
}

parser.push(id);
Expand Down
Loading
Loading