Skip to content

Commit 1b17a78

Browse files
authored
feat(client): add XADD idempotency options (IDMPAUTO, IDMP) and policy (redis#3161)
1 parent 0230d6b commit 1b17a78

File tree

2 files changed

+192
-6
lines changed

2 files changed

+192
-6
lines changed

packages/client/lib/commands/XADD.spec.ts

Lines changed: 158 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,67 @@ describe('XADD', () => {
110110
['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
111111
);
112112
});
113+
114+
it('with policy', () => {
115+
assert.deepEqual(
116+
parseArgs(XADD, 'key', '*', {
117+
field: 'value'
118+
}, {
119+
policy: STREAM_DELETION_POLICY.KEEPREF
120+
}),
121+
['XADD', 'key', 'KEEPREF', '*', 'field', 'value']
122+
);
123+
});
124+
125+
it('with IDMPAUTO', () => {
126+
assert.deepEqual(
127+
parseArgs(XADD, 'key', '*', {
128+
field: 'value'
129+
}, {
130+
IDMPAUTO: { pid: 'producer1' }
131+
}),
132+
['XADD', 'key', 'IDMPAUTO', 'producer1', '*', 'field', 'value']
133+
);
134+
});
135+
136+
it('with IDMP', () => {
137+
assert.deepEqual(
138+
parseArgs(XADD, 'key', '*', {
139+
field: 'value'
140+
}, {
141+
IDMP: { pid: 'producer1', iid: '42' }
142+
}),
143+
['XADD', 'key', 'IDMP', 'producer1', '42', '*', 'field', 'value']
144+
);
145+
});
146+
147+
it('with policy and IDMPAUTO', () => {
148+
assert.deepEqual(
149+
parseArgs(XADD, 'key', '*', {
150+
field: 'value'
151+
}, {
152+
policy: STREAM_DELETION_POLICY.DELREF,
153+
IDMPAUTO: { pid: 'producer1' }
154+
}),
155+
['XADD', 'key', 'DELREF', 'IDMPAUTO', 'producer1', '*', 'field', 'value']
156+
);
157+
});
158+
159+
it('with policy, IDMP, and TRIM', () => {
160+
assert.deepEqual(
161+
parseArgs(XADD, 'key', '*', {
162+
field: 'value'
163+
}, {
164+
policy: STREAM_DELETION_POLICY.ACKED,
165+
IDMP: { pid: 'producer1', iid: 'msg123' },
166+
TRIM: {
167+
strategy: 'MAXLEN',
168+
threshold: 1000
169+
}
170+
}),
171+
['XADD', 'key', 'ACKED', 'IDMP', 'producer1', 'msg123', 'MAXLEN', '1000', '*', 'field', 'value']
172+
);
173+
});
113174
});
114175

