9
9
from typing import Callable , Dict , List , Optional
10
10
11
11
import pymongo
12
- from kubetester import kubetester
13
- from kubetester .kubetester import KubernetesTester
14
12
from opentelemetry import trace
15
13
from pycognito import Cognito
16
14
from pymongo .auth_oidc import OIDCCallback , OIDCCallbackContext , OIDCCallbackResult
17
15
from pymongo .errors import OperationFailure , PyMongoError , ServerSelectionTimeoutError
18
16
from pytest import fail
19
17
18
+ from tests .conftest import get_central_cluster_client
19
+ import kubernetes .client as client
20
+ from kubetester import kubetester
21
+ from kubetester .kubetester import KubernetesTester
22
+ from kubetester .mongodb_user import MongoDBUser
23
+ from kubetester .phase import Phase
24
+
20
25
TEST_DB = "test-db"
21
26
TEST_COLLECTION = "test-collection"
22
27
@@ -76,6 +81,57 @@ def fetch(self, context: OIDCCallbackContext) -> OIDCCallbackResult:
76
81
return OIDCCallbackResult (access_token = u .id_token )
77
82
78
83
84
+ def _wait_for_mongodbuser_reconciliation () -> None :
85
+ """
86
+ Wait for ALL MongoDBUser resources in the namespace to be reconciled before attempting authentication.
87
+ This prevents race conditions when passwords or user configurations have been recently changed.
88
+
89
+ Lists all MongoDBUser resources in the namespace and waits for ALL of them to reach Updated phase.
90
+ """
91
+ try :
92
+ namespace = KubernetesTester .get_namespace ()
93
+ api_client = client .CustomObjectsApi (api_client = get_central_cluster_client ())
94
+
95
+ try :
96
+ mongodb_users = api_client .list_namespaced_custom_object (
97
+ group = "mongodb.com" ,
98
+ version = "v1" ,
99
+ namespace = namespace ,
100
+ plural = "mongodbusers"
101
+ )
102
+
103
+ all_users = []
104
+
105
+ for user_item in mongodb_users .get ("items" , []):
106
+ user_name = user_item .get ("metadata" , {}).get ("name" , "unknown" )
107
+ username = user_item .get ("spec" , {}).get ("username" , "unknown" )
108
+ all_users .append ((user_name , username ))
109
+
110
+ if not all_users :
111
+ return
112
+
113
+ logging .info (f"Found { len (all_users )} MongoDBUser resource(s) in namespace '{ namespace } ', waiting for all to reach Updated phase..." )
114
+
115
+ for user_name , username in all_users :
116
+ try :
117
+ logging .info (f"Waiting for MongoDBUser '{ user_name } ' (username: { username } ) to reach Updated phase..." )
118
+
119
+ user = MongoDBUser (name = user_name , namespace = namespace )
120
+ user .assert_reaches_phase (Phase .Updated , timeout = 300 )
121
+ logging .info (f"MongoDBUser '{ user_name } ' reached Updated phase - reconciliation complete" )
122
+
123
+ except Exception as e :
124
+ logging .warning (f"Failed to wait for MongoDBUser '{ user_name } ' reconciliation: { e } " )
125
+ # Continue with other users - don't fail the entire test
126
+
127
+ logging .info ("All MongoDBUser resources reconciliation check complete" )
128
+
129
+ except Exception as e :
130
+ logging .warning (f"Failed to list MongoDBUser resources: { e } - proceeding without reconciliation wait" )
131
+ except Exception as e :
132
+ logging .warning (f"Error while waiting for MongoDBUser reconciliation: { e } - proceeding with authentication" )
133
+
134
+
79
135
class MongoTester :
80
136
"""MongoTester is a general abstraction to work with mongo database. It encapsulates the client created in
81
137
the constructor. All general methods non-specific to types of mongodb topologies should reside here."""
@@ -115,7 +171,7 @@ def _init_client(self, **kwargs):
115
171
116
172
def assert_connectivity (
117
173
self ,
118
- attempts : int = 30 ,
174
+ attempts : int = 50 ,
119
175
db : str = "admin" ,
120
176
col : str = "myCol" ,
121
177
opts : Optional [List [Dict [str , any ]]] = None ,
@@ -175,13 +231,17 @@ def assert_scram_sha_authentication(
175
231
username : str ,
176
232
password : str ,
177
233
auth_mechanism : str ,
178
- attempts : int = 30 ,
234
+ attempts : int = 50 ,
179
235
ssl : bool = False ,
180
236
** kwargs ,
181
237
) -> None :
182
238
assert attempts > 0
183
239
assert auth_mechanism in {"SCRAM-SHA-256" , "SCRAM-SHA-1" }
184
240
241
+ # Wait for ALL MongoDBUser resources to be reconciled before attempting authentication
242
+ # This prevents race conditions when passwords have been recently changed
243
+ _wait_for_mongodbuser_reconciliation ()
244
+
185
245
for i in reversed (range (attempts )):
186
246
try :
187
247
self ._authenticate_with_scram (
@@ -209,7 +269,7 @@ def assert_scram_sha_authentication_fails(
209
269
self ,
210
270
username : str ,
211
271
password : str ,
212
- retries : int = 30 ,
272
+ attempts : int = 50 ,
213
273
ssl : bool = False ,
214
274
** kwargs ,
215
275
):
@@ -219,13 +279,16 @@ def assert_scram_sha_authentication_fails(
219
279
which still exists. When we change a password, we should eventually no longer be able to auth with
220
280
that user's credentials.
221
281
"""
222
- for i in range (retries ):
282
+
283
+ _wait_for_mongodbuser_reconciliation ()
284
+
285
+ for i in range (attempts ):
223
286
try :
224
287
self ._authenticate_with_scram (username , password , ssl = ssl , ** kwargs )
225
288
except OperationFailure :
226
289
return
227
290
time .sleep (5 )
228
- fail (f"was still able to authenticate with username={ username } password={ password } after { retries } attempts" )
291
+ fail (f"was still able to authenticate with username={ username } password={ password } after { attempts } attempts" )
229
292
230
293
def _authenticate_with_scram (
231
294
self ,
@@ -247,9 +310,11 @@ def _authenticate_with_scram(
247
310
# authentication doesn't actually happen until we interact with a database
248
311
self .client ["admin" ]["myCol" ].insert_one ({})
249
312
250
- def assert_x509_authentication (self , cert_file_name : str , attempts : int = 30 , ** kwargs ):
313
+ def assert_x509_authentication (self , cert_file_name : str , attempts : int = 50 , ** kwargs ):
251
314
assert attempts > 0
252
315
316
+ _wait_for_mongodbuser_reconciliation ()
317
+
253
318
options = self ._merge_options (
254
319
[
255
320
with_x509 (cert_file_name , kwargs .get ("tlsCAFile" , kubetester .SSL_CA_CERT )),
@@ -267,14 +332,7 @@ def assert_x509_authentication(self, cert_file_name: str, attempts: int = 30, **
267
332
if attempts == 0 :
268
333
fail (f"unable to authenticate after { total_attempts } attempts" )
269
334
270
- # Use adaptive retry delays for better credential propagation handling
271
- remaining_attempts = attempts
272
- if remaining_attempts >= total_attempts * 0.7 : # First ~30% of attempts
273
- delay = 10 # Longer delay for initial propagation
274
- else :
275
- delay = 5 # Standard delay for normal retries
276
-
277
- time .sleep (delay )
335
+ time .sleep (5 )
278
336
279
337
def assert_ldap_authentication (
280
338
self ,
@@ -284,8 +342,9 @@ def assert_ldap_authentication(
284
342
collection : str = "myCol" ,
285
343
tls_ca_file : Optional [str ] = None ,
286
344
ssl_certfile : str = None ,
287
- attempts : int = 30 ,
345
+ attempts : int = 50 ,
288
346
):
347
+ _wait_for_mongodbuser_reconciliation ()
289
348
290
349
options = with_ldap (ssl_certfile , tls_ca_file )
291
350
total_attempts = attempts
@@ -307,23 +366,18 @@ def assert_ldap_authentication(
307
366
if attempts <= 0 :
308
367
fail (f"unable to authenticate after { total_attempts } attempts" )
309
368
310
- # Use adaptive retry delays for better credential propagation handling
311
- remaining_attempts = attempts
312
- if remaining_attempts >= total_attempts * 0.7 : # First ~30% of attempts
313
- delay = 10 # Longer delay for initial propagation
314
- else :
315
- delay = 5 # Standard delay for normal retries
316
-
317
- time .sleep (delay )
369
+ time .sleep (5 )
318
370
319
371
def assert_oidc_authentication (
320
372
self ,
321
373
db : str = "admin" ,
322
374
collection : str = "myCol" ,
323
- attempts : int = 10 ,
375
+ attempts : int = 50 ,
324
376
):
325
377
assert attempts > 0
326
378
379
+ _wait_for_mongodbuser_reconciliation ()
380
+
327
381
props = {"OIDC_CALLBACK" : MyOIDCCallback ()}
328
382
329
383
total_attempts = attempts
@@ -342,14 +396,7 @@ def assert_oidc_authentication(
342
396
if attempts == 0 :
343
397
raise RuntimeError (f"Unable to authenticate after { total_attempts } attempts: { e } " )
344
398
345
- # Use adaptive retry delays for better credential propagation handling
346
- remaining_attempts = attempts
347
- if remaining_attempts >= total_attempts * 0.7 : # First ~30% of attempts
348
- delay = 10 # Longer delay for initial propagation
349
- else :
350
- delay = 5 # Standard delay for normal retries
351
-
352
- time .sleep (delay )
399
+ time .sleep (5 )
353
400
354
401
def assert_oidc_authentication_fails (self , db : str = "admin" , collection : str = "myCol" , attempts : int = 10 ):
355
402
assert attempts > 0
0 commit comments