Skip to content

Commit a65a539

Browse files
committed
feat(NODE-6325): implement document sequence support
1 parent b70c885 commit a65a539

File tree

2 files changed

+152
-5
lines changed

2 files changed

+152
-5
lines changed

src/cmap/commands.ts

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,17 @@ export interface OpMsgOptions {
411411
readPreference: ReadPreference;
412412
}
413413

414+
/** @internal */
415+
export class DocumentSequence {
416+
field: string;
417+
documents: Document[];
418+
419+
constructor(field: string, documents: Document[]) {
420+
this.field = field;
421+
this.documents = documents;
422+
}
423+
}
424+
414425
/** @internal */
415426
export class OpMsgRequest {
416427
requestId: number;
@@ -480,7 +491,7 @@ export class OpMsgRequest {
480491

481492
let totalLength = header.length;
482493
const command = this.command;
483-
totalLength += this.makeDocumentSegment(buffers, command);
494+
totalLength += this.makeSections(buffers, command);
484495

485496
header.writeInt32LE(totalLength, 0); // messageLength
486497
header.writeInt32LE(this.requestId, 4); // requestID
@@ -490,15 +501,66 @@ export class OpMsgRequest {
490501
return buffers;
491502
}
492503

493-
makeDocumentSegment(buffers: Uint8Array[], document: Document): number {
504+
/**
505+
* Add the sections to the OP_MSG request's buffers and returns the length.
506+
*/
507+
makeSections(buffers: Uint8Array[], document: Document): number {
508+
const sequencesBuffer = this.extractDocumentSequences(document);
494509
const payloadTypeBuffer = Buffer.alloc(1);
495510
payloadTypeBuffer[0] = 0;
496511

497512
const documentBuffer = this.serializeBson(document);
513+
// First section, type 0
498514
buffers.push(payloadTypeBuffer);
499515
buffers.push(documentBuffer);
516+
// Subsequent sections, type 1
517+
buffers.push(sequencesBuffer);
500518

501-
return payloadTypeBuffer.length + documentBuffer.length;
519+
return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
520+
}
521+
522+
/**
523+
* Extracts the document sequences from the command document and returns
524+
* a buffer to be added as multiple sections after the initial type 0
525+
* section in the message.
526+
*/
527+
extractDocumentSequences(document: Document): Uint8Array {
528+
// Pull out any field in the command document that's value is a document sequence.
529+
const chunks = [];
530+
for (const [key, value] of Object.entries(document)) {
531+
if (value instanceof DocumentSequence) {
532+
// Document sequences starts with type 1 at the first byte.
533+
const payloadTypeBuffer = Buffer.alloc(1);
534+
payloadTypeBuffer[0] = 1;
535+
chunks.push(payloadTypeBuffer);
536+
// Second part of the sequence is the length;
537+
const lengthBuffer = Buffer.alloc(4);
538+
chunks.push(lengthBuffer);
539+
// Third part is the field name.
540+
const fieldBuffer = Buffer.from(key);
541+
chunks.push(fieldBuffer);
542+
// Fourth part are the documents' bytes.
543+
let docsLength = 0;
544+
for (const doc of value.documents) {
545+
const docBson = this.serializeBson(doc);
546+
docsLength += docBson.length;
547+
chunks.push(docBson);
548+
}
549+
lengthBuffer.writeInt32LE(fieldBuffer.length + docsLength);
550+
// Why are we removing the field from the command? This is because it needs to be
551+
// removed in the OP_MSG request first section, and DocumentSequence is not a
552+
// BSON type and is specific to the MongoDB wire protocol so there's nothing
553+
// our BSON serializer can do about this. Since DocumentSequence is not exposed
554+
// in the public API and only used internally, we are never mutating an original
555+
// command provided by the user, just our own, and it's cheaper to delete from
556+
// our own command than copying it.
557+
delete document[key];
558+
}
559+
}
560+
if (chunks.length > 0) {
561+
return Buffer.concat(chunks);
562+
}
563+
return Buffer.alloc(0);
502564
}
503565

504566
serializeBson(document: Document): Uint8Array {

test/unit/cmap/commands.test.js renamed to test/unit/cmap/commands.test.ts

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,92 @@
1-
const { expect } = require('chai');
2-
const { OpReply } = require('../../mongodb');
1+
import { expect } from 'chai';
2+
3+
import { DocumentSequence, OpMsgRequest, OpReply } from '../../mongodb';
34

45
describe('commands', function () {
6+
describe('OpMsgRequest', function () {
7+
describe('#toBin', function () {
8+
/**
9+
* Note that #toBin returns an array of buffers, in this case we are interested in
10+
* the buffer at index 3 of the array, which is a single buffer of all the
11+
* document sequence sections.
12+
*/
13+
context('when the command has document sequences', function () {
14+
context('when there is one document sequence', function () {
15+
const command = {
16+
test: 1,
17+
field: new DocumentSequence('test', [{ test: 1 }])
18+
};
19+
const msg = new OpMsgRequest('admin', command, {});
20+
const buffers = msg.toBin();
21+
22+
it('removes the document sequence fields from the command', function () {
23+
expect(command).to.not.haveOwnProperty('field');
24+
});
25+
26+
it('sets the document sequence section type to 1', function () {
27+
// First byte is a one byte type.
28+
expect(buffers[3][0]).to.equal(1);
29+
});
30+
31+
it('sets the length of the document sequence', function () {
32+
// Bytes starting at index 1 is a 4 byte length.
33+
expect(buffers[3].readInt32LE(1)).to.equal(20);
34+
});
35+
36+
it('sets the name of the first field to be replaced', function () {
37+
// Bytes starting at index 5 is the field name.
38+
expect(buffers[3].toString('utf8', 5, 10)).to.equal('field');
39+
});
40+
});
41+
42+
context('when there are multiple document sequences', function () {
43+
const command = {
44+
test: 1,
45+
fieldOne: new DocumentSequence('test', [{ test: 1 }]),
46+
fieldTwo: new DocumentSequence('test', [{ test: 1 }])
47+
};
48+
const msg = new OpMsgRequest('admin', command, {});
49+
const buffers = msg.toBin();
50+
51+
it('removes the document sequence fields from the command', function () {
52+
expect(command).to.not.haveOwnProperty('fieldOne');
53+
expect(command).to.not.haveOwnProperty('fieldTwo');
54+
});
55+
56+
it('sets the document sequence sections first type to 1', function () {
57+
// First byte is a one byte type.
58+
expect(buffers[3][0]).to.equal(1);
59+
});
60+
61+
it('sets the length of the first document sequence', function () {
62+
// Bytes starting at index 1 is a 4 byte length.
63+
expect(buffers[3].readInt32LE(1)).to.equal(23);
64+
});
65+
66+
it('sets the name of the first field to be replaced', function () {
67+
// Bytes starting at index 5 is the field name.
68+
expect(buffers[3].toString('utf8', 5, 13)).to.equal('fieldOne');
69+
});
70+
71+
it('sets the document sequence sections second type to 1', function () {
72+
// First byte is a one byte type.
73+
expect(buffers[3][28]).to.equal(1);
74+
});
75+
76+
it('sets the length of the first document sequence', function () {
77+
// Bytes starting at index 1 is a 4 byte length.
78+
expect(buffers[3].readInt32LE(29)).to.equal(23);
79+
});
80+
81+
it('sets the name of the second field to be replaced', function () {
82+
// Bytes starting at index 33 is the field name.
83+
expect(buffers[3].toString('utf8', 33, 41)).to.equal('fieldTwo');
84+
});
85+
});
86+
});
87+
});
88+
});
89+
590
describe('Response', function () {
691
describe('#parse', function () {
792
context('when the message body is invalid', function () {

0 commit comments

Comments
 (0)