Skip to content

Commit 13091b8

Browse files
committed
Revise migration script to consider future insertions after the script begins.
1 parent f3b625a commit 13091b8

File tree

1 file changed

+43
-65
lines changed

1 file changed

+43
-65
lines changed

migrate_collections.py

Lines changed: 43 additions & 65 deletions
Original file line numberDiff line numberDiff line change
# Migrate per-measurement collections (Temperature, Humidity, ...) into a
# single "FreeTier" collection, merging readings taken within a few seconds
# of each other into one document. Works in one-week chunks and keeps going
# until no data remains, so documents inserted after the script starts are
# still picked up.

# Access the database (get_open_sensor_db, new_collections and
# old_collections are defined earlier in this file).
db = get_open_sensor_db()

# List of all collections/models to migrate
collections_to_migrate = ["Temperature", "Humidity", "Pressure", "Lux", "CO2", "PH", "Moisture"]

# Record that the migration has started (idempotent across re-runs).
migration = db.Migration.find_one({"migration_name": "FreeTier"})
if not migration:
    db["Migration"].insert_one({"migration_name": "FreeTier", "migration_complete": False})

# Determine the earliest timestamp across all source collections.
# NOTE(review): datetime.now() is naive — this assumes the stored
# "timestamp" fields are naive as well; confirm before mixing with
# timezone-aware datetimes.
earliest_timestamp = datetime.now()

for collection_name in collections_to_migrate:
    collection = db[collection_name]
    earliest_document = collection.find_one(sort=[("timestamp", ASCENDING)])
    if earliest_document and earliest_document["timestamp"] < earliest_timestamp:
        earliest_timestamp = earliest_document["timestamp"]

# Migrate data in chunks, one week at a time.
start_date = earliest_timestamp
one_week = timedelta(weeks=1)
# Readings from different collections within this window are merged into a
# single FreeTier document.
merge_window = timedelta(seconds=3)

while True:
    end_date = start_date + one_week
    buffer = {}       # timestamp -> merged FreeTier document for this chunk
    has_data = False  # did any collection contribute to this chunk?

    for collection_name in collections_to_migrate:
        collection = db[collection_name]
        for document in collection.find({"timestamp": {"$gte": start_date, "$lt": end_date}}):
            has_data = True

            # Convert to the FreeTier model.
            unit = document["metadata"].get("unit")
            new_document = {
                "metadata": {
                    "device_id": document["metadata"]["device_id"],
                    "name": document["metadata"].get("name"),
                    "user_id": document.get("user_id"),
                },
                new_collections[collection_name]: document.get(old_collections[collection_name]),
                "timestamp": document["timestamp"],
            }
            if unit:
                new_document[f"{new_collections[collection_name]}_unit"] = unit

            # Merge with an existing buffered document if it's within the
            # merge window; otherwise buffer a new document (for/else: the
            # else runs only when no existing timestamp matched).
            for existing_timestamp in buffer:
                if abs(existing_timestamp - document["timestamp"]) <= merge_window:
                    buffer[existing_timestamp][new_collections[collection_name]] = document.get(
                        old_collections[collection_name]
                    )
                    if unit:
                        buffer[existing_timestamp][
                            f"{new_collections[collection_name]}_unit"
                        ] = unit
                    break
            else:
                buffer[document["timestamp"]] = new_document

    if not has_data:
        # BUG FIX: previously an empty week ended the whole migration, so a
        # gap of more than one week in the data (or between existing data
        # and future insertions) silently dropped everything after the gap
        # and marked the migration complete. Instead, look for the next
        # document at or after start_date; only stop when no collection has
        # any data left.
        next_timestamps = []
        for collection_name in collections_to_migrate:
            next_document = db[collection_name].find_one(
                {"timestamp": {"$gte": start_date}}, sort=[("timestamp", ASCENDING)]
            )
            if next_document:
                next_timestamps.append(next_document["timestamp"])
        if not next_timestamps:
            break  # truly no more data anywhere — migration is done
        start_date = min(next_timestamps)  # jump over the gap, then retry
        continue

    # Insert this chunk's merged documents into FreeTier in timestamp order.
    all_documents = sorted(buffer.values(), key=itemgetter("timestamp"))
    free_tier_collection = db["FreeTier"]
    for document in all_documents:
        free_tier_collection.insert_one(document)

    start_date = end_date  # Move to the next chunk

db["Migration"].update_one({"migration_name": "FreeTier"}, {"$set": {"migration_complete": True}})

0 commit comments

Comments
 (0)