Skip to content

Commit 9ec2f83

Browse files
committed
chore(refactor): rework reading/writing
1 parent 0214516 commit 9ec2f83

File tree

2 files changed

+34
-49
lines changed

2 files changed

+34
-49
lines changed

src/client.ts

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { castArrayIfExists } from "radashi";
22
import { RedisChannel, RedisChannelPattern } from "./channel";
33
import { Materialize, RedisCommand, RedisValue } from "./command";
44
import { ExtractMessageEvent, Subscriber } from "./subscriber";
5-
import { ConnectionInstance, RedisClientOptions, RedisResponse } from "./type";
5+
import { RedisClientOptions, RedisResponse } from "./type";
66
import { createParser } from "./utils/create-parser";
77
import { encodeCommand } from "./utils/encode-command";
88
import { getConnectFn } from "./utils/get-connect-fn";
@@ -13,7 +13,7 @@ type MaybeArray<T> = T | readonly T[];
1313
export class RedisClient {
1414
#encoder = new TextEncoder();
1515
#connected = false;
16-
#connection: Promise<ConnectionInstance> | null = null;
16+
#connection: Promise<Socket> | null = null;
1717
#writeLock = Promise.resolve();
1818
#responseQueue: {
1919
resolve: (value: RedisResponse) => void;
@@ -90,7 +90,6 @@ export class RedisClient {
9090

9191
this.#connection = (async () => {
9292
const connect = await getConnectFn(this.options.connectFn);
93-
9493
const socket = connect(
9594
{
9695
hostname: this.config.hostname,
@@ -102,45 +101,33 @@ export class RedisClient {
102101
},
103102
);
104103

104+
socket.closed.then(this.onClose, this.onClose);
105105
await socket.opened;
106106

107-
return {
108-
socket,
109-
writer: socket.writable.getWriter(),
110-
reader: socket.readable.getReader(),
111-
};
107+
return socket;
112108
})();
113109

114110
// Listen for socket close events and parse responses.
115-
this.#connection.then(async (connection) => {
116-
let closed = false;
117-
connection.socket.closed.then(() => {
118-
closed = true;
119-
});
120-
121-
try {
122-
while (true) {
123-
const result = await connection.reader.read();
124-
if (!result) {
125-
break;
126-
}
127-
if (result.value) {
128-
this.parser(result.value);
129-
}
130-
if (result.done) {
131-
break;
132-
}
111+
this.#connection.then(async (socket) => {
112+
const reader = socket.readable.getReader();
113+
while (true) {
114+
const result = await reader.read().catch(() => null);
115+
if (!result) {
116+
break;
133117
}
134-
} catch (error) {
135-
if (!closed && this.#connection) {
136-
console.error(error);
118+
if (result.value) {
119+
this.parser(result.value);
120+
}
121+
if (result.done) {
122+
reader.releaseLock();
123+
break;
137124
}
138125
}
139126
});
140127

141128
if (this.config.password || this.config.database) {
142129
// AUTH and SELECT block all other commands until they are resolved.
143-
this.#connection = this.#connection.then(async (connection) => {
130+
this.#connection = this.#connection.then(async (socket) => {
144131
const commands: [string, ...RedisValue[]][] = [];
145132
if (this.config.password) {
146133
const username = castArrayIfExists(this.config.username) ?? [];
@@ -151,11 +138,11 @@ export class RedisClient {
151138
}
152139

153140
// Wait for writing to finish...
154-
const promises = await this.writeCommands(commands, connection.writer);
141+
const promises = await this.writeCommands(commands, socket);
155142
// Then wait for all commands to finish...
156143
await Promise.all(promises);
157144

158-
return connection;
145+
return socket;
159146
});
160147
}
161148

@@ -200,15 +187,15 @@ export class RedisClient {
200187
): Promise<any>;
201188

202189
public async sendRaw(command: RedisCommand | (RedisCommand | undefined)[]) {
203-
const connection = await this.connect();
190+
const socket = await this.connect();
204191

205192
let promises: (Promise<RedisResponse> | undefined)[];
206193

207194
// Use a write lock to avoid out-of-order command execution.
208195
await (this.#writeLock = this.#writeLock.then(async () => {
209196
promises = await this.writeCommands(
210197
Array.isArray(command) ? command.map((c) => c?.args) : [command.args],
211-
connection.writer,
198+
socket,
212199
);
213200
}));
214201

@@ -221,7 +208,7 @@ export class RedisClient {
221208

222209
private async writeCommands(
223210
commands: ([string, ...RedisValue[]] | undefined)[],
224-
writer: WritableStreamDefaultWriter<Uint8Array>,
211+
socket: Socket,
225212
) {
226213
const stack = new Error().stack;
227214
const chunks: Array<string | Uint8Array> = [];
@@ -240,11 +227,14 @@ export class RedisClient {
240227
});
241228
});
242229
});
230+
231+
const writer = socket.writable.getWriter();
243232
for (const chunk of chunks) {
244233
await writer.write(
245234
chunk instanceof Uint8Array ? chunk : this.#encoder.encode(chunk),
246235
);
247236
}
237+
writer.releaseLock();
248238
return promises;
249239
}
250240

@@ -255,7 +245,9 @@ export class RedisClient {
255245
* You may unsubscribe through the `ReadableStream#cancel` or
256246
* `MessageEvent#cancel` methods.
257247
*/
258-
subscribe<TPattern extends MaybeArray<RedisChannel | RedisChannelPattern>>(
248+
public subscribe<
249+
TPattern extends MaybeArray<RedisChannel | RedisChannelPattern>,
250+
>(
259251
pattern: TPattern,
260252
signal?: AbortSignal,
261253
): ReadableStream<
@@ -265,16 +257,15 @@ export class RedisClient {
265257
return subscriber.subscribe(pattern, signal);
266258
}
267259

268-
public async close(err?: Error) {
269-
if (!this.#connection) return;
270-
271-
const connection = await this.#connection;
260+
private onClose = () => {
272261
this.#connection = null;
273262
this.#writeLock = Promise.resolve();
263+
};
274264

275-
await connection.reader.cancel(err);
276-
await connection.writer.abort(err);
277-
await connection.socket.close();
265+
public async close() {
266+
if (!this.#connection) return;
267+
const socket = await this.#connection;
268+
await socket.close();
278269
}
279270

280271
public async closeSubscriptions() {

src/type.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,3 @@ export type Redis = ((
4242
};
4343

4444
export type Socket = ReturnType<typeof connect>;
45-
46-
export interface ConnectionInstance {
47-
writer: WritableStreamDefaultWriter<any>;
48-
reader: ReadableStreamDefaultReader<any>;
49-
socket: Socket;
50-
}

0 commit comments

Comments
 (0)