Skip to content

refactor(NODE-6325): implement document sequence support #4201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const QUERY_FAILURE = 2;
const SHARD_CONFIG_STALE = 4;
const AWAIT_CAPABLE = 8;

const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into;

/** @internal */
export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;

Expand Down Expand Up @@ -411,6 +413,15 @@ export interface OpMsgOptions {
readPreference: ReadPreference;
}

/** @internal */
export class DocumentSequence {
documents: Document[];

constructor(documents: Document[]) {
this.documents = documents;
}
}

/** @internal */
export class OpMsgRequest {
requestId: number;
Expand Down Expand Up @@ -480,7 +491,7 @@ export class OpMsgRequest {

let totalLength = header.length;
const command = this.command;
totalLength += this.makeDocumentSegment(buffers, command);
totalLength += this.makeSections(buffers, command);

header.writeInt32LE(totalLength, 0); // messageLength
header.writeInt32LE(this.requestId, 4); // requestID
Expand All @@ -490,15 +501,65 @@ export class OpMsgRequest {
return buffers;
}

makeDocumentSegment(buffers: Uint8Array[], document: Document): number {
const payloadTypeBuffer = Buffer.alloc(1);
/**
* Add the sections to the OP_MSG request's buffers and returns the length.
*/
makeSections(buffers: Uint8Array[], document: Document): number {
const sequencesBuffer = this.extractDocumentSequences(document);
const payloadTypeBuffer = Buffer.allocUnsafe(1);
payloadTypeBuffer[0] = 0;

const documentBuffer = this.serializeBson(document);
// First section, type 0
buffers.push(payloadTypeBuffer);
buffers.push(documentBuffer);
// Subsequent sections, type 1
buffers.push(sequencesBuffer);

return payloadTypeBuffer.length + documentBuffer.length;
return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
}

/**
* Extracts the document sequences from the command document and returns
* a buffer to be added as multiple sections after the initial type 0
* section in the message.
*/
extractDocumentSequences(document: Document): Uint8Array {
// Pull out any field in the command document that's value is a document sequence.
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
// our BSON serializer can do about this. Since DocumentSequence is not exposed
// in the public API and only used internally, we are never mutating an original
// command provided by the user, just our own, and it's cheaper to delete from
// our own command than copying it.
delete document[key];
}
}
if (chunks.length > 0) {
return Buffer.concat(chunks);
}
// If we have no document sequences we return an empty buffer for nothing to add
// to the payload.
return Buffer.alloc(0);
}

serializeBson(document: Document): Uint8Array {
Expand Down
111 changes: 0 additions & 111 deletions test/unit/cmap/commands.test.js

This file was deleted.

Loading