Skip to content

Commit 287f07b

Browse files
committed
feat: implement RESP3Subscription
1 parent 08237c6 commit 287f07b

File tree

3 files changed

+80
-16
lines changed

3 files changed

+80
-16
lines changed

protocol/deno_streams/reply.ts

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { BufReader } from "../../deps/std/io.ts";
22
import type * as types from "../shared/types.ts";
3+
import type { ProtocolEvents } from "../shared/types.ts";
34
import {
45
ArrayReplyCode,
56
AttributeReplyCode,
@@ -19,17 +20,32 @@ import {
1920
} from "../shared/reply.ts";
2021
import { EOFError, ErrorReplyError, InvalidStateError } from "../../errors.ts";
2122
import { decoder } from "../../internal/encoding.ts";
23+
import type { TypedEventTarget } from "../../internal/typed_event_target.ts";
24+
import { dispatchEvent } from "../../internal/typed_event_target.ts";
2225

23-
export async function readReply(
26+
export async function readOrEmitReply(
2427
reader: BufReader,
28+
eventTarget: TypedEventTarget<ProtocolEvents>,
2529
returnUint8Arrays?: boolean,
2630
): Promise<types.RedisReply> {
27-
const res = await reader.peek(1);
28-
if (res == null) {
29-
throw new EOFError();
31+
const code = await peekReplyCode(reader);
32+
if (code === ErrorReplyCode) {
33+
await readErrorReplyOrFail(reader);
3034
}
3135

32-
const code = res[0];
36+
if (code === PushReplyCode) {
37+
const reply = await readPushReply(reader, returnUint8Arrays);
38+
dispatchEvent(eventTarget, "push", reply ?? []);
39+
return readOrEmitReply(reader, eventTarget, returnUint8Arrays);
40+
}
41+
return readReply(reader, returnUint8Arrays);
42+
}
43+
44+
export async function readReply(
45+
reader: BufReader,
46+
returnUint8Arrays?: boolean,
47+
): Promise<types.RedisReply> {
48+
const code = await peekReplyCode(reader);
3349
if (code === ErrorReplyCode) {
3450
await readErrorReplyOrFail(reader);
3551
}
@@ -73,6 +89,15 @@ export async function readReply(
7389
}
7490
}
7591

92+
async function peekReplyCode(reader: BufReader): Promise<number> {
93+
const res = await reader.peek(1);
94+
if (res == null) {
95+
throw new EOFError();
96+
}
97+
const code = res[0];
98+
return code;
99+
}
100+
76101
async function readIntegerReply(
77102
reader: BufReader,
78103
): Promise<number> {

protocol/shared/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,7 @@ export type RawOrError = Raw | ErrorReplyError;
5050
export const okReply = "OK";
5151

5252
export type Protover = 2 | 3;
53+
54+
export type ProtocolEvents = {
55+
push: Array<RedisReply>;
56+
};

protocol/web_streams/reply.ts

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type * as types from "../shared/types.ts";
2+
import type { ProtocolEvents } from "../shared/types.ts";
23
import {
34
ArrayReplyCode,
45
AttributeReplyCode,
@@ -19,12 +20,38 @@ import {
1920
import { ErrorReplyError, NotImplementedError } from "../../errors.ts";
2021
import { decoder } from "../../internal/encoding.ts";
2122
import type { BufferedReadableStream } from "../../internal/buffered_readable_stream.ts";
23+
import type { TypedEventTarget } from "../../internal/typed_event_target.ts";
24+
import { dispatchEvent } from "../../internal/typed_event_target.ts";
25+
26+
export async function readOrEmitReply(
27+
readable: BufferedReadableStream,
28+
eventTarget: TypedEventTarget<ProtocolEvents>,
29+
returnUint8Arrays?: boolean,
30+
): Promise<types.RedisReply> {
31+
const line = await readable.readLine();
32+
const code = line[0];
33+
if (code === PushReplyCode) {
34+
const reply = await parseArrayLikeReply(line, readable, returnUint8Arrays);
35+
dispatchEvent(eventTarget, "push", reply ?? []);
36+
return readOrEmitReply(readable, eventTarget, returnUint8Arrays);
37+
} else {
38+
return parseLine(line, readable, returnUint8Arrays);
39+
}
40+
}
2241

2342
export async function readReply(
2443
readable: BufferedReadableStream,
2544
returnUint8Arrays?: boolean,
26-
) {
45+
): Promise<types.RedisReply> {
2746
const line = await readable.readLine();
47+
return parseLine(line, readable, returnUint8Arrays);
48+
}
49+
50+
async function parseLine(
51+
line: Uint8Array,
52+
readable: BufferedReadableStream,
53+
returnUint8Arrays?: boolean,
54+
): Promise<types.RedisReply> {
2855
const code = line[0];
2956
switch (code) {
3057
case ErrorReplyCode: {
@@ -56,16 +83,7 @@ export async function readReply(
5683
}
5784
case ArrayReplyCode:
5885
case PushReplyCode: {
59-
const size = Number.parseInt(decoder.decode(line.slice(1)));
60-
if (size === -1) {
61-
// `-1` indicates a null array
62-
return null;
63-
}
64-
const array: Array<types.RedisReply> = [];
65-
for (let i = 0; i < size; i++) {
66-
array.push(await readReply(readable, returnUint8Arrays));
67-
}
68-
return array;
86+
return parseArrayLikeReply(line, readable, returnUint8Arrays);
6987
}
7088
case MapReplyCode: {
7189
// NOTE: We treat a map type as an array to keep backward compatibility.
@@ -130,3 +148,20 @@ export async function readReply(
130148
);
131149
}
132150
}
151+
152+
async function parseArrayLikeReply(
153+
line: Uint8Array,
154+
readable: BufferedReadableStream,
155+
returnUint8Arrays?: boolean,
156+
): Promise<Array<types.RedisReply> | null> {
157+
const size = Number.parseInt(decoder.decode(line.slice(1)));
158+
if (size === -1) {
159+
// `-1` indicates a null array
160+
return null;
161+
}
162+
const array: Array<types.RedisReply> = [];
163+
for (let i = 0; i < size; i++) {
164+
array.push(await readReply(readable, returnUint8Arrays));
165+
}
166+
return array;
167+
}

0 commit comments

Comments
 (0)