Skip to content

Commit 2b2bbb5

Browse files
committed
handle xread/xreadgroup with typing.
1 parent 4066097 commit 2b2bbb5

File tree

1 file changed

+44
-34
lines changed

1 file changed

+44
-34
lines changed

packages/client/lib/commands/generic-transformers.ts

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { UnwrapReply, ArrayReply, BlobStringReply, BooleanReply, CommandArguments, DoubleReply, NullReply, NumberReply, RedisArgument, TuplesReply } from '../RESP/types';
1+
import { UnwrapReply, ArrayReply, BlobStringReply, BooleanReply, CommandArguments, DoubleReply, NullReply, NumberReply, RedisArgument, TuplesReply, MapReply } from '../RESP/types';
22

33
export function isNullReply(reply: unknown): reply is NullReply {
44
return reply === null;
@@ -88,28 +88,6 @@ export function transformTuplesReply(
8888
return message;
8989
}
9090

91-
export type StreamMessageRawReply = TuplesReply<[
92-
id: BlobStringReply,
93-
message: ArrayReply<BlobStringReply>
94-
]>;
95-
96-
export type StreamMessageReply = {
97-
id: BlobStringReply,
98-
message: Record<string, BlobStringReply>,
99-
};
100-
101-
export function transformStreamMessageReply(reply: StreamMessageRawReply): StreamMessageReply {
102-
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
103-
return {
104-
id: id,
105-
message: transformTuplesReply(message)
106-
};
107-
}
108-
109-
export function transformStreamMessageNullReply(reply: StreamMessageRawReply | NullReply) {
110-
return isNullReply(reply) ? reply : transformStreamMessageReply(reply);
111-
}
112-
11391
export interface SortedSetMember {
11492
value: RedisArgument;
11593
score: number;
@@ -452,27 +430,60 @@ function isPlainKeys(keys: Array<RedisArgument> | Array<ZKeyAndWeight>): keys is
452430
return isPlainKey(keys[0]);
453431
}
454432

433+
export type StreamMessageRawReply = TuplesReply<[
434+
id: BlobStringReply,
435+
message: ArrayReply<BlobStringReply>
436+
]>;
437+
438+
export type StreamMessageReply = {
439+
id: BlobStringReply,
440+
message: Record<string, BlobStringReply>,
441+
};
442+
443+
export function transformStreamMessageReply(reply: StreamMessageRawReply): StreamMessageReply {
444+
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
445+
return {
446+
id: id,
447+
message: transformTuplesReply(message)
448+
};
449+
}
450+
451+
export function transformStreamMessageNullReply(reply: StreamMessageRawReply | NullReply) {
452+
return isNullReply(reply) ? reply : transformStreamMessageReply(reply);
453+
}
454+
455455
export type StreamMessagesReply = Array<StreamMessageReply>;
456456

457457
export type StreamsMessagesReply = Array<{
458458
name: BlobStringReply | string;
459459
messages: StreamMessagesReply;
460460
}> | null;
461461

462-
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
462+
export function transformStreamMessagesReply(r: ArrayReply<StreamMessageRawReply>): StreamMessagesReply {
463+
const reply = r as unknown as UnwrapReply<typeof r>;
464+
463465
return reply.map(transformStreamMessageReply);
464466
}
465467

466-
export function transformStreamsMessagesReplyResp2(reply: Array<any> | null): StreamsMessagesReply | null {
468+
type StreamMessagesRawReply = TuplesReply<[name: BlobStringReply, ArrayReply<StreamMessageRawReply>]>;
469+
type StreamsMessagesRawReply2 = ArrayReply<StreamMessagesRawReply>;
470+
471+
export function transformStreamsMessagesReplyResp2(reply: UnwrapReply<StreamsMessagesRawReply2> | UnwrapReply<NullReply>): StreamsMessagesReply | null {
467472
if (reply === null) return null;
468473

469-
return reply.map(([name, rawMessages]) => ({
470-
name,
471-
messages: transformStreamMessagesReply(rawMessages)
472-
}));
474+
return reply.map((s) => {
475+
const stream = s as unknown as UnwrapReply<typeof s>;
476+
477+
return {
478+
name: stream[0],
479+
messages: transformStreamMessagesReply(stream[1])
480+
}
481+
})
473482
}
474483

475-
export function transformStreamsMessagesReplyResp3(reply: any | null): StreamsMessagesReply | null {
484+
type StreamsMessagesRawReply3 = MapReply<BlobStringReply, ArrayReply<StreamMessageRawReply>>;
485+
486+
export function transformStreamsMessagesReplyResp3(reply: UnwrapReply<StreamsMessagesRawReply3> | UnwrapReply<NullReply>): StreamsMessagesReply | null {
476487
if (reply === null) return null;
477488

478489
const ret: StreamsMessagesReply = [];
@@ -487,8 +498,8 @@ export function transformStreamsMessagesReplyResp3(reply: any | null): StreamsMe
487498
return ret;
488499
} else if (reply instanceof Array) {
489500
for (let i=0; i < reply.length; i += 2) {
490-
const name = reply[i];
491-
const rawMessages = reply[i+1];
501+
const name = reply[i] as BlobStringReply
502+
const rawMessages = reply[i+1] as ArrayReply<StreamMessageRawReply>;
492503

493504
ret.push({
494505
name,
@@ -497,10 +508,9 @@ export function transformStreamsMessagesReplyResp3(reply: any | null): StreamsMe
497508
}
498509
} else {
499510
for (const [name, rawMessages] of Object.entries(reply)) {
500-
const m = rawMessages as Array<any>;
501511
ret.push({
502512
name,
503-
messages: transformStreamMessagesReply(m),
513+
messages: transformStreamMessagesReply(rawMessages),
504514
})
505515
}
506516
}

0 commit comments

Comments
 (0)