|
1 | 1 | from datetime import datetime, timedelta
|
2 | 2 | from operator import itemgetter
|
3 | 3 |
|
4 |
| -from pymongo import ASCENDING |
5 |
| - |
6 | 4 | from opensensor.collection_apis import new_collections, old_collections
|
7 |
| - |
8 |
| -# Create a MongoDB client |
9 | 5 | from opensensor.db import get_open_sensor_db
|
10 | 6 |
|
11 | 7 | # Access the database
|
|
17 | 13 | if not migration:
|
18 | 14 | db["Migration"].insert_one({"migration_name": "FreeTier", "migration_complete": False})
|
19 | 15 |
|
20 |
| -earliest_timestamp = datetime.now() |
21 |
| -latest_timestamp = datetime.min |
22 |
| - |
23 |
| -for collection_name in collections_to_migrate: |
24 |
| - collection = db[collection_name] |
25 |
| - earliest_document = collection.find_one(sort=[("timestamp", ASCENDING)]) |
26 |
| - latest_document = collection.find_one(sort=[("timestamp", -1)]) |
27 |
| - if earliest_document and earliest_document["timestamp"] < earliest_timestamp: |
28 |
| - earliest_timestamp = earliest_document["timestamp"] |
29 |
| - if latest_document and latest_document["timestamp"] > latest_timestamp: |
30 |
| - latest_timestamp = latest_document["timestamp"] |
31 |
| - |
| 16 | +earliest_timestamp = datetime(2023, 1, 1) |
32 | 17 | start_date = earliest_timestamp
|
33 |
| -one_week = timedelta(weeks=1) |
| 18 | +one_day = timedelta(days=1) # Width of each migration window |
34 | 19 |
|
35 |
| -while start_date <= latest_timestamp: |
36 |
| - end_date = start_date + one_week |
| 20 | +while start_date <= datetime(2023, 11, 10): |
| 21 | + end_date = start_date + one_day # End of the current one-day window |
37 | 22 | buffer = {}
|
| 23 | + timestamps_set = set() # Timestamps already keyed in buffer; scanned linearly for matches within 3 seconds |
| 24 | + print(start_date, end_date) |
38 | 25 |
|
39 | 26 | for collection_name in collections_to_migrate:
|
40 | 27 | collection = db[collection_name]
|
|
44 | 31 | "metadata": {
|
45 | 32 | "device_id": document["metadata"]["device_id"],
|
46 | 33 | "name": document["metadata"].get("name"),
|
47 |
| - "user_id": document.get("user_id"), |
| 34 | + "user_id": document["metadata"].get("user_id"), |
48 | 35 | },
|
49 | 36 | new_collections[collection_name]: document.get(old_collections[collection_name]),
|
50 | 37 | "timestamp": document["timestamp"],
|
51 | 38 | }
|
52 | 39 | if unit:
|
53 | 40 | new_document[f"{new_collections[collection_name]}_unit"] = unit
|
54 | 41 |
|
55 |
| - for existing_timestamp in buffer.keys(): |
| 42 | + found = False |
| 43 | + for existing_timestamp in timestamps_set: |
56 | 44 | if abs(existing_timestamp - document["timestamp"]) <= timedelta(seconds=3):
|
57 | 45 | buffer[existing_timestamp][new_collections[collection_name]] = document.get(
|
58 | 46 | old_collections[collection_name]
|
|
61 | 49 | buffer[existing_timestamp][
|
62 | 50 | f"{new_collections[collection_name]}_unit"
|
63 | 51 | ] = unit
|
| 52 | + found = True |
64 | 53 | break
|
65 |
| - else: |
| 54 | + |
| 55 | + if not found: |
66 | 56 | buffer[document["timestamp"]] = new_document
|
| 57 | + timestamps_set.add(document["timestamp"]) |
67 | 58 |
|
68 | 59 | all_documents = sorted(buffer.values(), key=itemgetter("timestamp"))
|
69 |
| - free_tier_collection = db["FreeTier"] |
70 |
| - for document in all_documents: |
71 |
| - free_tier_collection.insert_one(document) |
72 | 60 |
|
73 |
| - # Update the latest_timestamp after processing this chunk, to check if new data has been added. |
74 |
| - new_latest_timestamp = datetime.min |
75 |
| - for collection_name in collections_to_migrate: |
76 |
| - collection = db[collection_name] |
77 |
| - latest_document = collection.find_one(sort=[("timestamp", -1)]) |
78 |
| - if latest_document and latest_document["timestamp"] > new_latest_timestamp: |
79 |
| - new_latest_timestamp = latest_document["timestamp"] |
| 61 | + if all_documents: # Skip empty windows; insert_many rejects an empty list |
| 62 | + db["FreeTier"].insert_many(all_documents) |
80 | 63 |
|
81 |
| - # If there are new records added, the while loop will continue until there are no more records. |
82 |
| - latest_timestamp = new_latest_timestamp |
83 | 64 | start_date = end_date
|
84 | 65 |
|
85 |
| -# db["Migration"].update_one({"migration_name": "FreeTier"}, {"$set": {"migration_complete": True}}) |
| 66 | +db["Migration"].update_one({"migration_name": "FreeTier"}, {"$set": {"migration_complete": True}}) |
0 commit comments