1
- import { type Document } from 'bson' ;
2
-
1
+ import { MongoClientBulkWriteExecutionError , ServerType } from '../../beta' ;
3
2
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses' ;
4
3
import type { Server } from '../../sdam/server' ;
5
4
import type { ClientSession } from '../../sessions' ;
6
5
import { MongoDBNamespace } from '../../utils' ;
7
6
import { CommandOperation } from '../command' ;
8
7
import { Aspect , defineAspects } from '../operation' ;
8
+ import { type ClientBulkWriteCommandBuilder } from './command_builder' ;
9
9
import { type ClientBulkWriteOptions } from './common' ;
10
10
11
11
/**
12
12
* Executes a single client bulk write operation within a potential batch.
13
13
* @internal
14
14
*/
15
15
export class ClientBulkWriteOperation extends CommandOperation < ClientBulkWriteCursorResponse > {
16
- command : Document ;
16
+ commandBuilder : ClientBulkWriteCommandBuilder ;
17
17
override options : ClientBulkWriteOptions ;
18
18
19
19
override get commandName ( ) {
20
20
return 'bulkWrite' as const ;
21
21
}
22
22
23
- constructor ( command : Document , options : ClientBulkWriteOptions ) {
23
+ constructor ( commandBuilder : ClientBulkWriteCommandBuilder , options : ClientBulkWriteOptions ) {
24
24
super ( undefined , options ) ;
25
- this . command = command ;
25
+ this . commandBuilder = commandBuilder ;
26
26
this . options = options ;
27
27
this . ns = new MongoDBNamespace ( 'admin' , '$cmd' ) ;
28
28
}
@@ -37,9 +37,45 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
37
37
server : Server ,
38
38
session : ClientSession | undefined
39
39
) : Promise < ClientBulkWriteCursorResponse > {
40
- return await super . executeCommand ( server , session , this . command , ClientBulkWriteCursorResponse ) ;
40
+ let command ;
41
+
42
+ if ( server . description . type === ServerType . LoadBalancer ) {
43
+ if ( session ) {
44
+ // Checkout a connection to build the command.
45
+ const connection = await server . pool . checkOut ( ) ;
46
+ // Pin the connection to the session so it get used to execute the command and we do not
47
+ // perform a double check-in/check-out.
48
+ session . pin ( connection ) ;
49
+ command = this . commandBuilder . buildBatch (
50
+ connection . hello ?. maxMessageSizeBytes ,
51
+ connection . hello ?. maxWriteBatchSize
52
+ ) ;
53
+ } else {
54
+ throw new MongoClientBulkWriteExecutionError (
55
+ 'Session provided to the client bulk write operation must be present.'
56
+ ) ;
57
+ }
58
+ } else {
59
+ // At this point we have a server and the auto connect code has already
60
+ // run in executeOperation, so the server description will be populated.
61
+ // We can use that to build the command.
62
+ if ( ! server . description . maxWriteBatchSize || ! server . description . maxMessageSizeBytes ) {
63
+ throw new MongoClientBulkWriteExecutionError (
64
+ 'In order to execute a client bulk write, both maxWriteBatchSize and maxMessageSizeBytes must be provided by the servers hello response.'
65
+ ) ;
66
+ }
67
+ command = this . commandBuilder . buildBatch (
68
+ server . description . maxMessageSizeBytes ,
69
+ server . description . maxWriteBatchSize
70
+ ) ;
71
+ }
72
+ return await super . executeCommand ( server , session , command , ClientBulkWriteCursorResponse ) ;
41
73
}
42
74
}
43
75
44
76
// Skipping the collation as it goes on the individual ops.
45
- defineAspects ( ClientBulkWriteOperation , [ Aspect . WRITE_OPERATION , Aspect . SKIP_COLLATION ] ) ;
77
+ defineAspects ( ClientBulkWriteOperation , [
78
+ Aspect . WRITE_OPERATION ,
79
+ Aspect . SKIP_COLLATION ,
80
+ Aspect . CURSOR_CREATING
81
+ ] ) ;
0 commit comments