Skip to content

Commit b59f17c

Browse files
authored
refactor(NODE-4392): getMore and killCursors methods (#3317)
1 parent be34a94 commit b59f17c

File tree

16 files changed

+395
-729
lines changed

16 files changed

+395
-729
lines changed

src/cmap/command_monitoring_events.ts

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { Document, ObjectId } from '../bson';
22
import { LEGACY_HELLO_COMMAND, LEGACY_HELLO_COMMAND_CAMEL_CASE } from '../constants';
33
import { calculateDurationInMs, deepCopy } from '../utils';
4-
import { GetMore, KillCursor, Msg, WriteProtocolMessageType } from './commands';
4+
import { Msg, WriteProtocolMessageType } from './commands';
55
import type { Connection } from './connection';
66

77
/**
@@ -206,21 +206,6 @@ const OP_QUERY_KEYS = [
206206

207207
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
208208
function extractCommand(command: WriteProtocolMessageType): Document {
209-
if (command instanceof GetMore) {
210-
return {
211-
getMore: deepCopy(command.cursorId),
212-
collection: collectionName(command),
213-
batchSize: command.numberToReturn
214-
};
215-
}
216-
217-
if (command instanceof KillCursor) {
218-
return {
219-
killCursors: collectionName(command),
220-
cursors: deepCopy(command.cursorIds)
221-
};
222-
}
223-
224209
if (command instanceof Msg) {
225210
return deepCopy(command.command);
226211
}
@@ -280,28 +265,10 @@ function extractCommand(command: WriteProtocolMessageType): Document {
280265
}
281266

282267
function extractReply(command: WriteProtocolMessageType, reply?: Document) {
283-
if (command instanceof KillCursor) {
284-
return {
285-
ok: 1,
286-
cursorsUnknown: command.cursorIds
287-
};
288-
}
289-
290268
if (!reply) {
291269
return reply;
292270
}
293271

294-
if (command instanceof GetMore) {
295-
return {
296-
ok: 1,
297-
cursor: {
298-
id: deepCopy(reply.cursorId),
299-
ns: namespace(command),
300-
nextBatch: deepCopy(reply.documents)
301-
}
302-
};
303-
}
304-
305272
if (command instanceof Msg) {
306273
return deepCopy(reply.result ? reply.result : reply);
307274
}

src/cmap/commands.ts

Lines changed: 2 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { ReadPreference } from '../read_preference';
55
import type { ClientSession } from '../sessions';
66
import { databaseNamespace } from '../utils';
77
import type { CommandOptions } from './connection';
8-
import { OP_GETMORE, OP_KILL_CURSORS, OP_MSG, OP_QUERY } from './wire_protocol/constants';
8+
import { OP_MSG, OP_QUERY } from './wire_protocol/constants';
99

1010
// Incrementing request id
1111
let _requestId = 0;
@@ -26,7 +26,7 @@ const SHARD_CONFIG_STALE = 4;
2626
const AWAIT_CAPABLE = 8;
2727

2828
/** @internal */
29-
export type WriteProtocolMessageType = Query | Msg | GetMore | KillCursor;
29+
export type WriteProtocolMessageType = Query | Msg;
3030

3131
/** @internal */
3232
export interface OpQueryOptions extends CommandOptions {
@@ -270,194 +270,6 @@ export class Query {
270270
}
271271
}
272272

273-
/** @internal */
274-
export interface OpGetMoreOptions {
275-
numberToReturn?: number;
276-
}
277-
278-
/**************************************************************
279-
* GETMORE
280-
**************************************************************/
281-
/** @internal */
282-
export class GetMore {
283-
numberToReturn: number;
284-
requestId: number;
285-
ns: string;
286-
cursorId: Long;
287-
288-
constructor(ns: string, cursorId: Long, opts: OpGetMoreOptions = {}) {
289-
this.numberToReturn = opts.numberToReturn || 0;
290-
this.requestId = _requestId++;
291-
this.ns = ns;
292-
this.cursorId = cursorId;
293-
}
294-
295-
// Uses a single allocated buffer for the process, avoiding multiple memory allocations
296-
toBin(): Buffer[] {
297-
const length = 4 + Buffer.byteLength(this.ns) + 1 + 4 + 8 + 4 * 4;
298-
// Create command buffer
299-
let index = 0;
300-
// Allocate buffer
301-
const _buffer = Buffer.alloc(length);
302-
303-
// Write header information
304-
// index = write32bit(index, _buffer, length);
305-
_buffer[index + 3] = (length >> 24) & 0xff;
306-
_buffer[index + 2] = (length >> 16) & 0xff;
307-
_buffer[index + 1] = (length >> 8) & 0xff;
308-
_buffer[index] = length & 0xff;
309-
index = index + 4;
310-
311-
// index = write32bit(index, _buffer, requestId);
312-
_buffer[index + 3] = (this.requestId >> 24) & 0xff;
313-
_buffer[index + 2] = (this.requestId >> 16) & 0xff;
314-
_buffer[index + 1] = (this.requestId >> 8) & 0xff;
315-
_buffer[index] = this.requestId & 0xff;
316-
index = index + 4;
317-
318-
// index = write32bit(index, _buffer, 0);
319-
_buffer[index + 3] = (0 >> 24) & 0xff;
320-
_buffer[index + 2] = (0 >> 16) & 0xff;
321-
_buffer[index + 1] = (0 >> 8) & 0xff;
322-
_buffer[index] = 0 & 0xff;
323-
index = index + 4;
324-
325-
// index = write32bit(index, _buffer, OP_GETMORE);
326-
_buffer[index + 3] = (OP_GETMORE >> 24) & 0xff;
327-
_buffer[index + 2] = (OP_GETMORE >> 16) & 0xff;
328-
_buffer[index + 1] = (OP_GETMORE >> 8) & 0xff;
329-
_buffer[index] = OP_GETMORE & 0xff;
330-
index = index + 4;
331-
332-
// index = write32bit(index, _buffer, 0);
333-
_buffer[index + 3] = (0 >> 24) & 0xff;
334-
_buffer[index + 2] = (0 >> 16) & 0xff;
335-
_buffer[index + 1] = (0 >> 8) & 0xff;
336-
_buffer[index] = 0 & 0xff;
337-
index = index + 4;
338-
339-
// Write collection name
340-
index = index + _buffer.write(this.ns, index, 'utf8') + 1;
341-
_buffer[index - 1] = 0;
342-
343-
// Write batch size
344-
// index = write32bit(index, _buffer, numberToReturn);
345-
_buffer[index + 3] = (this.numberToReturn >> 24) & 0xff;
346-
_buffer[index + 2] = (this.numberToReturn >> 16) & 0xff;
347-
_buffer[index + 1] = (this.numberToReturn >> 8) & 0xff;
348-
_buffer[index] = this.numberToReturn & 0xff;
349-
index = index + 4;
350-
351-
// Write cursor id
352-
// index = write32bit(index, _buffer, cursorId.getLowBits());
353-
_buffer[index + 3] = (this.cursorId.getLowBits() >> 24) & 0xff;
354-
_buffer[index + 2] = (this.cursorId.getLowBits() >> 16) & 0xff;
355-
_buffer[index + 1] = (this.cursorId.getLowBits() >> 8) & 0xff;
356-
_buffer[index] = this.cursorId.getLowBits() & 0xff;
357-
index = index + 4;
358-
359-
// index = write32bit(index, _buffer, cursorId.getHighBits());
360-
_buffer[index + 3] = (this.cursorId.getHighBits() >> 24) & 0xff;
361-
_buffer[index + 2] = (this.cursorId.getHighBits() >> 16) & 0xff;
362-
_buffer[index + 1] = (this.cursorId.getHighBits() >> 8) & 0xff;
363-
_buffer[index] = this.cursorId.getHighBits() & 0xff;
364-
index = index + 4;
365-
366-
// Return buffer
367-
return [_buffer];
368-
}
369-
}
370-
371-
/**************************************************************
372-
* KILLCURSOR
373-
**************************************************************/
374-
/** @internal */
375-
export class KillCursor {
376-
ns: string;
377-
requestId: number;
378-
cursorIds: Long[];
379-
380-
constructor(ns: string, cursorIds: Long[]) {
381-
this.ns = ns;
382-
this.requestId = _requestId++;
383-
this.cursorIds = cursorIds;
384-
}
385-
386-
// Uses a single allocated buffer for the process, avoiding multiple memory allocations
387-
toBin(): Buffer[] {
388-
const length = 4 + 4 + 4 * 4 + this.cursorIds.length * 8;
389-
390-
// Create command buffer
391-
let index = 0;
392-
const _buffer = Buffer.alloc(length);
393-
394-
// Write header information
395-
// index = write32bit(index, _buffer, length);
396-
_buffer[index + 3] = (length >> 24) & 0xff;
397-
_buffer[index + 2] = (length >> 16) & 0xff;
398-
_buffer[index + 1] = (length >> 8) & 0xff;
399-
_buffer[index] = length & 0xff;
400-
index = index + 4;
401-
402-
// index = write32bit(index, _buffer, requestId);
403-
_buffer[index + 3] = (this.requestId >> 24) & 0xff;
404-
_buffer[index + 2] = (this.requestId >> 16) & 0xff;
405-
_buffer[index + 1] = (this.requestId >> 8) & 0xff;
406-
_buffer[index] = this.requestId & 0xff;
407-
index = index + 4;
408-
409-
// index = write32bit(index, _buffer, 0);
410-
_buffer[index + 3] = (0 >> 24) & 0xff;
411-
_buffer[index + 2] = (0 >> 16) & 0xff;
412-
_buffer[index + 1] = (0 >> 8) & 0xff;
413-
_buffer[index] = 0 & 0xff;
414-
index = index + 4;
415-
416-
// index = write32bit(index, _buffer, OP_KILL_CURSORS);
417-
_buffer[index + 3] = (OP_KILL_CURSORS >> 24) & 0xff;
418-
_buffer[index + 2] = (OP_KILL_CURSORS >> 16) & 0xff;
419-
_buffer[index + 1] = (OP_KILL_CURSORS >> 8) & 0xff;
420-
_buffer[index] = OP_KILL_CURSORS & 0xff;
421-
index = index + 4;
422-
423-
// index = write32bit(index, _buffer, 0);
424-
_buffer[index + 3] = (0 >> 24) & 0xff;
425-
_buffer[index + 2] = (0 >> 16) & 0xff;
426-
_buffer[index + 1] = (0 >> 8) & 0xff;
427-
_buffer[index] = 0 & 0xff;
428-
index = index + 4;
429-
430-
// Write batch size
431-
// index = write32bit(index, _buffer, this.cursorIds.length);
432-
_buffer[index + 3] = (this.cursorIds.length >> 24) & 0xff;
433-
_buffer[index + 2] = (this.cursorIds.length >> 16) & 0xff;
434-
_buffer[index + 1] = (this.cursorIds.length >> 8) & 0xff;
435-
_buffer[index] = this.cursorIds.length & 0xff;
436-
index = index + 4;
437-
438-
// Write all the cursor ids into the array
439-
for (let i = 0; i < this.cursorIds.length; i++) {
440-
// Write cursor id
441-
// index = write32bit(index, _buffer, cursorIds[i].getLowBits());
442-
_buffer[index + 3] = (this.cursorIds[i].getLowBits() >> 24) & 0xff;
443-
_buffer[index + 2] = (this.cursorIds[i].getLowBits() >> 16) & 0xff;
444-
_buffer[index + 1] = (this.cursorIds[i].getLowBits() >> 8) & 0xff;
445-
_buffer[index] = this.cursorIds[i].getLowBits() & 0xff;
446-
index = index + 4;
447-
448-
// index = write32bit(index, _buffer, cursorIds[i].getHighBits());
449-
_buffer[index + 3] = (this.cursorIds[i].getHighBits() >> 24) & 0xff;
450-
_buffer[index + 2] = (this.cursorIds[i].getHighBits() >> 16) & 0xff;
451-
_buffer[index + 1] = (this.cursorIds[i].getHighBits() >> 8) & 0xff;
452-
_buffer[index] = this.cursorIds[i].getHighBits() & 0xff;
453-
index = index + 4;
454-
}
455-
456-
// Return buffer
457-
return [_buffer];
458-
}
459-
}
460-
461273
/** @internal */
462274
export interface MessageHeader {
463275
length: number;

0 commit comments

Comments
 (0)