Skip to content

Commit f63b3bb

Browse files
authored
PCSM-237: Fix race conditions for clone phase in tests (#157)
1 parent 100da38 commit f63b3bb

File tree

5 files changed

+58
-19
lines changed

5 files changed

+58
-19
lines changed

CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
* @inelpandzic @boris-ilijic
1+
* @inelpandzic @chupe

tests/pcsm.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,14 @@ class Phase(StrEnum):
116116
def __init__(
117117
self,
118118
source: MongoClient,
119+
target: MongoClient,
119120
pcsm: PCSM,
120121
phase: Phase,
121122
options: dict,
122123
wait_timeout=None,
123124
):
124125
self.source: MongoClient = source
126+
self.target: MongoClient = target
125127
self.pcsm = pcsm
126128
self.phase = phase
127129
self.options = options
@@ -156,12 +158,33 @@ def finalize(self, *, fast=False):
156158
if state["state"] == PCSM.State.PAUSED:
157159
if state["initialSync"]["cloneCompleted"]:
158160
self.pcsm.resume()
161+
# After resuming, wait briefly to ensure the replication stream is fully active
162+
# before attempting to sync to current optime. This prevents race conditions
163+
# where wait_for_current_optime() is called before the change stream starts.
164+
time.sleep(0.5)
159165
state = self.pcsm.status()
160166

161167
if state["state"] == PCSM.State.RUNNING:
162-
if not fast:
163-
self.wait_for_current_optime()
168+
# First, always wait for initial sync to complete
164169
self.wait_for_initial_sync()
170+
171+
if not fast:
172+
# Refresh state after initial sync completes
173+
state = self.pcsm.status()
174+
175+
# For CLONE phase: if initial sync completed, PCSM has caught up and might
176+
# not be actively processing new oplogs. Skip wait_for_current_optime().
177+
# For APPLY/MANUAL phase: always wait to ensure real-time operations are replicated.
178+
skip_wait = self.phase == self.Phase.CLONE and state["initialSync"]["completed"]
179+
180+
if not skip_wait:
181+
self.wait_for_current_optime()
182+
else:
183+
# CLONE phase with initial sync done - just ensure metadata is visible
184+
for retry in range(6):
185+
self.target.admin.command("ping")
186+
time.sleep(min(0.05 * (2**retry), 0.2))
187+
165188
self.pcsm.finalize()
166189
state = self.pcsm.status()
167190

@@ -194,9 +217,23 @@ def wait_for_current_optime(self):
194217
for _ in range(self.wait_timeout * 2):
195218
last_applied = self.last_applied_op
196219
if curr_optime <= last_applied:
197-
# Even though the oplog entry is replicated, PCSM may not have finished
198-
# writing to the target database. Add a small delay to ensure completion.
199-
time.sleep(0.3)
220+
# Even though PCSM has processed the oplog entry, MongoDB metadata updates
221+
# (like collection/database creation) may not be immediately visible to other
222+
# connections. Poll the target with exponential backoff to ensure visibility.
223+
for retry in range(6):
224+
# When PCSM creates a collection or database on the target cluster:
225+
# - PCSM writes the change and confirms it's applied
226+
# - But the test's MongoDB connection still has stale metadata cached
227+
# - Immediately querying for that collection might return "not found"
228+
229+
# The ping command forces a round trip to the target server; combined with the
230+
# backoff sleep below, it gives newly created metadata time to become visible.
231+
self.target.admin.command("ping")
232+
233+
# Exponential backoff with cap: 0.05s, 0.10s, 0.20s, 0.20s, 0.20s, 0.20s
234+
# Total wait: ~0.95 seconds
235+
time.sleep(min(0.05 * (2**retry), 0.2))
236+
200237
return
201238

202239
time.sleep(0.5)

tests/test_selective.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@
55
from pymongo import MongoClient
66

77

8-
def perform_with_options(source, pcsm, phase: Runner.Phase, include_ns=None, exclude_ns=None):
8+
def perform_with_options(
9+
source, target, pcsm, phase: Runner.Phase, include_ns=None, exclude_ns=None
10+
):
911
"""Perform the PCSM operation with the given options."""
1012
pcsm_options = {}
1113
if include_ns:
1214
pcsm_options["include_namespaces"] = include_ns
1315
if exclude_ns:
1416
pcsm_options["exclude_namespaces"] = exclude_ns
1517

16-
return Runner(source, pcsm, phase, pcsm_options)
18+
return Runner(source, target, pcsm, phase, pcsm_options)
1719

1820

1921
def check_if_target_is_subset(source: MongoClient, target: MongoClient):
@@ -35,6 +37,7 @@ def check_if_target_is_subset(source: MongoClient, target: MongoClient):
3537
def test_create_collection_with_include_only(t: testing.Testing, phase: Runner.Phase):
3638
with perform_with_options(
3739
t.source,
40+
t.target,
3841
t.pcsm,
3942
phase,
4043
include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
@@ -63,7 +66,7 @@ def test_create_collection_with_include_only(t: testing.Testing, phase: Runner.P
6366
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
6467
def test_create_collection_with_exclude_only(t: testing.Testing, phase: Runner.Phase):
6568
with perform_with_options(
66-
t.source, t.pcsm, phase, exclude_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1"]
69+
t.source, t.target, t.pcsm, phase, exclude_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1"]
6770
):
6871
for db in range(3):
6972
for coll in range(3):
@@ -90,6 +93,7 @@ def test_create_collection_with_exclude_only(t: testing.Testing, phase: Runner.P
9093
def test_create_collection(t: testing.Testing, phase: Runner.Phase):
9194
with perform_with_options(
9295
t.source,
96+
t.target,
9397
t.pcsm,
9498
phase,
9599
include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
@@ -104,15 +108,12 @@ def test_create_collection(t: testing.Testing, phase: Runner.Phase):
104108
# "db_0.coll_0",
105109
# "db_0.coll_1",
106110
# "db_0.coll_2",
107-
108111
# "db_1.coll_0",
109112
"db_1.coll_1",
110113
# "db_1.coll_2",
111-
112114
"db_2.coll_0",
113115
"db_2.coll_1",
114116
# "db_2.coll_2",
115-
116117
"db_3.coll_0",
117118
# "db_3.coll_1",
118119
"db_3.coll_2",

tests/test_selective_sharded.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@
55
from pymongo import MongoClient
66

77

8-
def perform_with_options(source, pcsm, phase: Runner.Phase, include_ns=None, exclude_ns=None):
8+
def perform_with_options(
9+
source, target, pcsm, phase: Runner.Phase, include_ns=None, exclude_ns=None
10+
):
911
"""Perform the PCSM operation with the given options."""
1012
pcsm_options = {}
1113
if include_ns:
1214
pcsm_options["include_namespaces"] = include_ns
1315
if exclude_ns:
1416
pcsm_options["exclude_namespaces"] = exclude_ns
1517

16-
return Runner(source, pcsm, phase, pcsm_options)
18+
return Runner(source, target, pcsm, phase, pcsm_options)
1719

1820

1921
def check_if_target_is_subset(source: MongoClient, target: MongoClient):
@@ -35,6 +37,7 @@ def check_if_target_is_subset(source: MongoClient, target: MongoClient):
3537
def test_create_collection_with_include_only(t: testing.Testing, phase: Runner.Phase):
3638
with perform_with_options(
3739
t.source,
40+
t.target,
3841
t.pcsm,
3942
phase,
4043
include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
@@ -64,7 +67,7 @@ def test_create_collection_with_include_only(t: testing.Testing, phase: Runner.P
6467
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
6568
def test_create_collection_with_exclude_only(t: testing.Testing, phase: Runner.Phase):
6669
with perform_with_options(
67-
t.source, t.pcsm, phase, exclude_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1"]
70+
t.source, t.target, t.pcsm, phase, exclude_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1"]
6871
):
6972
for db in range(3):
7073
for coll in range(3):
@@ -92,6 +95,7 @@ def test_create_collection_with_exclude_only(t: testing.Testing, phase: Runner.P
9295
def test_create_collection(t: testing.Testing, phase: Runner.Phase):
9396
with perform_with_options(
9497
t.source,
98+
t.target,
9599
t.pcsm,
96100
phase,
97101
include_ns=["db_0.*", "db_1.coll_0", "db_1.coll_1", "db_2.coll_0", "db_2.coll_1"],
@@ -107,15 +111,12 @@ def test_create_collection(t: testing.Testing, phase: Runner.Phase):
107111
# "db_0.coll_0",
108112
# "db_0.coll_1",
109113
# "db_0.coll_2",
110-
111114
# "db_1.coll_0",
112115
"db_1.coll_1",
113116
# "db_1.coll_2",
114-
115117
"db_2.coll_0",
116118
"db_2.coll_1",
117119
# "db_2.coll_2",
118-
119120
"db_3.coll_0",
120121
# "db_3.coll_1",
121122
"db_3.coll_2",

tests/testing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def __init__(self, source: MongoClient, target: MongoClient, pcsm: PCSM):
1717

1818
def run(self, phase: Runner.Phase, wait_timeout=None):
1919
"""Perform the PCSM operation for the given phase."""
20-
return Runner(self.source, self.pcsm, phase, {}, wait_timeout=wait_timeout)
20+
return Runner(self.source, self.target, self.pcsm, phase, {}, wait_timeout=wait_timeout)
2121

2222
def compare_all(self, sort=None):
2323
"""Compare all databases and collections between source and target MongoDB."""

0 commit comments

Comments
 (0)