Skip to content

Commit 78b2dd5

Browse files
authored
perf(NODE-4727): Improve performance of buffering and cursors (mongodb#3447)
1 parent 4e55a04 commit 78b2dd5

File tree

14 files changed

+1932
-1921
lines changed

14 files changed

+1932
-1921
lines changed

package-lock.json

Lines changed: 1027 additions & 1698 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
},
2727
"dependencies": {
2828
"bson": "^4.7.0",
29-
"denque": "^2.1.0",
3029
"mongodb-connection-string-url": "^2.5.4",
3130
"socks": "^2.7.1"
3231
},

src/cmap/connection_pool.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import Denque = require('denque');
21
import { clearTimeout, setTimeout } from 'timers';
32

43
import type { ObjectId } from '../bson';
@@ -26,7 +25,7 @@ import {
2625
import { Logger } from '../logger';
2726
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2827
import type { Server } from '../sdam/server';
29-
import { Callback, eachAsync, makeCounter } from '../utils';
28+
import { Callback, eachAsync, List, makeCounter } from '../utils';
3029
import { connect } from './connect';
3130
import { Connection, ConnectionEvents, ConnectionOptions } from './connection';
3231
import {
@@ -137,7 +136,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
137136
[kPoolState]: typeof PoolState[keyof typeof PoolState];
138137
[kServer]: Server;
139138
[kLogger]: Logger;
140-
[kConnections]: Denque<Connection>;
139+
[kConnections]: List<Connection>;
141140
[kPending]: number;
142141
[kCheckedOut]: Set<Connection>;
143142
[kMinPoolSizeTimer]?: NodeJS.Timeout;
@@ -151,7 +150,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
151150
[kServiceGenerations]: Map<string, number>;
152151
[kConnectionCounter]: Generator<number>;
153152
[kCancellationToken]: CancellationToken;
154-
[kWaitQueue]: Denque<WaitQueueMember>;
153+
[kWaitQueue]: List<WaitQueueMember>;
155154
[kMetrics]: ConnectionPoolMetrics;
156155
[kProcessingWaitQueue]: boolean;
157156

@@ -236,7 +235,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
236235
this[kPoolState] = PoolState.paused;
237236
this[kServer] = server;
238237
this[kLogger] = new Logger('ConnectionPool');
239-
this[kConnections] = new Denque();
238+
this[kConnections] = new List();
240239
this[kPending] = 0;
241240
this[kCheckedOut] = new Set();
242241
this[kMinPoolSizeTimer] = undefined;
@@ -245,7 +244,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
245244
this[kConnectionCounter] = makeCounter(1);
246245
this[kCancellationToken] = new CancellationToken();
247246
this[kCancellationToken].setMaxListeners(Infinity);
248-
this[kWaitQueue] = new Denque();
247+
this[kWaitQueue] = new List();
249248
this[kMetrics] = new ConnectionPoolMetrics();
250249
this[kProcessingWaitQueue] = false;
251250

@@ -659,12 +658,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
659658
return;
660659
}
661660

662-
for (let i = 0; i < this[kConnections].length; i++) {
663-
const connection = this[kConnections].peekAt(i);
664-
if (connection && this.connectionIsPerished(connection)) {
665-
this[kConnections].removeOne(i);
666-
}
667-
}
661+
this[kConnections].prune(connection => this.connectionIsPerished(connection));
668662

669663
if (
670664
this.totalConnectionCount < minPoolSize &&
@@ -705,7 +699,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
705699
this[kProcessingWaitQueue] = true;
706700

707701
while (this.waitQueueSize) {
708-
const waitQueueMember = this[kWaitQueue].peekFront();
702+
const waitQueueMember = this[kWaitQueue].first();
709703
if (!waitQueueMember) {
710704
this[kWaitQueue].shift();
711705
continue;

src/cmap/message_stream.ts

Lines changed: 29 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -132,31 +132,28 @@ function canCompress(command: WriteProtocolMessageType) {
132132
return !uncompressibleCommands.has(commandName);
133133
}
134134

135-
function processIncomingData(stream: MessageStream, callback: Callback<Buffer>) {
135+
function processIncomingData(stream: MessageStream, callback: Callback<Buffer>): void {
136136
const buffer = stream[kBuffer];
137-
if (buffer.length < 4) {
138-
callback();
139-
return;
137+
const sizeOfMessage = buffer.getInt32();
138+
139+
if (sizeOfMessage == null) {
140+
return callback();
140141
}
141142

142-
const sizeOfMessage = buffer.peek(4).readInt32LE();
143143
if (sizeOfMessage < 0) {
144-
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
145-
return;
144+
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
146145
}
147146

148147
if (sizeOfMessage > stream.maxBsonMessageSize) {
149-
callback(
148+
return callback(
150149
new MongoParseError(
151150
`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
152151
)
153152
);
154-
return;
155153
}
156154

157155
if (sizeOfMessage > buffer.length) {
158-
callback();
159-
return;
156+
return callback();
160157
}
161158

162159
const message = buffer.read(sizeOfMessage);
@@ -170,36 +167,31 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
170167
const monitorHasAnotherHello = () => {
171168
if (stream.isMonitoringConnection) {
172169
// Can we read the next message size?
173-
if (buffer.length >= 4) {
174-
const sizeOfMessage = buffer.peek(4).readInt32LE();
175-
if (sizeOfMessage <= buffer.length) {
176-
return true;
177-
}
170+
const sizeOfMessage = buffer.getInt32();
171+
if (sizeOfMessage != null && sizeOfMessage <= buffer.length) {
172+
return true;
178173
}
179174
}
180175
return false;
181176
};
182177

183178
let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
184179
if (messageHeader.opCode !== OP_COMPRESSED) {
185-
const messageBody = message.slice(MESSAGE_HEADER_SIZE);
180+
const messageBody = message.subarray(MESSAGE_HEADER_SIZE);
186181

187182
// If we are a monitoring connection message stream and
188183
// there is more in the buffer that can be read, skip processing since we
189184
// want the last hello command response that is in the buffer.
190185
if (monitorHasAnotherHello()) {
191-
processIncomingData(stream, callback);
192-
} else {
193-
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
194-
195-
if (buffer.length >= 4) {
196-
processIncomingData(stream, callback);
197-
} else {
198-
callback();
199-
}
186+
return processIncomingData(stream, callback);
200187
}
201188

202-
return;
189+
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
190+
191+
if (buffer.length >= 4) {
192+
return processIncomingData(stream, callback);
193+
}
194+
return callback();
203195
}
204196

205197
messageHeader.fromCompressed = true;
@@ -210,33 +202,28 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
210202

211203
// recalculate based on wrapped opcode
212204
ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
213-
decompress(compressorID, compressedBuffer, (err, messageBody) => {
205+
return decompress(compressorID, compressedBuffer, (err, messageBody) => {
214206
if (err || !messageBody) {
215-
callback(err);
216-
return;
207+
return callback(err);
217208
}
218209

219210
if (messageBody.length !== messageHeader.length) {
220-
callback(
211+
return callback(
221212
new MongoDecompressionError('Message body and message header must be the same length')
222213
);
223-
224-
return;
225214
}
226215

227216
// If we are a monitoring connection message stream and
228217
// there is more in the buffer that can be read, skip processing since we
229218
// want the last hello command response that is in the buffer.
230219
if (monitorHasAnotherHello()) {
231-
processIncomingData(stream, callback);
232-
} else {
233-
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
234-
235-
if (buffer.length >= 4) {
236-
processIncomingData(stream, callback);
237-
} else {
238-
callback();
239-
}
220+
return processIncomingData(stream, callback);
221+
}
222+
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
223+
224+
if (buffer.length >= 4) {
225+
return processIncomingData(stream, callback);
240226
}
227+
return callback();
241228
});
242229
}

src/cursor/abstract_cursor.ts

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { ReadConcern, ReadConcernLike } from '../read_concern';
2121
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { Callback, maybeCallback, MongoDBNamespace, ns } from '../utils';
24+
import { Callback, List, maybeCallback, MongoDBNamespace, ns } from '../utils';
2525

2626
/** @internal */
2727
const kId = Symbol('id');
@@ -143,15 +143,15 @@ export abstract class AbstractCursor<
143143
CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
144144
> extends TypedEventEmitter<CursorEvents> {
145145
/** @internal */
146-
[kId]?: Long;
146+
[kId]: Long | null;
147147
/** @internal */
148148
[kSession]: ClientSession;
149149
/** @internal */
150150
[kServer]?: Server;
151151
/** @internal */
152152
[kNamespace]: MongoDBNamespace;
153153
/** @internal */
154-
[kDocuments]: TSchema[];
154+
[kDocuments]: List<TSchema>;
155155
/** @internal */
156156
[kClient]: MongoClient;
157157
/** @internal */
@@ -181,7 +181,8 @@ export abstract class AbstractCursor<
181181
}
182182
this[kClient] = client;
183183
this[kNamespace] = namespace;
184-
this[kDocuments] = [];
184+
this[kId] = null;
185+
this[kDocuments] = new List();
185186
this[kInitialized] = false;
186187
this[kClosed] = false;
187188
this[kKilled] = false;
@@ -224,7 +225,7 @@ export abstract class AbstractCursor<
224225
}
225226

226227
get id(): Long | undefined {
227-
return this[kId];
228+
return this[kId] ?? undefined;
228229
}
229230

230231
/** @internal */
@@ -282,7 +283,17 @@ export abstract class AbstractCursor<
282283

283284
/** Returns current buffered documents */
284285
readBufferedDocuments(number?: number): TSchema[] {
285-
return this[kDocuments].splice(0, number ?? this[kDocuments].length);
286+
const bufferedDocs: TSchema[] = [];
287+
const documentsToRead = Math.min(number ?? this[kDocuments].length, this[kDocuments].length);
288+
289+
for (let count = 0; count < documentsToRead; count++) {
290+
const document = this[kDocuments].shift();
291+
if (document != null) {
292+
bufferedDocs.push(document);
293+
}
294+
}
295+
296+
return bufferedDocs;
286297
}
287298

288299
[Symbol.asyncIterator](): AsyncIterator<TSchema, void> {
@@ -350,7 +361,7 @@ export abstract class AbstractCursor<
350361
return false;
351362
}
352363

353-
if (this[kDocuments].length) {
364+
if (this[kDocuments].length !== 0) {
354365
return true;
355366
}
356367

@@ -597,8 +608,8 @@ export abstract class AbstractCursor<
597608
return;
598609
}
599610

600-
this[kId] = undefined;
601-
this[kDocuments] = [];
611+
this[kId] = null;
612+
this[kDocuments].clear();
602613
this[kClosed] = false;
603614
this[kKilled] = false;
604615
this[kInitialized] = false;
@@ -662,7 +673,7 @@ export abstract class AbstractCursor<
662673
this[kNamespace] = ns(response.cursor.ns);
663674
}
664675

665-
this[kDocuments] = response.cursor.firstBatch;
676+
this[kDocuments].pushMany(response.cursor.firstBatch);
666677
}
667678

668679
// When server responses return without a cursor document, we close this cursor
@@ -671,7 +682,7 @@ export abstract class AbstractCursor<
671682
if (this[kId] == null) {
672683
this[kId] = Long.ZERO;
673684
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
674-
this[kDocuments] = [state.response as TODO_NODE_3286];
685+
this[kDocuments].push(state.response as TODO_NODE_3286);
675686
}
676687
}
677688

@@ -687,22 +698,14 @@ export abstract class AbstractCursor<
687698
}
688699
}
689700

690-
function nextDocument<T>(cursor: AbstractCursor): T | null {
691-
if (cursor[kDocuments] == null || !cursor[kDocuments].length) {
692-
return null;
693-
}
694-
701+
function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
695702
const doc = cursor[kDocuments].shift();
696-
if (doc) {
697-
const transform = cursor[kTransform];
698-
if (transform) {
699-
return transform(doc) as T;
700-
}
701703

702-
return doc;
704+
if (doc && cursor[kTransform]) {
705+
return cursor[kTransform](doc) as T;
703706
}
704707

705-
return null;
708+
return doc;
706709
}
707710

708711
const nextAsync = promisify(
@@ -733,7 +736,7 @@ export function next<T>(
733736
return callback(undefined, null);
734737
}
735738

736-
if (cursor[kDocuments] && cursor[kDocuments].length) {
739+
if (cursor[kDocuments].length !== 0) {
737740
callback(undefined, nextDocument<T>(cursor));
738741
return;
739742
}
@@ -757,19 +760,19 @@ export function next<T>(
757760

758761
// otherwise need to call getMore
759762
const batchSize = cursor[kOptions].batchSize || 1000;
760-
cursor._getMore(batchSize, (err, response) => {
763+
cursor._getMore(batchSize, (error, response) => {
761764
if (response) {
762765
const cursorId =
763766
typeof response.cursor.id === 'number'
764767
? Long.fromNumber(response.cursor.id)
765768
: response.cursor.id;
766769

767-
cursor[kDocuments] = response.cursor.nextBatch;
770+
cursor[kDocuments].pushMany(response.cursor.nextBatch);
768771
cursor[kId] = cursorId;
769772
}
770773

771-
if (err || cursorIsDead(cursor)) {
772-
return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument<T>(cursor)));
774+
if (error || cursorIsDead(cursor)) {
775+
return cleanupCursor(cursor, { error }, () => callback(error, nextDocument<T>(cursor)));
773776
}
774777

775778
if (cursor[kDocuments].length === 0 && blocking === false) {

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ export type {
477477
ClientMetadataOptions,
478478
EventEmitterWithState,
479479
HostAddress,
480+
List,
480481
MongoDBNamespace
481482
} from './utils';
482483
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';

0 commit comments

Comments
 (0)