18
18
one_day = timedelta (days = 1 )
19
19
20
20
21
- # Function to create a composite key
22
- def create_key (timestamp , metadata ):
23
- return f"{ timestamp } _{ metadata ['device_id' ]} _{ metadata .get ('name' , 'NA' )} _{ metadata .get ('user_id' , 'NA' )} "
21
+ TIME_WINDOW = timedelta (seconds = 3 )
22
+
23
+
24
def find_nearby_key(timestamp, metadata):
    """Return a key already present in ``buffer`` that matches this reading.

    Buffer keys are 4-tuples of ``(timestamp, device_id, name, user_id)``.
    A key matches when its device id equals ``metadata["device_id"]``, its
    name and user id equal the corresponding metadata values (``"NA"`` is
    used when the metadata has no such entry), and its timestamp differs
    from ``timestamp`` by at most ``TIME_WINDOW`` in either direction.

    Args:
        timestamp: Timestamp of the document being processed.
        metadata: The document's metadata mapping; must contain
            ``"device_id"`` and may contain ``"name"`` and ``"user_id"``.

    Returns:
        The first matching key in the buffer's iteration order, or ``None``
        when no key matches.
    """
    # Lazy generator: scanning stops at the first match, and the metadata
    # lookups only happen while there are keys left to inspect.
    matching_keys = (
        (seen_at, device_id, name, user_id)
        for seen_at, device_id, name, user_id in buffer
        if device_id == metadata["device_id"]
        and name == metadata.get("name", "NA")
        and user_id == metadata.get("user_id", "NA")
        and abs(seen_at - timestamp) <= TIME_WINDOW
    )
    return next(matching_keys, None)
24
34
25
35
26
36
while start_date <= datetime (2023 , 11 , 10 ):
@@ -33,18 +43,27 @@ def create_key(timestamp, metadata):
33
43
collection = db [collection_name ]
34
44
for document in collection .find ({"timestamp" : {"$gte" : start_date , "$lt" : end_date }}):
35
45
unit = document ["metadata" ].get ("unit" )
36
- key = create_key (document ["timestamp" ], document ["metadata" ])
37
46
38
- # This will check if a buffer entry exists for the given timestamp and metadata
39
- # If it doesn't exist, it initializes a new dictionary for it
47
+ key = (
48
+ document ["timestamp" ],
49
+ document ["metadata" ]["device_id" ],
50
+ document ["metadata" ].get ("name" , "NA" ),
51
+ document ["metadata" ].get ("user_id" , "NA" ),
52
+ )
53
+ nearby_key = find_nearby_key (document ["timestamp" ], document ["metadata" ])
54
+
55
+ if nearby_key :
56
+ key = nearby_key
57
+
58
+ # Initialize the key if it doesn't exist yet in the buffer
40
59
if key not in buffer :
41
60
buffer [key ] = {
42
61
"metadata" : {
43
62
"device_id" : document ["metadata" ]["device_id" ],
44
63
"name" : document ["metadata" ].get ("name" ),
45
64
"user_id" : document ["metadata" ].get ("user_id" ),
46
65
},
47
- "timestamp" : document [ "timestamp" ],
66
+ "timestamp" : key [ 0 ], # first part of the key is the timestamp
48
67
}
49
68
50
69
buffer [key ][new_collections [collection_name ]] = document .get (
@@ -55,11 +74,9 @@ def create_key(timestamp, metadata):
55
74
56
75
all_documents = sorted (buffer .values (), key = itemgetter ("timestamp" ))
57
76
58
- # Insert the batch of documents for the current day
59
77
if all_documents :
60
78
db ["FreeTier" ].insert_many (all_documents )
61
79
62
- # Move to the next day
63
80
start_date = end_date
64
81
65
82
db ["Migration" ].update_one ({"migration_name" : "FreeTier" }, {"$set" : {"migration_complete" : True }})
0 commit comments