1
- import { MongoClient , Db , Collection , GridFSBucket , Document } from '../../../src/index' ;
1
+ import {
2
+ MongoClient ,
3
+ Db ,
4
+ Collection ,
5
+ GridFSBucket ,
6
+ Document ,
7
+ HostAddress
8
+ } from '../../../src/index' ;
2
9
import { ReadConcern } from '../../../src/read_concern' ;
3
10
import { WriteConcern } from '../../../src/write_concern' ;
4
11
import { ReadPreference } from '../../../src/read_preference' ;
@@ -18,10 +25,6 @@ interface UnifiedChangeStream extends ChangeStream {
18
25
eventCollector : InstanceType < typeof import ( '../../tools/utils' ) [ 'EventCollector' ] > ;
19
26
}
20
27
21
- interface UnifiedClientSession extends ClientSession {
22
- client : UnifiedMongoClient ;
23
- }
24
-
25
28
export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent ;
26
29
27
30
export class UnifiedMongoClient extends MongoClient {
@@ -75,23 +78,49 @@ export class UnifiedMongoClient extends MongoClient {
75
78
}
76
79
return this . events ;
77
80
}
81
+ }
78
82
79
- async enableFailPoint ( failPoint : Document ) : Promise < Document > {
80
- const admin = this . db ( ) . admin ( ) ;
83
+ export class FailPointMap extends Map < string , Document > {
84
+ async enableFailPoint (
85
+ addressOrClient : HostAddress | UnifiedMongoClient ,
86
+ failPoint : Document
87
+ ) : Promise < Document > {
88
+ let client : MongoClient ;
89
+ let address : string ;
90
+ if ( addressOrClient instanceof MongoClient ) {
91
+ client = addressOrClient ;
92
+ address = client . topology . s . seedlist . join ( ',' ) ;
93
+ } else {
94
+ // create a new client
95
+ address = addressOrClient . toString ( ) ;
96
+ client = new MongoClient ( `mongodb://${ address } ` ) ;
97
+ await client . connect ( ) ;
98
+ }
99
+
100
+ const admin = client . db ( 'admin' ) ;
81
101
const result = await admin . command ( failPoint ) ;
102
+
103
+ if ( ! ( addressOrClient instanceof MongoClient ) ) {
104
+ // we created this client
105
+ await client . close ( ) ;
106
+ }
107
+
82
108
expect ( result ) . to . have . property ( 'ok' , 1 ) ;
83
- this . failPoints . push ( failPoint . configureFailPoint ) ;
109
+ this . set ( address , failPoint . configureFailPoint ) ;
84
110
return result ;
85
111
}
86
112
87
- async disableFailPoints ( ) : Promise < Document [ ] > {
88
- return Promise . all (
89
- this . failPoints . map ( configureFailPoint =>
90
- this . db ( ) . admin ( ) . command ( {
91
- configureFailPoint,
92
- mode : 'off'
93
- } )
94
- )
113
+ async disableFailPoints ( ) : Promise < void > {
114
+ const entries = Array . from ( this . entries ( ) ) ;
115
+ await Promise . all (
116
+ entries . map ( async ( [ hostAddress , configureFailPoint ] ) => {
117
+ const client = new MongoClient ( `mongodb://${ hostAddress } ` ) ;
118
+ await client . connect ( ) ;
119
+ const admin = client . db ( 'admin' ) ;
120
+ const result = await admin . command ( { configureFailPoint, mode : 'off' } ) ;
121
+ expect ( result ) . to . have . property ( 'ok' , 1 ) ;
122
+ await client . close ( ) ;
123
+ } )
95
124
) ;
96
125
}
97
126
}
@@ -100,7 +129,7 @@ export type Entity =
100
129
| UnifiedMongoClient
101
130
| Db
102
131
| Collection
103
- | UnifiedClientSession
132
+ | ClientSession
104
133
| UnifiedChangeStream
105
134
| GridFSBucket
106
135
| Document ; // Results from operations
@@ -124,10 +153,17 @@ ENTITY_CTORS.set('bucket', GridFSBucket);
124
153
ENTITY_CTORS . set ( 'stream' , ChangeStream ) ;
125
154
126
155
export class EntitiesMap < E = Entity > extends Map < string , E > {
156
+ failPoints : FailPointMap ;
157
+
158
+ constructor ( entries ?: readonly ( readonly [ string , E ] ) [ ] | null ) {
159
+ super ( entries ) ;
160
+ this . failPoints = new FailPointMap ( ) ;
161
+ }
162
+
127
163
mapOf ( type : 'client' ) : EntitiesMap < UnifiedMongoClient > ;
128
164
mapOf ( type : 'db' ) : EntitiesMap < Db > ;
129
165
mapOf ( type : 'collection' ) : EntitiesMap < Collection > ;
130
- mapOf ( type : 'session' ) : EntitiesMap < UnifiedClientSession > ;
166
+ mapOf ( type : 'session' ) : EntitiesMap < ClientSession > ;
131
167
mapOf ( type : 'bucket' ) : EntitiesMap < GridFSBucket > ;
132
168
mapOf ( type : 'stream' ) : EntitiesMap < UnifiedChangeStream > ;
133
169
mapOf ( type : EntityTypeId ) : EntitiesMap < Entity > {
@@ -141,7 +177,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
141
177
getEntity ( type : 'client' , key : string , assertExists ?: boolean ) : UnifiedMongoClient ;
142
178
getEntity ( type : 'db' , key : string , assertExists ?: boolean ) : Db ;
143
179
getEntity ( type : 'collection' , key : string , assertExists ?: boolean ) : Collection ;
144
- getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : UnifiedClientSession ;
180
+ getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : ClientSession ;
145
181
getEntity ( type : 'bucket' , key : string , assertExists ?: boolean ) : GridFSBucket ;
146
182
getEntity ( type : 'stream' , key : string , assertExists ?: boolean ) : UnifiedChangeStream ;
147
183
getEntity ( type : EntityTypeId , key : string , assertExists = true ) : Entity {
@@ -161,8 +197,8 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
161
197
}
162
198
163
199
async cleanup ( ) : Promise < void > {
200
+ await this . failPoints . disableFailPoints ( ) ;
164
201
for ( const [ , client ] of this . mapOf ( 'client' ) ) {
165
- await client . disableFailPoints ( ) ;
166
202
await client . close ( ) ;
167
203
}
168
204
for ( const [ , session ] of this . mapOf ( 'session' ) ) {
@@ -178,7 +214,9 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
178
214
const map = new EntitiesMap ( ) ;
179
215
for ( const entity of entities ?? [ ] ) {
180
216
if ( 'client' in entity ) {
181
- const uri = config . url ( { useMultipleMongoses : entity . client . useMultipleMongoses } ) ;
217
+ const useMultipleMongoses =
218
+ config . topologyType === 'Sharded' && entity . client . useMultipleMongoses ;
219
+ const uri = config . url ( { useMultipleMongoses } ) ;
182
220
const client = new UnifiedMongoClient ( uri , entity . client ) ;
183
221
await client . connect ( ) ;
184
222
map . set ( entity . client . id , client ) ;
@@ -228,10 +266,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
228
266
}
229
267
}
230
268
231
- const session = client . startSession ( options ) as UnifiedClientSession ;
232
- // targetedFailPoint operations need to access the client the session came from
233
- session . client = client ;
234
-
269
+ const session = client . startSession ( options ) ;
235
270
map . set ( entity . session . id , session ) ;
236
271
} else if ( 'bucket' in entity ) {
237
272
const db = map . getEntity ( 'db' , entity . bucket . database ) ;
0 commit comments