|
21 | 21 | sys.path[0:0] = [""]
|
22 | 22 |
|
23 | 23 | from test import IntegrationTest, client_context, unittest
|
24 |
| -from test.utils import rs_client |
| 24 | +from test.utils import rs_client, wait_until |
25 | 25 |
|
26 | 26 | import pymongo
|
27 | 27 | from pymongo.errors import ConnectionFailure, OperationFailure
|
@@ -1297,5 +1297,90 @@ def strptime(s):
|
1297 | 1297 | # End Versioned API Example 8
|
1298 | 1298 |
|
1299 | 1299 |
|
| 1300 | +class TestSnapshotQueryExamples(IntegrationTest): |
| 1301 | + @client_context.require_version_min(5, 0) |
| 1302 | + def test_snapshot_query(self): |
| 1303 | + client = self.client |
| 1304 | + |
| 1305 | + if not client_context.is_topology_type(["replicaset", "sharded"]): |
| 1306 | + self.skipTest("Must be a sharded or replicaset") |
| 1307 | + |
| 1308 | + self.addCleanup(client.drop_database, "pets") |
| 1309 | + db = client.pets |
| 1310 | + db.drop_collection("cats") |
| 1311 | + db.drop_collection("dogs") |
| 1312 | + db.cats.insert_one({"name": "Whiskers", "color": "white", "age": 10, "adoptable": True}) |
| 1313 | + db.dogs.insert_one({"name": "Pebbles", "color": "Brown", "age": 10, "adoptable": True}) |
| 1314 | + wait_until(lambda: self.check_for_snapshot(db.cats), "success") |
| 1315 | + wait_until(lambda: self.check_for_snapshot(db.dogs), "success") |
| 1316 | + |
| 1317 | + # Start Snapshot Query Example 1 |
| 1318 | + |
| 1319 | + db = client.pets |
| 1320 | + with client.start_session(snapshot=True) as s: |
| 1321 | + adoptablePetsCount = db.cats.aggregate( |
| 1322 | + [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s |
| 1323 | + ).next()["adoptableCatsCount"] |
| 1324 | + |
| 1325 | + adoptablePetsCount += db.dogs.aggregate( |
| 1326 | + [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s |
| 1327 | + ).next()["adoptableDogsCount"] |
| 1328 | + |
| 1329 | + print(adoptablePetsCount) |
| 1330 | + |
| 1331 | + # End Snapshot Query Example 1 |
| 1332 | + db = client.retail |
| 1333 | + self.addCleanup(client.drop_database, "retail") |
| 1334 | + db.drop_collection("sales") |
| 1335 | + |
| 1336 | + saleDate = datetime.datetime.now() |
| 1337 | + db.sales.insert_one({"shoeType": "boot", "price": 30, "saleDate": saleDate}) |
| 1338 | + wait_until(lambda: self.check_for_snapshot(db.sales), "success") |
| 1339 | + |
| 1340 | + # Start Snapshot Query Example 2 |
| 1341 | + db = client.retail |
| 1342 | + with client.start_session(snapshot=True) as s: |
| 1343 | + total = db.sales.aggregate( |
| 1344 | + [ |
| 1345 | + { |
| 1346 | + "$match": { |
| 1347 | + "$expr": { |
| 1348 | + "$gt": [ |
| 1349 | + "$saleDate", |
| 1350 | + { |
| 1351 | + "$dateSubtract": { |
| 1352 | + "startDate": "$$NOW", |
| 1353 | + "unit": "day", |
| 1354 | + "amount": 1, |
| 1355 | + } |
| 1356 | + }, |
| 1357 | + ] |
| 1358 | + } |
| 1359 | + } |
| 1360 | + }, |
| 1361 | + {"$count": "totalDailySales"}, |
| 1362 | + ], |
| 1363 | + session=s, |
| 1364 | + ).next()["totalDailySales"] |
| 1365 | + |
| 1366 | + # End Snapshot Query Example 2 |
| 1367 | + |
| 1368 | + def check_for_snapshot(self, collection): |
| 1369 | + """Wait for snapshot reads to become available to prevent this error: |
| 1370 | + [246:SnapshotUnavailable]: Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1646666892, 4). Collection minimum is Timestamp(1646666892, 5) (on localhost:27017, modern retry, attempt 1) |
| 1371 | + From https://github.com/mongodb/mongo-ruby-driver/commit/7c4117b58e3d12e237f7536f7521e18fc15f79ac |
| 1372 | + """ |
| 1373 | + with self.client.start_session(snapshot=True) as s: |
| 1374 | + try: |
| 1375 | + with collection.aggregate([], session=s): |
| 1376 | + pass |
| 1377 | + return True |
| 1378 | + except OperationFailure as e: |
| 1379 | + # Retry them as the server demands... |
| 1380 | + if e.code == 246: # SnapshotUnavailable |
| 1381 | + return False |
| 1382 | + raise |
| 1383 | + |
| 1384 | + |
1300 | 1385 | if __name__ == "__main__":
|
1301 | 1386 | unittest.main()
|
0 commit comments