Skip to content
201 changes: 84 additions & 117 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { on } from 'stream';
import { type Readable, Transform, type TransformCallback } from 'stream';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

Expand Down Expand Up @@ -61,6 +61,7 @@ import type { ClientMetadata } from './handshake/client_metadata';
import { MessageStream, type OperationDescription } from './message_stream';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { decompressResponse } from './wire_protocol/compression';
import { onData } from './wire_protocol/on_data';
import { getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -786,17 +787,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
/** @internal */
authContext?: AuthContext;

/**@internal */
delayedTimeoutId: NodeJS.Timeout | null = null;
/** @internal */
[kDescription]: StreamDescription;
/** @internal */
[kGeneration]: number;
/** @internal */
[kLastUseTime]: number;
/** @internal */
socket: Stream;
controller: AbortController;

private socket: Stream;
private controller: AbortController;
private messageStream: Readable;
private socketWrite: (buffer: Uint8Array) => Promise<void>;

/** @internal */
[kHello]: Document | null;
/** @internal */
Expand Down Expand Up @@ -836,9 +839,18 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {

this.socket = stream;
this.controller = new AbortController();
this.socket.on('error', this.onError.bind(this));

this.messageStream = this.socket
.on('error', this.onError.bind(this))
.pipe(new SizedMessageTransform({ connection: this }))
.on('error', this.onError.bind(this));
this.socket.on('close', this.onClose.bind(this));
this.socket.on('timeout', this.onTimeout.bind(this));

const socketWrite = promisify(this.socket.write.bind(this.socket));
this.socketWrite = async buffer => {
return abortable(socketWrite(buffer), { signal: this.controller.signal });
};
}

async commandAsync(...args: Parameters<typeof this.command>) {
Expand Down Expand Up @@ -1039,23 +1051,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}

try {
await writeCommand(this, message, {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel,
signal: this.controller.signal
zlibCompressionLevel: this.description.zlibCompressionLevel
});

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

if (options.noResponse) {
yield { ok: 1 };
return;
}

this.controller.signal.throwIfAborted();

for await (const response of readMany(this, { signal: this.controller.signal })) {
for await (const response of this.readMany()) {
this.socket.setTimeout(0);
response.parse(options);

Expand All @@ -1073,9 +1081,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

Expand Down Expand Up @@ -1181,121 +1186,83 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
};
exhaustLoop().catch(replyListener);
}
}

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;

/**
* @internal
*
* This helper reads chucks of data out of a socket and buffers them until it has received a
* full wire protocol message.
*
* By itself, produces an infinite async generator of wire protocol messages and consumers must end
* the stream by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
export async function* readWireProtocolMessages(
connection: ModernConnection,
{ signal }: { signal?: AbortSignal } = {}
): AsyncGenerator<Buffer> {
const bufferPool = new BufferPool();
const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize;
for await (const [chunk] of on(connection.socket, 'data', { signal })) {
if (connection.delayedTimeoutId) {
clearTimeout(connection.delayedTimeoutId);
connection.delayedTimeoutId = null;
}

bufferPool.append(chunk);
const sizeOfMessage = bufferPool.getInt32();
/**
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
*/
async writeCommand(
command: WriteProtocolMessageType,
options: Partial<Pick<OperationDescription, 'agreedCompressor' | 'zlibCompressionLevel'>>
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
: new OpCompressedRequest(command, {
agreedCompressor: options.agreedCompressor ?? 'none',
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});

if (sizeOfMessage == null) {
continue;
}
const buffer = Buffer.concat(await finalCommand.toBin());

if (sizeOfMessage < 0) {
throw new MongoParseError(`Invalid message size: ${sizeOfMessage}`);
}
return this.socketWrite(buffer);
}

if (sizeOfMessage > maxBsonMessageSize) {
throw new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${maxBsonMessageSize}`
);
}
/**
* @internal
*
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
* by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) {
const response = await decompressResponse(message);
yield response;

if (sizeOfMessage > bufferPool.length) {
continue;
if (!response.moreToCome) {
return;
}
}

yield bufferPool.read(sizeOfMessage);
}
}

/**
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
*/
export async function writeCommand(
connection: ModernConnection,
command: WriteProtocolMessageType,
options: Partial<Pick<OperationDescription, 'agreedCompressor' | 'zlibCompressionLevel'>> & {
signal?: AbortSignal;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
: new OpCompressedRequest(command, {
agreedCompressor: options.agreedCompressor ?? 'none',
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});
/** @internal */
export class SizedMessageTransform extends Transform {
bufferPool: BufferPool;
connection: ModernConnection;

constructor({ connection }: { connection: ModernConnection }) {
super({ objectMode: false });
this.bufferPool = new BufferPool();
this.connection = connection;
}
override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void {
if (this.connection.delayedTimeoutId != null) {
clearTimeout(this.connection.delayedTimeoutId);
this.connection.delayedTimeoutId = null;
}

const buffer = Buffer.concat(await finalCommand.toBin());
this.bufferPool.append(chunk);
const sizeOfMessage = this.bufferPool.getInt32();

const socketWriteFn = promisify(connection.socket.write.bind(connection.socket));
if (sizeOfMessage == null) {
return callback();
}

return abortable(socketWriteFn(buffer), options);
}
if (sizeOfMessage < 0) {
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`));
}

