Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion packages/client/lib/client/socket.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { strict as assert } from 'node:assert';
import { spy } from 'sinon';
import { once } from 'node:events';
import net from 'node:net';
import RedisSocket, { RedisSocketOptions } from './socket';
import testUtils, { GLOBAL } from '../test-utils';
import { setTimeout } from 'timers/promises';
Expand Down Expand Up @@ -86,6 +87,96 @@ describe('Socket', () => {
});
});

describe('write', () => {
function captureUnderlyingSocket() {
const original = net.createConnection;
const captured: { socket?: net.Socket } = {};
const target = net as unknown as { createConnection: unknown };
target.createConnection = (...args: unknown[]) => {
const s = (original as unknown as (...a: unknown[]) => net.Socket).apply(net, args);
captured.socket = s;
return s;
};
return {
captured,
restore() {
target.createConnection = original;
}
};
}

async function withConnectedSocket(
fn: (socket: RedisSocket, underlying: net.Socket) => Promise<void>
) {
const server = net.createServer();
server.on('connection', conn => conn.on('error', () => { /* ignore */ }));
await new Promise<void>(resolve => server.listen(0, '127.0.0.1', resolve));
const { port } = server.address() as net.AddressInfo;

const capture = captureUnderlyingSocket();
try {
const socket = createSocket({
host: '127.0.0.1',
port,
reconnectStrategy: false
});

await socket.connect();
assert.ok(capture.captured.socket, 'captured underlying socket');

try {
await fn(socket, capture.captured.socket!);
} finally {
// Tear down the connection so server.close() doesn't wait for it.
capture.captured.socket?.destroy();
}
} finally {
capture.restore();
await new Promise<void>(resolve => server.close(() => resolve()));
}
}

it('should short-circuit when the underlying socket is no longer writable (#3282)', async () => {
await withConnectedSocket(async (socket, underlying) => {
Object.defineProperty(underlying, 'writable', {
value: false,
configurable: true
});

const writeSpy = spy(underlying, 'write');
socket.write([[Buffer.from('PING\r\n')]]);
assert.equal(writeSpy.callCount, 0, 'must not call write on a non-writable socket');
});
});

it('should swallow synchronous EPIPE from net.Socket.write (#3282)', async () => {
await withConnectedSocket(async (socket, underlying) => {
underlying.write = (() => {
const err: NodeJS.ErrnoException = new Error('write EPIPE');
err.code = 'EPIPE';
throw err;
}) as net.Socket['write'];

assert.doesNotThrow(() =>
socket.write([[Buffer.from('PING\r\n')]])
);
});
});

it('should rethrow non-EPIPE errors from net.Socket.write', async () => {
await withConnectedSocket(async (socket, underlying) => {
underlying.write = (() => {
throw new Error('boom');
}) as net.Socket['write'];

assert.throws(
() => socket.write([[Buffer.from('PING\r\n')]]),
/boom/
);
});
});
});

describe('socketTimeout', () => {
const timeout = 50;
testUtils.testWithClient(
Expand Down Expand Up @@ -132,7 +223,7 @@ describe('Socket', () => {
assert.equal(client.isReady, true, 'client.isReady');
assert.equal(client.isOpen, true, 'client.isOpen');

client.on('error', err => {
client.on('error', _err => {
assert.fail('Should not have timed out or errored in any way');
});
await setTimeout(100);
Expand Down
25 changes: 18 additions & 7 deletions packages/client/lib/client/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,17 +381,28 @@ export default class RedisSocket extends EventEmitter {


write(iterable: Iterable<ReadonlyArray<RedisArgument>>) {
if (!this.#socket) return;
if (!this.#socket || !this.#socket.writable) return;

this.#socket.cork();
for (const args of iterable) {
for (const toWrite of args) {
this.#socket.write(toWrite);
}
try {
for (const args of iterable) {
for (const toWrite of args) {
this.#socket.write(toWrite);
}

if (this.#socket.writableNeedDrain) break;
if (this.#socket.writableNeedDrain) break;
}
} catch (err) {
// net.Socket.write can throw synchronously on a half-closed socket
// (writeAfterFIN -> EPIPE) before the 'close' event fires. The pending
// command has already been moved to #waitingForReply by the queue's
// generator, so the close handler will reject it on reconnect.
if (!err || (err as NodeJS.ErrnoException).code !== 'EPIPE') {
throw err;
}
} finally {
this.#socket.uncork();
}
this.#socket.uncork();
}

async quit<T>(fn: () => Promise<T>): Promise<T> {
Expand Down
Loading