115176
testUtils.testAll('xAdd', async client => {
@@ -128,7 +189,7 @@ describe('XADD', () => {
128189
'xAdd with TRIM policy',
129190
async (client) => {
130191
assert.equal(
131-
typeof await client.xAdd('{tag}key', '*',
192+
typeof await client.xAdd('{tag}key', '*',
132193
{ field: 'value' },
133194
{
134195
TRIM: {
@@ -151,7 +212,7 @@ describe('XADD', () => {
151212
'xAdd with all TRIM options',
152213
async (client) => {
153214
assert.equal(
154-
typeof await client.xAdd('{tag}key2', '*',
215+
typeof await client.xAdd('{tag}key2', '*',
155216
{ field: 'value' },
156217
{
157218
TRIM: {
@@ -171,4 +232,99 @@ describe('XADD', () => {
171232
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
172233
}
173234
);
235+
236+
testUtils.testAll(
237+
'xAdd with policy',
238+
async (client) => {
239+
assert.equal(
240+
typeof await client.xAdd('{tag}key3', '*',
241+
{ field: 'value' },
242+
{
243+
policy: STREAM_DELETION_POLICY.KEEPREF
244+
}
245+
),
246+
'string'
247+
);
248+
},
249+
{
250+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
251+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
252+
}
253+
);
254+
255+
testUtils.testAll(
256+
'xAdd with IDMPAUTO',
257+
async (client) => {
258+
const id1 = await client.xAdd('{tag}key4', '*',
259+
{ field1: 'value1', field2: 'value2' },
260+
{
261+
IDMPAUTO: { pid: 'producer1' }
262+
}
263+
);
264+
assert.equal(typeof id1, 'string');
265+
266+
// Adding the same content with same producer should return the same ID (idempotent)
267+
const id2 = await client.xAdd('{tag}key4', '*',
268+
{ field1: 'value1', field2: 'value2' },
269+
{
270+
IDMPAUTO: { pid: 'producer1' }
271+
}
272+
);
273+
assert.equal(id1, id2);
274+
},
275+
{
276+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
277+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
278+
}
279+
);
280+
281+
testUtils.testAll(
282+
'xAdd with IDMP',
283+
async (client) => {
284+
const id1 = await client.xAdd('{tag}key5', '*',
285+
{ field: 'value' },
286+
{
287+
IDMP: { pid: 'producer1', iid: '42' }
288+
}
289+
);
290+
assert.equal(typeof id1, 'string');
291+
292+
// Adding with same producer and iid should return the same ID (idempotent)
293+
const id2 = await client.xAdd('{tag}key5', '*',
294+
{ field: 'value' },
295+
{
296+
IDMP: { pid: 'producer1', iid: '42' }
297+
}
298+
);
299+
assert.equal(id1, id2);
300+
},
301+
{
302+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
303+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
304+
}
305+
);
306+
307+
testUtils.testAll(
308+
'xAdd with policy, IDMP, and TRIM',
309+
async (client) => {
310+
assert.equal(
311+
typeof await client.xAdd('{tag}key6', '*',
312+
{ field: 'value' },
313+
{
314+
policy: STREAM_DELETION_POLICY.ACKED,
315+
IDMP: { pid: 'producer1', iid: 'msg123' },
316+
TRIM: {
317+
strategy: 'MAXLEN',
318+
threshold: 1000
319+
}
320+
}
321+
),
322+
'string'
323+
);
324+
},
325+
{
326+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
327+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
328+
}
329+
);
174330
});

packages/client/lib/commands/XADD.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@ import { Tail } from './generic-transformers';
55

66
/**
77
* Options for the XADD command
8-
*
8+
*
9+
* @property policy - Reference tracking policy for the entry (KEEPREF, DELREF, or ACKED) - added in 8.6
10+
* @property IDMPAUTO - Automatically calculate an idempotent ID based on entry content to prevent duplicate entries - added in 8.6
11+
* @property IDMPAUTO.pid - Producer ID, must be unique per producer and consistent across restarts
12+
* @property IDMP - Use a specific idempotent ID to prevent duplicate entries - added in 8.6
13+
* @property IDMP.pid - Producer ID, must be unique per producer and consistent across restarts
14+
* @property IDMP.iid - Idempotent ID (binary string), must be unique per message and per pid
915
* @property TRIM - Optional trimming configuration
1016
* @property TRIM.strategy - Trim strategy: MAXLEN (by length) or MINID (by ID)
1117
* @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming
@@ -14,25 +20,36 @@ import { Tail } from './generic-transformers';
1420
* @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF)
1521
*/
1622
export interface XAddOptions {
23+
/** added in 8.6 */
24+
policy?: StreamDeletionPolicy;
25+
/** added in 8.6 */
26+
IDMPAUTO?: {
27+
pid: RedisArgument;
28+
};
29+
/** added in 8.6 */
30+
IDMP?: {
31+
pid: RedisArgument;
32+
iid: RedisArgument;
33+
};
1734
TRIM?: {
1835
strategy?: 'MAXLEN' | 'MINID';
1936
strategyModifier?: '=' | '~';
2037
threshold: number;
2138
limit?: number;
22-
/** added in 8.2 */
39+
/** added in 8.6 */
2340
policy?: StreamDeletionPolicy;
2441
};
2542
}
2643

2744
/**
2845
* Parses arguments for the XADD command
2946
*
30-
* @param optional - Optional command modifier
47+
* @param optional - Optional command modifier (e.g., NOMKSTREAM)
3148
* @param parser - The command parser
3249
* @param key - The stream key
3350
* @param id - Message ID (* for auto-generation)
3451
* @param message - Key-value pairs representing the message fields
35-
* @param options - Additional options for stream trimming
52+
* @param options - Additional options for reference tracking, idempotency, and trimming
3653
*/
3754
export function parseXAddArguments(
3855
optional: RedisArgument | undefined,
@@ -48,6 +65,19 @@ export function parseXAddArguments(
4865
parser.push(optional);
4966
}
5067

68+
// Reference tracking policy (KEEPREF | DELREF | ACKED)
69+
if (options?.policy) {
70+
parser.push(options.policy);
71+
}
72+
73+
// Idempotency options (IDMPAUTO or IDMP)
74+
if (options?.IDMPAUTO) {
75+
parser.push('IDMPAUTO', options.IDMPAUTO.pid);
76+
} else if (options?.IDMP) {
77+
parser.push('IDMP', options.IDMP.pid, options.IDMP.iid);
78+
}
79+
80+
// Trimming options
5181
if (options?.TRIM) {
5282
if (options.TRIM.strategy) {
5383
parser.push(options.TRIM.strategy);

0 commit comments

Comments
 (0)