Skip to content

Commit f555c4d

Browse files
committed
add command/command info and xread/xreadgroup to v5 support
1 parent 9a85b10 commit f555c4d

File tree

8 files changed

+99
-38
lines changed

8 files changed

+99
-38
lines changed
Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
// import { RedisCommandArguments } from '.';
2-
// import { CommandRawReply, CommandReply, transformCommandReply } from './generic-transformers';
1+
import { ArrayReply, Command, UnwrapReply } from '../RESP/types';
2+
import { CommandRawReply, CommandReply, transformCommandReply } from './generic-transformers';
33

4-
// export const IS_READ_ONLY = true;
5-
6-
// export function transformArguments(): RedisCommandArguments {
7-
// return ['COMMAND'];
8-
// }
9-
10-
// export function transformReply(reply: Array<CommandRawReply>): Array<CommandReply> {
11-
// return reply.map(transformCommandReply);
12-
// }
4+
export default {
5+
IS_READ_ONLY: true,
6+
transformArguments() {
7+
return ['COMMAND'];
8+
},
9+
// TODO: This works, as we don't currently handle any of the items returned as a map
10+
transformReply(reply: UnwrapReply<ArrayReply<CommandRawReply>>): Array<CommandReply> {
11+
return reply.map(transformCommandReply);
12+
}
13+
} as const satisfies Command;
Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
// import { RedisCommandArguments } from '.';
2-
// import { CommandRawReply, CommandReply, transformCommandReply } from './generic-transformers';
1+
import { ArrayReply, Command, UnwrapReply } from '../RESP/types';
2+
import { CommandRawReply, CommandReply, transformCommandReply } from './generic-transformers';
33

4-
// export const IS_READ_ONLY = true;
5-
6-
// export function transformArguments(commands: Array<string>): RedisCommandArguments {
7-
// return ['COMMAND', 'INFO', ...commands];
8-
// }
9-
10-
// export function transformReply(reply: Array<CommandRawReply | null>): Array<CommandReply | null> {
11-
// return reply.map(command => command ? transformCommandReply(command) : null);
12-
// }
4+
export default {
5+
FIRST_KEY_INDEX: undefined,
6+
IS_READ_ONLY: true,
7+
transformArguments(commands: Array<string>) {
8+
return ['COMMAND', 'INFO', ...commands];
9+
},
10+
// TODO: This works, as we don't currently handle any of the items returned as a map
11+
transformReply(reply: UnwrapReply<ArrayReply<CommandRawReply>>): Array<CommandReply | null> {
12+
return reply.map(command => command ? transformCommandReply(command) : null);
13+
}
14+
} as const satisfies Command;

packages/client/lib/commands/XAUTOCLAIM.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { RedisArgument, TuplesReply, BlobStringReply, ArrayReply, NullReply, UnwrapReply, Command } from '../RESP/types';
2-
import { StreamMessageReply, transformStreamMessageNullReply } from './generic-transformers';
2+
import { StreamMessageRawReply, transformStreamMessageNullReply } from './generic-transformers';
33

44
export interface XAutoClaimOptions {
55
COUNT?: number;
66
}
77

88
export type XAutoClaimRawReply = TuplesReply<[
99
nextId: BlobStringReply,
10-
messages: ArrayReply<StreamMessageReply | NullReply>,
10+
messages: ArrayReply<StreamMessageRawReply | NullReply>,
1111
deletedMessages: ArrayReply<BlobStringReply>
1212
]>;
1313

packages/client/lib/commands/XCLAIM.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { RedisArgument, ArrayReply, NullReply, UnwrapReply, Command } from '../RESP/types';
2-
import { RedisVariadicArgument, pushVariadicArguments, StreamMessageReply, transformStreamMessageNullReply } from './generic-transformers';
2+
import { RedisVariadicArgument, pushVariadicArguments, StreamMessageRawReply, transformStreamMessageNullReply } from './generic-transformers';
33

