1
- import { type Document } from '../../bson' ;
1
+ import { BSON , type Document } from '../../bson' ;
2
2
import { DocumentSequence } from '../../cmap/commands' ;
3
3
import { type PkFactory } from '../../mongo_client' ;
4
4
import type { Filter , OptionalId , UpdateFilter , WithoutId } from '../../mongo_types' ;
@@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
28
28
comment ?: any ;
29
29
}
30
30
31
+ /**
32
+ * The bytes overhead for the extra fields added post command generation.
33
+ */
34
+ const MESSAGE_OVERHEAD_BYTES = 1000 ;
35
+
31
36
/** @internal */
32
37
export class ClientBulkWriteCommandBuilder {
33
38
models : AnyClientBulkWriteModel [ ] ;
@@ -62,32 +67,123 @@ export class ClientBulkWriteCommandBuilder {
62
67
/**
63
68
* Build the bulk write commands from the models.
64
69
*/
65
- buildCommands ( ) : ClientBulkWriteCommand [ ] {
70
+ buildCommands ( maxMessageSizeBytes : number , maxWriteBatchSize : number ) : ClientBulkWriteCommand [ ] {
66
71
// Iterate the models to build the ops and nsInfo fields.
67
- const operations = [ ] ;
72
+ // We need to do this in a loop which creates one command each up
73
+ // to the max bson size or max message size.
74
+ const commands : ClientBulkWriteCommand [ ] = [ ] ;
75
+ let currentCommandLength = MESSAGE_OVERHEAD_BYTES ;
68
76
let currentNamespaceIndex = 0 ;
77
+ let currentCommand : ClientBulkWriteCommand = this . baseCommand ( ) ;
69
78
const namespaces = new Map < string , number > ( ) ;
70
79
for ( const model of this . models ) {
71
80
const ns = model . namespace ;
72
81
const index = namespaces . get ( ns ) ;
73
82
if ( index != null ) {
74
- operations . push ( buildOperation ( model , index , this . pkFactory ) ) ;
83
+ // Pushing to the ops document sequence returns the bytes length added.
84
+ const operation = buildOperation ( model , index , this . pkFactory ) ;
85
+ const operationBuffer = BSON . serialize ( operation ) ;
86
+
87
+ // Check if the operation buffer can fit in the current command. If it can,
88
+ // then add the operation to the document sequence and increment the
89
+ // current length as long as the ops don't exceed the maxWriteBatchSize.
90
+ if (
91
+ currentCommandLength + operationBuffer . length < maxMessageSizeBytes &&
92
+ currentCommand . ops . documents . length < maxWriteBatchSize
93
+ ) {
94
+ // Pushing to the ops document sequence returns the bytes length added.
95
+ currentCommandLength += this . addOperation ( currentCommand , operation , operationBuffer ) ;
96
+ } else {
97
+ // We need to batch. Push the current command to the commands
98
+ // array and create a new current command.
99
+ commands . push ( currentCommand ) ;
100
+ currentCommand = this . baseCommand ( ) ;
101
+ currentCommandLength += this . addOperation ( currentCommand , operation , operationBuffer ) ;
102
+ }
75
103
} else {
76
104
namespaces . set ( ns , currentNamespaceIndex ) ;
77
- operations . push ( buildOperation ( model , currentNamespaceIndex , this . pkFactory ) ) ;
78
- currentNamespaceIndex ++ ;
105
+ const nsInfo = { ns : ns } ;
106
+ const nsInfoBuffer = BSON . serialize ( nsInfo ) ;
107
+ const operation = buildOperation ( model , currentNamespaceIndex , this . pkFactory ) ;
108
+ const operationBuffer = BSON . serialize ( operation ) ;
109
+
110
+ // Check if the operation and nsInfo buffers can fit in the command. If they
111
+ // can, then add the operation and nsInfo to their respective document
112
+ // sequences and increment the current length as long as the ops don't exceed
113
+ // the maxWriteBatchSize.
114
+ if (
115
+ currentCommandLength + nsInfoBuffer . length + operationBuffer . length <
116
+ maxMessageSizeBytes &&
117
+ currentCommand . ops . documents . length < maxWriteBatchSize
118
+ ) {
119
+ currentCommandLength += this . addOperationAndNsInfo (
120
+ currentCommand ,
121
+ operation ,
122
+ operationBuffer ,
123
+ nsInfo ,
124
+ nsInfoBuffer
125
+ ) ;
126
+
127
+ // We've added a new namespace, increment the namespace index.
128
+ currentNamespaceIndex ++ ;
129
+ } else {
130
+ // We need to batch. Push the current command to the commands
131
+ // array and create a new current command.
132
+ commands . push ( currentCommand ) ;
133
+ currentCommand = this . baseCommand ( ) ;
134
+
135
+ currentCommandLength += this . addOperationAndNsInfo (
136
+ currentCommand ,
137
+ operation ,
138
+ operationBuffer ,
139
+ nsInfo ,
140
+ nsInfoBuffer
141
+ ) ;
142
+
143
+ // We've added a new namespace, increment the namespace index.
144
+ currentNamespaceIndex ++ ;
145
+ }
79
146
}
80
147
}
81
148
82
- const nsInfo = Array . from ( namespaces . keys ( ) , ns => ( { ns } ) ) ;
149
+ // After we've finisihed iterating all the models put the last current command
150
+ // only if there are operations in it.
151
+ if ( currentCommand . ops . documents . length > 0 ) {
152
+ commands . push ( currentCommand ) ;
153
+ }
154
+
155
+ return commands ;
156
+ }
157
+
158
+ private addOperation (
159
+ command : ClientBulkWriteCommand ,
160
+ operation : Document ,
161
+ operationBuffer : Uint8Array
162
+ ) : number {
163
+ // Pushing to the ops document sequence returns the bytes length added.
164
+ return command . ops . push ( operation , operationBuffer ) ;
165
+ }
166
+
167
+ private addOperationAndNsInfo (
168
+ command : ClientBulkWriteCommand ,
169
+ operation : Document ,
170
+ operationBuffer : Uint8Array ,
171
+ nsInfo : Document ,
172
+ nsInfoBuffer : Uint8Array
173
+ ) : number {
174
+ // Pushing to the nsInfo document sequence returns the bytes length added.
175
+ const nsInfoLength = command . nsInfo . push ( nsInfo , nsInfoBuffer ) ;
176
+ const opsLength = this . addOperation ( command , operation , operationBuffer ) ;
177
+ return nsInfoLength + opsLength ;
178
+ }
83
179
84
- // The base command.
180
+ private baseCommand ( ) : ClientBulkWriteCommand {
85
181
const command : ClientBulkWriteCommand = {
86
182
bulkWrite : 1 ,
87
183
errorsOnly : this . errorsOnly ,
88
184
ordered : this . options . ordered ?? true ,
89
- ops : new DocumentSequence ( operations ) ,
90
- nsInfo : new DocumentSequence ( nsInfo )
185
+ ops : new DocumentSequence ( 'ops' ) ,
186
+ nsInfo : new DocumentSequence ( ' nsInfo' )
91
187
} ;
92
188
// Add bypassDocumentValidation if it was present in the options.
93
189
if ( this . options . bypassDocumentValidation != null ) {
@@ -103,7 +199,8 @@ export class ClientBulkWriteCommandBuilder {
103
199
if ( this . options . comment !== undefined ) {
104
200
command . comment = this . options . comment ;
105
201
}
106
- return [ command ] ;
202
+
203
+ return command ;
107
204
}
108
205
}
109
206
0 commit comments