|
1 | 1 | # pylint: disable=missing-docstring,redefined-outer-name |
| 2 | +import threading |
| 3 | +import time |
2 | 4 | from datetime import datetime |
3 | 5 |
|
4 | 6 | import pytest |
@@ -85,3 +87,35 @@ def test_create_collection_with_collation_with_shard_key_index_prefix( |
85 | 87 | ) |
86 | 88 |
|
87 | 89 | t.compare_all_sharded() |
| 90 | + |
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY])
def test_shard_key_update_duplicate_key_error(t: Testing, phase: Runner.Phase):
    """Reproduce the PCSM duplicate-key error caused by shard-key updates.

    Seeds a sharded source collection, then runs a background thread that
    keeps inserting documents and immediately updating their shard key
    while the sync phase runs concurrently. A shard-key change is executed
    by the server as a delete+insert pair, which is the pattern that
    provokes the duplicate-key error on the sync side. Finally verifies
    source/destination convergence with compare_all_sharded().
    """
    db_name = "test_db"
    collection_name = "test_collection"
    coll = t.source[db_name][collection_name]
    t.source.admin.command(
        "shardCollection", f"{db_name}.{collection_name}", key={"key_id": 1}
    )

    # Pre-sync data so the initial clone phase has something to copy.
    coll.insert_one({"key_id": 0, "name": "item_0", "value": "value_0"})
    for i in range(1, 10):
        key_id = 100 + i
        coll.insert_one(
            {"key_id": key_id, "name": f"pre_sync_doc_{i}", "value": f"value_{key_id}"}
        )

    stop_event = threading.Event()
    worker_errors = []  # exceptions raised in the worker, surfaced after join

    def perform_shard_key_updates():
        """Insert docs and immediately move their shard key until stopped."""
        try:
            num_updates = 500
            for i in range(1, num_updates + 1):
                if stop_event.is_set():
                    break
                key_id = 200 + i
                new_key_id = 5000 + i
                coll.insert_one(
                    {"key_id": key_id, "name": f"test_doc_{i}", "value": f"value_{key_id}"}
                )
                # Updating key_id changes the shard key, so the server runs
                # this as delete+insert — the sequence under test.
                coll.update_one(
                    {"key_id": key_id},
                    {"$set": {"key_id": new_key_id, "shard_key_updated": True}},
                )
                time.sleep(0.05)
        except Exception as exc:  # Thread() swallows exceptions; capture instead
            worker_errors.append(exc)

    update_thread = threading.Thread(target=perform_shard_key_updates)
    update_thread.start()
    time.sleep(3)  # let some concurrent shard-key updates accumulate first

    try:
        with t.run(phase):
            stop_event.set()
            update_thread.join(timeout=5)
    finally:
        # Always stop and reap the worker — even if t.run() raises —
        # so a failing sync phase cannot leak a live thread into later tests.
        stop_event.set()
        update_thread.join(timeout=10)

    assert not update_thread.is_alive(), "background update thread did not terminate"
    assert not worker_errors, f"background update thread failed: {worker_errors[0]!r}"

    t.compare_all_sharded()
0 commit comments