diff --git a/protocol/deno_streams/reply.ts b/protocol/deno_streams/reply.ts index 64a8b2b4..406f5461 100644 --- a/protocol/deno_streams/reply.ts +++ b/protocol/deno_streams/reply.ts @@ -1,5 +1,6 @@ import type { BufReader } from "../../deps/std/io.ts"; import type * as types from "../shared/types.ts"; +import type { ProtocolEvents } from "../shared/types.ts"; import { ArrayReplyCode, AttributeReplyCode, @@ -19,17 +20,32 @@ import { } from "../shared/reply.ts"; import { EOFError, ErrorReplyError, InvalidStateError } from "../../errors.ts"; import { decoder } from "../../internal/encoding.ts"; +import type { TypedEventTarget } from "../../internal/typed_event_target.ts"; +import { dispatchEvent } from "../../internal/typed_event_target.ts"; -export async function readReply( +export async function readOrEmitReply( reader: BufReader, + eventTarget: TypedEventTarget, returnUint8Arrays?: boolean, ): Promise { - const res = await reader.peek(1); - if (res == null) { - throw new EOFError(); + const code = await peekReplyCode(reader); + if (code === ErrorReplyCode) { + await readErrorReplyOrFail(reader); } - const code = res[0]; + if (code === PushReplyCode) { + const reply = await readPushReply(reader, returnUint8Arrays); + dispatchEvent(eventTarget, "push", reply ?? []); + return readOrEmitReply(reader, eventTarget, returnUint8Arrays); + } + return readReply(reader, returnUint8Arrays); +} + +export async function readReply( + reader: BufReader, + returnUint8Arrays?: boolean, +): Promise { + const code = await peekReplyCode(reader); if (code === ErrorReplyCode) { await readErrorReplyOrFail(reader); } @@ -73,6 +89,15 @@ export async function readReply( } } +async function peekReplyCode(reader: BufReader): Promise { + const res = await reader.peek(1); + if (res == null) { + throw new EOFError(); + } + const code = res[0]; + return code; +} + async function readIntegerReply( reader: BufReader, ): Promise { diff --git a/protocol/shared/types.ts b/protocol/shared/types.ts index ef47abfb..6b544339 100644 --- a/protocol/shared/types.ts +++ b/protocol/shared/types.ts @@ -50,3 +50,7 @@ export type RawOrError = Raw | ErrorReplyError; export const okReply = "OK"; export type Protover = 2 | 3; + +export type ProtocolEvents = { + push: Array; +}; diff --git a/protocol/web_streams/reply.ts b/protocol/web_streams/reply.ts index 94e62e02..4fe3b31d 100644 --- a/protocol/web_streams/reply.ts +++ b/protocol/web_streams/reply.ts @@ -1,4 +1,5 @@ import type * as types from "../shared/types.ts"; +import type { ProtocolEvents } from "../shared/types.ts"; import { ArrayReplyCode, AttributeReplyCode, @@ -19,12 +20,38 @@ import { import { ErrorReplyError, NotImplementedError } from "../../errors.ts"; import { decoder } from "../../internal/encoding.ts"; import type { BufferedReadableStream } from "../../internal/buffered_readable_stream.ts"; +import type { TypedEventTarget } from "../../internal/typed_event_target.ts"; +import { dispatchEvent } from "../../internal/typed_event_target.ts"; + +export async function readOrEmitReply( + readable: BufferedReadableStream, + eventTarget: TypedEventTarget, + returnUint8Arrays?: boolean, +): Promise { + const line = await readable.readLine(); + const code = line[0]; + if (code === PushReplyCode) { + const reply = await parseArrayLikeReply(line, readable, returnUint8Arrays); + dispatchEvent(eventTarget, "push", reply ?? []); + return readOrEmitReply(readable, eventTarget, returnUint8Arrays); + } else { + return parseLine(line, readable, returnUint8Arrays); + } +} export async function readReply( readable: BufferedReadableStream, returnUint8Arrays?: boolean, -) { +): Promise { const line = await readable.readLine(); + return parseLine(line, readable, returnUint8Arrays); +} + +async function parseLine( + line: Uint8Array, + readable: BufferedReadableStream, + returnUint8Arrays?: boolean, +): Promise { const code = line[0]; switch (code) { case ErrorReplyCode: { @@ -56,16 +83,7 @@ export async function readReply( } case ArrayReplyCode: case PushReplyCode: { - const size = Number.parseInt(decoder.decode(line.slice(1))); - if (size === -1) { - // `-1` indicates a null array - return null; - } - const array: Array = []; - for (let i = 0; i < size; i++) { - array.push(await readReply(readable, returnUint8Arrays)); - } - return array; + return parseArrayLikeReply(line, readable, returnUint8Arrays); } case MapReplyCode: { // NOTE: We treat a map type as an array to keep backward compatibility. @@ -130,3 +148,20 @@ export async function readReply( ); } } + +async function parseArrayLikeReply( + line: Uint8Array, + readable: BufferedReadableStream, + returnUint8Arrays?: boolean, +): Promise | null> { + const size = Number.parseInt(decoder.decode(line.slice(1))); + if (size === -1) { + // `-1` indicates a null array + return null; + } + const array: Array = []; + for (let i = 0; i < size; i++) { + array.push(await readReply(readable, returnUint8Arrays)); + } + return array; +}