44
export interface XClaimOptions {
55
IDLE?: number;
@@ -50,7 +50,7 @@ export default {
5050

5151
return args;
5252
},
53-
transformReply(reply: UnwrapReply<ArrayReply<StreamMessageReply | NullReply>>) {
53+
transformReply(reply: UnwrapReply<ArrayReply<StreamMessageRawReply | NullReply>>) {
5454
return reply.map(transformStreamMessageNullReply);
5555
}
5656
} as const satisfies Command;

packages/client/lib/commands/XRANGE.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { RedisArgument, ArrayReply, UnwrapReply, Command } from '../RESP/types';
2-
import { StreamMessageReply, transformStreamMessageReply } from './generic-transformers';
2+
import { StreamMessageRawReply, transformStreamMessageReply } from './generic-transformers';
33

44
export interface XRangeOptions {
55
COUNT?: number;
@@ -25,7 +25,7 @@ export default {
2525
FIRST_KEY_INDEX: 1,
2626
IS_READ_ONLY: true,
2727
transformArguments: transformXRangeArguments.bind(undefined, 'XRANGE'),
28-
transformReply(reply: UnwrapReply<ArrayReply<StreamMessageReply>>) {
28+
transformReply(reply: UnwrapReply<ArrayReply<StreamMessageRawReply>>) {
2929
return reply.map(transformStreamMessageReply);
3030
}
3131
} as const satisfies Command;

packages/client/lib/commands/XREAD.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Command, RedisArgument } from '../RESP/types';
2+
import { transformStreamsMessagesReplyResp2, transformStreamsMessagesReplyResp3 } from './generic-transformers';
23

34
export interface XReadStream {
45
key: RedisArgument;
@@ -48,8 +49,9 @@ export default {
4849

4950
return args;
5051
},
51-
// export { transformStreamsMessagesReply as transformReply } from './generic-transformers';
52-
// TODO
53-
transformReply: undefined as unknown as () => unknown
52+
transformReply: {
53+
2: transformStreamsMessagesReplyResp2,
54+
3: transformStreamsMessagesReplyResp3
55+
}
5456
} as const satisfies Command;
5557

packages/client/lib/commands/XREADGROUP.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Command, RedisArgument } from '../RESP/types';
2+
import { transformStreamsMessagesReplyResp2, transformStreamsMessagesReplyResp3 } from './generic-transformers';
23
import XREAD, { XReadStreams, pushXReadStreams } from './XREAD';
34

45
export interface XReadGroupOptions {
@@ -40,7 +41,8 @@ export default {
4041

4142
return args;
4243
},
43-
// export { transformStreamsMessagesReply as transformReply } from './generic-transformers';
44-
// TODO
45-
transformReply: undefined as unknown as () => unknown
44+
transformReply: {
45+
2: transformStreamsMessagesReplyResp2,
46+
3: transformStreamsMessagesReplyResp3
47+
}
4648
} as const satisfies Command;

packages/client/lib/commands/generic-transformers.ts

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,25 @@ export function transformTuplesReply(
8888
return message;
8989
}
9090

91-
export type StreamMessageReply = TuplesReply<[
91+
export type StreamMessageRawReply = TuplesReply<[
9292
id: BlobStringReply,
9393
message: ArrayReply<BlobStringReply>
9494
]>;
9595

96-
export function transformStreamMessageReply(reply: StreamMessageReply) {
96+
export type StreamMessageReply = {
97+
id: BlobStringReply,
98+
message: Record<string, BlobStringReply>,
99+
};
100+
101+
export function transformStreamMessageReply(reply: StreamMessageRawReply): StreamMessageReply {
97102
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
98103
return {
99-
id,
104+
id: id,
100105
message: transformTuplesReply(message)
101106
};
102107
}
103108

104-
export function transformStreamMessageNullReply(reply: StreamMessageReply | NullReply) {
109+
export function transformStreamMessageNullReply(reply: StreamMessageRawReply | NullReply) {
105110
return isNullReply(reply) ? reply : transformStreamMessageReply(reply);
106111
}
107112

@@ -446,3 +451,52 @@ function isPlainKey(key: RedisArgument | ZKeyAndWeight): key is RedisArgument {
446451
function isPlainKeys(keys: Array<RedisArgument> | Array<ZKeyAndWeight>): keys is Array<RedisArgument> {
447452
return isPlainKey(keys[0]);
448453
}
454+
455+
export type StreamMessagesReply = Array<StreamMessageReply>;
456+
457+
export type StreamsMessagesReply = Array<{
458+
name: BlobStringReply | string;
459+
messages: StreamMessagesReply;
460+
}> | null;
461+
462+
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
463+
return reply.map(transformStreamMessageReply);
464+
}
465+
466+
export function transformStreamsMessagesReplyResp2(reply: Array<any> | null): StreamsMessagesReply | null {
467+
if (reply === null) return null;
468+
469+
return reply.map(([name, rawMessages]) => ({
470+
name,
471+
messages: transformStreamMessagesReply(rawMessages)
472+
}));
473+
}
474+
475+
export function transformStreamsMessagesReplyResp3(reply: any | null): StreamsMessagesReply | null {
476+
if (reply === null) return null;
477+
478+
if (reply instanceof Map) {
479+
const ret: StreamsMessagesReply = [];
480+
for (const [name, rawMessages] of reply) {
481+
ret.push({
482+
name,
483+
messages: transformStreamMessagesReply(rawMessages),
484+
})
485+
}
486+
487+
return ret;
488+
} else if (reply instanceof Array) {
489+
return transformStreamsMessagesReplyResp2(reply);
490+
} else {
491+
const ret: StreamsMessagesReply = [];
492+
for (const [name, rawMessages] of Object.entries(reply)) {
493+
const m = rawMessages as Array<any>;
494+
ret.push({
495+
name,
496+
messages: transformStreamMessagesReply(m),
497+
})
498+
}
499+
500+
return ret;
501+
}
502+
}

0 commit comments

Comments
 (0)