/**
* @internal
*
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
* by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
export async function* readMany(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of readWireProtocolMessages(connection, options)) {
const response = await decompressResponse(message);
yield response;

if (!response.moreToCome) {
return;
if (sizeOfMessage > this.bufferPool.length) {
return callback();
}
}
}

/**
* @internal
*
* Reads a single wire protocol message out of a connection.
*/
export async function read(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): Promise<OpMsgResponse | OpQueryResponse> {
for await (const value of readMany(connection, options)) {
return value;
const message = this.bufferPool.read(sizeOfMessage);
return callback(null, message);
}

throw new MongoRuntimeError('unable to read message off of connection');
}
99 changes: 99 additions & 0 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { type EventEmitter } from 'events';

import { List, promiseWithResolvers } from '../../utils';

type PendingPromises = Omit<
ReturnType<typeof promiseWithResolvers<IteratorResult<Buffer>>>,
'promise'
>;

export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
const signal = options.signal;
signal.throwIfAborted();

// Preparing controlling queues and variables
const unconsumedEvents = new List<Buffer>();
const unconsumedPromises = new List<PendingPromises>();
let error: Error | null = null;
let finished = false;

const iterator: AsyncGenerator<Buffer> = {
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value != null) {
return Promise.resolve({ value, done: false });
}

// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error != null) {
const p = Promise.reject(error);
// Only the first element errors
error = null;
return p;
}

// If the iterator is finished, resolve to done
if (finished) return closeHandler();

// Wait until an event happens
const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Buffer>>();
unconsumedPromises.push({ resolve, reject });
return promise;
},

return() {
return closeHandler();
},

throw(err: Error) {
errorHandler(err);
return Promise.resolve({ value: undefined, done: true });
},

[Symbol.asyncIterator]() {
return this;
}
};

// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);
signal.addEventListener('abort', abortListener, { once: true });

return iterator;

function abortListener() {
errorHandler(signal.reason);
}

function eventHandler(value: Buffer) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.resolve({ value, done: false });
else unconsumedEvents.push(value);
}

function errorHandler(err: Error) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.reject(err);
else error = err;
void closeHandler();
}

function closeHandler() {
// Adding event handlers
emitter.off('data', eventHandler);
emitter.off('error', errorHandler);
signal.removeEventListener('abort', abortListener);
finished = true;
const doneResult = { value: undefined, done: finished } as const;

for (const promise of unconsumedPromises) {
promise.resolve(doneResult);
}

return Promise.resolve(doneResult);
}
}
5 changes: 1 addition & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,7 @@ export type {
ConnectionOptions,
DestroyOptions,
ModernConnection,
ProxyOptions,
read,
readMany,
writeCommand
ProxyOptions
} from './cmap/connection';
export type {
CloseOptions,
Expand Down
Loading