PCSM-221. Add test to reproduce pcsm duplicate key error when handling shard key updates #144
inelpandzic merged 13 commits into main
Conversation
Pull request overview
This PR adds a test to reproduce a duplicate key error (PCSM-221) that occurs when the PCSM (Percona Cluster Sync for MongoDB) tool handles shard key updates in sharded collections. The test simulates concurrent shard key updates during the APPLY phase to trigger the race condition.
Key Changes:
- Added test_shard_key_update_duplicate_key_error test function that spawns a background thread performing shard key updates while the synchronization is running
- Introduced threading and time module imports to support concurrent operations in the test
key_id = 100 + i
coll.insert_one({"key_id": key_id, "name": f"pre_sync_doc_{i}", "value": f"value_{key_id}"})
stop_event = threading.Event()
def perform_shard_key_updates():
The num_updates variable is set to 500, but with a 0.05 second sleep per iteration, this would take ~25 seconds to complete. However, the test only runs for 3 seconds before setting the stop event. This means the loop will only complete approximately 60 iterations before stopping. Consider either: (1) reducing num_updates to match the expected runtime (e.g., ~100), or (2) documenting that the thread is intentionally interrupted early for this test scenario.
Suggested change:
def perform_shard_key_updates():
    # Note: The thread running this function is intentionally interrupted early (after ~3 seconds).
    # The loop will not complete all 500 iterations; num_updates is set high to allow for interruption.
time.sleep(0.05)
update_thread = threading.Thread(target=perform_shard_key_updates)
update_thread.start()
time.sleep(3)
Using time.sleep(3) introduces non-deterministic behavior and makes the test timing-dependent. This could lead to flaky test results depending on system load. Consider using synchronization primitives (like the threading.Event already used) to signal when the thread has performed enough operations, or add a comment explaining why the fixed sleep is necessary for reproducing the specific race condition.
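For example, a sketch of the event-based approach, assuming the coll fixture from the test; the enough_updates event and the threshold of 50 are illustrative, not part of this PR:

import threading
import time

stop_event = threading.Event()
enough_updates = threading.Event()  # illustrative: set once the worker has made enough progress

def perform_shard_key_updates():
    for i in range(1, 501):
        if stop_event.is_set():
            break
        key_id = 200 + i
        coll.insert_one({"key_id": key_id, "name": f"test_doc_{i}", "value": f"value_{key_id}"})
        coll.update_one({"key_id": key_id}, {"$set": {"key_id": 5000 + i, "shard_key_updated": True}})
        if i >= 50:  # illustrative threshold for "enough" concurrent updates
            enough_updates.set()
        time.sleep(0.05)

update_thread = threading.Thread(target=perform_shard_key_updates)
update_thread.start()
# Wait for real progress instead of a fixed time.sleep(3); the timeout keeps a
# stuck worker from hanging the test.
assert enough_updates.wait(timeout=30), "worker never performed enough updates"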
def perform_shard_key_updates():
    num_updates = 500
    for i in range(1, num_updates + 1):
        if stop_event.is_set():
            break
        key_id = 200 + i
        new_key_id = 5000 + i
        coll.insert_one({"key_id": key_id, "name": f"test_doc_{i}", "value": f"value_{key_id}"})
        coll.update_one({"key_id": key_id}, {"$set": {"key_id": new_key_id, "shard_key_updated": True}})
        time.sleep(0.05)
The perform_shard_key_updates function lacks error handling for database operations. If an insert or update fails (e.g., due to network issues or duplicate key errors), the exception will be silently swallowed by the thread, making the test pass even when operations fail. Consider adding try-except blocks that log or store exceptions for later assertion, similar to the pattern in test_pcsm_118_ignore_incomplete_index in test_indexes.py.
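For instance, a sketch of that pattern using the names from this test; the errors list is illustrative:

errors = []  # illustrative: collects exceptions raised inside the worker thread

def perform_shard_key_updates():
    num_updates = 500
    for i in range(1, num_updates + 1):
        if stop_event.is_set():
            break
        key_id = 200 + i
        try:
            coll.insert_one({"key_id": key_id, "name": f"test_doc_{i}", "value": f"value_{key_id}"})
            coll.update_one({"key_id": key_id}, {"$set": {"key_id": 5000 + i, "shard_key_updated": True}})
        except Exception as exc:
            errors.append(exc)  # surface the failure to the main thread
            break
        time.sleep(0.05)

# After joining the thread, the test can assert the worker saw no failures:
# assert not errors, f"background updates failed: {errors}"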
t.compare_all_sharded()

@pytest.mark.parametrize("phase", [Runner.Phase.APPLY])
Consider adding a @pytest.mark.timeout decorator to prevent the test from hanging indefinitely if the thread or database operations fail to complete. Similar tests using threading (e.g., test_pcsm_118_ignore_incomplete_index in test_indexes.py) use @pytest.mark.timeout(60) to ensure proper cleanup if operations hang.
Suggested change:
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY])
@pytest.mark.timeout(60)
time.sleep(3)
with t.run(phase):
stop_event.set()
update_thread.join(timeout=5)
The join(timeout=5) call may leave the thread running if it doesn't complete within 5 seconds. After the timeout, the thread may still be executing database operations, which could interfere with the subsequent compare_all_sharded() call or leave resources in an inconsistent state. Consider either: (1) checking if the thread is still alive after join and raising an error, or (2) removing the timeout and relying on the stop_event to ensure the thread completes.
Suggested change:
update_thread.join()
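Option (1) from the comment above would look roughly like this:

update_thread.join(timeout=5)
if update_thread.is_alive():
    raise AssertionError("background update thread did not stop within 5 seconds")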
Problem
When replicating sharded collections, a race condition can cause duplicate key errors during the replication phase. This occurs when:
1. A document is inserted on the source with an initial shard key value (key_id: 200)
2. The document's shard key is then updated to a new value (key_id: 5000)
3. The change is replicated as a ReplaceOne with upsert, but the filter contains the original shard key value, which doesn't match the existing document on the target (see the sketch below)
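A minimal illustration of the failing step with pymongo; coll, oid, and full_document are assumed names, not the actual PCSM code:

# The target already holds the document under the *new* shard key value:
#   {"_id": oid, "key_id": 5000, ...}
# Replication replays the change as an upserting replace whose filter still
# carries the original shard key value:
coll.replace_one({"_id": oid, "key_id": 200}, full_document, upsert=True)
# The filter matches nothing on the target (key_id is now 5000), so the upsert
# path tries to insert full_document and collides with the existing _id (E11000).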
The ReplaceOne fails with a duplicate key error on _id because the document already exists but with a different shard key in the filter.

Solution
Added duplicate key error handling for ReplaceOne operations in bulk writes. When a duplicate key error occurs, the failed operation is retried using a delete+insert fallback (delete by _id, then insert the document). Since bulk writes use ordered mode, operations before the failure are already applied and operations after are never executed. The remaining operations are then retried recursively, allowing multiple duplicate key errors in a single batch to be handled sequentially.
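A minimal sketch of that fallback in Python with pymongo, assuming operations arrive as (filter, replacement) pairs; apply_replaces and the variable names are illustrative, not the actual PCSM implementation:

from pymongo import ReplaceOne
from pymongo.errors import BulkWriteError

DUPLICATE_KEY_CODE = 11000  # MongoDB "E11000 duplicate key" error code

def apply_replaces(coll, ops):
    # Apply (filter, replacement) pairs as one ordered bulk of upserting
    # ReplaceOne operations, falling back to delete+insert on duplicate key.
    if not ops:
        return
    try:
        coll.bulk_write([ReplaceOne(f, doc, upsert=True) for f, doc in ops], ordered=True)
    except BulkWriteError as exc:
        err = exc.details["writeErrors"][0]  # ordered mode stops at the first failure
        if err["code"] != DUPLICATE_KEY_CODE:
            raise
        idx = err["index"]
        _, doc = ops[idx]
        # The target already holds this _id under a different shard key, so the
        # filter-based replace cannot reach it: delete by _id, then re-insert.
        coll.delete_one({"_id": doc["_id"]})
        coll.insert_one(doc)
        # Ordered mode guarantees ops[:idx] were applied and ops[idx+1:] never
        # ran; retry the remainder recursively so further duplicate key errors
        # in the same batch are handled one at a time.
        apply_replaces(coll, ops[idx + 1:])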