diff --git a/packages/client/lib/client/socket.spec.ts b/packages/client/lib/client/socket.spec.ts index 2271e163af..581da7ea76 100644 --- a/packages/client/lib/client/socket.spec.ts +++ b/packages/client/lib/client/socket.spec.ts @@ -1,6 +1,7 @@ import { strict as assert } from 'node:assert'; import { spy } from 'sinon'; import { once } from 'node:events'; +import net from 'node:net'; import RedisSocket, { RedisSocketOptions } from './socket'; import testUtils, { GLOBAL } from '../test-utils'; import { setTimeout } from 'timers/promises'; @@ -86,6 +87,96 @@ describe('Socket', () => { }); }); + describe('write', () => { + function captureUnderlyingSocket() { + const original = net.createConnection; + const captured: { socket?: net.Socket } = {}; + const target = net as unknown as { createConnection: unknown }; + target.createConnection = (...args: unknown[]) => { + const s = (original as unknown as (...a: unknown[]) => net.Socket).apply(net, args); + captured.socket = s; + return s; + }; + return { + captured, + restore() { + target.createConnection = original; + } + }; + } + + async function withConnectedSocket( + fn: (socket: RedisSocket, underlying: net.Socket) => Promise + ) { + const server = net.createServer(); + server.on('connection', conn => conn.on('error', () => { /* ignore */ })); + await new Promise(resolve => server.listen(0, '127.0.0.1', resolve)); + const { port } = server.address() as net.AddressInfo; + + const capture = captureUnderlyingSocket(); + try { + const socket = createSocket({ + host: '127.0.0.1', + port, + reconnectStrategy: false + }); + + await socket.connect(); + assert.ok(capture.captured.socket, 'captured underlying socket'); + + try { + await fn(socket, capture.captured.socket!); + } finally { + // Tear down the connection so server.close() doesn't wait for it. + capture.captured.socket?.destroy(); + } + } finally { + capture.restore(); + await new Promise(resolve => server.close(() => resolve())); + } + } + + it('should short-circuit when the underlying socket is no longer writable (#3282)', async () => { + await withConnectedSocket(async (socket, underlying) => { + Object.defineProperty(underlying, 'writable', { + value: false, + configurable: true + }); + + const writeSpy = spy(underlying, 'write'); + socket.write([[Buffer.from('PING\r\n')]]); + assert.equal(writeSpy.callCount, 0, 'must not call write on a non-writable socket'); + }); + }); + + it('should swallow synchronous EPIPE from net.Socket.write (#3282)', async () => { + await withConnectedSocket(async (socket, underlying) => { + underlying.write = (() => { + const err: NodeJS.ErrnoException = new Error('write EPIPE'); + err.code = 'EPIPE'; + throw err; + }) as net.Socket['write']; + + assert.doesNotThrow(() => + socket.write([[Buffer.from('PING\r\n')]]) + ); + }); + }); + + it('should rethrow non-EPIPE errors from net.Socket.write', async () => { + await withConnectedSocket(async (socket, underlying) => { + underlying.write = (() => { + throw new Error('boom'); + }) as net.Socket['write']; + + assert.throws( + () => socket.write([[Buffer.from('PING\r\n')]]), + /boom/ + ); + }); + }); + }); + describe('socketTimeout', () => { const timeout = 50; testUtils.testWithClient( @@ -132,7 +223,7 @@ describe('Socket', () => { assert.equal(client.isReady, true, 'client.isReady'); assert.equal(client.isOpen, true, 'client.isOpen'); - client.on('error', err => { + client.on('error', _err => { assert.fail('Should not have timed out or errored in any way'); }); await setTimeout(100); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index c07bd44c73..974feec136 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -381,17 +381,28 @@ export default class RedisSocket extends EventEmitter { write(iterable: Iterable>) { - if (!this.#socket) return; + if (!this.#socket || !this.#socket.writable) return; this.#socket.cork(); - for (const args of iterable) { - for (const toWrite of args) { - this.#socket.write(toWrite); - } + try { + for (const args of iterable) { + for (const toWrite of args) { + this.#socket.write(toWrite); + } - if (this.#socket.writableNeedDrain) break; + if (this.#socket.writableNeedDrain) break; + } + } catch (err) { + // net.Socket.write can throw synchronously on a half-closed socket + // (writeAfterFIN -> EPIPE) before the 'close' event fires. The pending + // command has already been moved to #waitingForReply by the queue's + // generator, so the close handler will reject it on reconnect. + if (!err || (err as NodeJS.ErrnoException).code !== 'EPIPE') { + throw err; + } + } finally { + this.#socket.uncork(); } - this.#socket.uncork(); } async quit(fn: () => Promise): Promise {