|
18 | 18 | db["Migration"].insert_one({"migration_name": "FreeTier", "migration_complete": False})
|
19 | 19 |
|
20 | 20 | earliest_timestamp = datetime.now()
|
| 21 | +latest_timestamp = datetime.min |
21 | 22 |
|
22 | 23 | for collection_name in collections_to_migrate:
|
23 | 24 | collection = db[collection_name]
|
24 | 25 | earliest_document = collection.find_one(sort=[("timestamp", ASCENDING)])
|
| 26 | + latest_document = collection.find_one(sort=[("timestamp", -1)]) |
25 | 27 | if earliest_document and earliest_document["timestamp"] < earliest_timestamp:
|
26 | 28 | earliest_timestamp = earliest_document["timestamp"]
|
| 29 | + if latest_document and latest_document["timestamp"] > latest_timestamp: |
| 30 | + latest_timestamp = latest_document["timestamp"] |
27 | 31 |
|
28 | 32 | start_date = earliest_timestamp
|
29 | 33 | one_week = timedelta(weeks=1)
|
30 | 34 |
|
31 |
| -while True: |
| 35 | +while start_date <= latest_timestamp: |
32 | 36 | end_date = start_date + one_week
|
33 | 37 | buffer = {}
|
34 |
| - has_data = False # Flag to check if data exists for the current chunk |
35 | 38 |
|
36 | 39 | for collection_name in collections_to_migrate:
|
37 | 40 | collection = db[collection_name]
|
38 | 41 | for document in collection.find({"timestamp": {"$gte": start_date, "$lt": end_date}}):
|
39 |
| - has_data = True # Data exists for this chunk |
40 |
| - |
41 | 42 | unit = document["metadata"].get("unit")
|
42 | 43 | new_document = {
|
43 | 44 | "metadata": {
|
|
64 | 65 | else:
|
65 | 66 | buffer[document["timestamp"]] = new_document
|
66 | 67 |
|
67 |
| - if not has_data: # If no data for the current chunk, stop the loop |
68 |
| - break |
69 |
| - |
70 | 68 | all_documents = sorted(buffer.values(), key=itemgetter("timestamp"))
|
71 | 69 | free_tier_collection = db["FreeTier"]
|
72 |
| - |
73 | 70 | for document in all_documents:
|
74 | 71 | free_tier_collection.insert_one(document)
|
75 | 72 |
|
76 |
| - start_date = end_date # Move to the next chunk |
| 73 | + # Update the latest_timestamp after processing this chunk, to check if new data has been added. |
| 74 | + new_latest_timestamp = datetime.min |
| 75 | + for collection_name in collections_to_migrate: |
| 76 | + collection = db[collection_name] |
| 77 | + latest_document = collection.find_one(sort=[("timestamp", -1)]) |
| 78 | + if latest_document and latest_document["timestamp"] > new_latest_timestamp: |
| 79 | + new_latest_timestamp = latest_document["timestamp"] |
| 80 | + |
| 81 | + # If there are new records added, the while loop will continue until there are no more records. |
| 82 | + latest_timestamp = new_latest_timestamp |
| 83 | + start_date = end_date |
77 | 84 |
|
78 |
| -db["Migration"].update_one({"migration_name": "FreeTier"}, {"$set": {"migration_complete": True}}) |
| 85 | +# db["Migration"].update_one({"migration_name": "FreeTier"}, {"$set": {"migration_complete": True}}) |
0 commit comments