11
11
from opensensor .collections import (
12
12
CO2 ,
13
13
PH ,
14
+ VPD ,
14
15
DeviceMetadata ,
15
16
Environment ,
16
17
Humidity ,
@@ -140,20 +141,6 @@ def get_collection_name(response_model: Type[T]):
140
141
return response_model .__name__
141
142
142
143
143
- def _get_old_project_projection (response_model : Type [T ]):
144
- project_projection = {
145
- "_id" : False ,
146
- }
147
- for field_name , _ in response_model .__fields__ .items ():
148
- if field_name == "timestamp" :
149
- project_projection ["timestamp" ] = "$timestamp"
150
- if field_name == "unit" :
151
- project_projection ["unit" ] = "$metadata.unit"
152
- else :
153
- project_projection [field_name ] = f"${ field_name } "
154
- return project_projection
155
-
156
-
157
144
def _get_project_projection (response_model : Type [T ]):
158
145
old_name = get_collection_name (response_model )
159
146
new_collection_name = new_collections [old_name ]
@@ -170,37 +157,121 @@ def _get_project_projection(response_model: Type[T]):
170
157
return project_projection
171
158
172
159
173
def get_initial_match_clause(
    device_ids: List[str],
    device_name: str,
    start_date: datetime,
    end_date: datetime,
    resolution: int,
):
    """Build the base ``$match`` stage shared by the aggregation pipelines.

    Restricts documents to the ``[start_date, end_date]`` window, to any of
    the given device ids, and to the given device name.  ``resolution`` is
    accepted for signature parity with the pipeline builders but is not used
    here.
    """
    # Fall back to a 100-day lookback window when no bounds are supplied.
    if start_date is None:
        start_date = datetime.utcnow() - timedelta(days=100)
    if end_date is None:
        end_date = datetime.utcnow()

    return {
        "timestamp": {"$gte": start_date, "$lte": end_date},
        # $in matches documents whose device_id is any entry of the list.
        "metadata.device_id": {"$in": device_ids},
        "metadata.name": device_name,
    }
def get_vpd_pipeline(
    device_ids: List[str],
    device_name: str,
    start_date: datetime,
    end_date: datetime,
    resolution: int,
):
    """Build the MongoDB aggregation pipeline computing Vapor Pressure Deficit.

    Documents are bucketed into ``resolution``-minute intervals; temperature
    (``temp``) and relative humidity (``rh``) are averaged per bucket and VPD
    is derived with the Tetens saturation-vapour-pressure approximation:

        satvp = 0.61078 * exp(17.27 * T / (T + 237.3))   # kPa
        vpd   = satvp * (1 - rh / 100)

    Returns the list of pipeline stages producing documents with
    ``timestamp``, ``vpd`` and ``unit`` fields, sorted by timestamp.
    """
    # Resolve default bounds *before* building the pipeline: the bucket
    # computation below subtracts start_date from each timestamp, so it must
    # be a concrete datetime.  Previously a None start_date leaked into the
    # $subtract expression even though get_initial_match_clause defaulted it
    # internally for the match stage.
    if start_date is None:
        start_date = datetime.utcnow() - timedelta(days=100)
    if end_date is None:
        end_date = datetime.utcnow()

    sampling_interval = timedelta(minutes=resolution)
    match_clause = get_initial_match_clause(
        device_ids, device_name, start_date, end_date, resolution
    )

    # Both readings must exist for the VPD calculation.
    match_clause["temp"] = {"$exists": True}
    match_clause["rh"] = {"$exists": True}

    pipeline = [
        {"$match": match_clause},
        {
            "$addFields": {
                # Bucket index: elapsed time since start_date divided by the
                # sampling interval (MongoDB date subtraction yields ms).
                "group": {
                    "$floor": {
                        "$divide": [
                            {"$subtract": ["$timestamp", start_date]},
                            sampling_interval.total_seconds() * 1000,
                        ]
                    }
                }
            }
        },
        {
            "$group": {
                "_id": "$group",
                "temp": {"$avg": "$temp"},
                "rh": {"$avg": "$rh"},
                # NOTE(review): $first without a preceding $sort picks an
                # unspecified document per bucket — acceptable here since the
                # buckets are narrow, but confirm if exact stamps matter.
                "timestamp": {"$first": "$timestamp"},
            }
        },
        {
            "$addFields": {
                # Tetens approximation for saturation vapour pressure (kPa).
                "satvp": {
                    "$multiply": [
                        0.61078,
                        {
                            "$exp": {
                                "$multiply": [
                                    {"$divide": [17.27, {"$add": ["$temp", 237.3]}]},
                                    "$temp",
                                ]
                            }
                        },
                    ]
                }
            }
        },
        {
            "$addFields": {
                "vpd": {"$multiply": ["$satvp", {"$subtract": [1, {"$divide": ["$rh", 100]}]}]}
            }
        },
        {
            "$project": {
                "_id": False,
                "timestamp": 1,
                "vpd": 1,
                "unit": "kPa",  # assuming kPa as the unit for VPD
            }
        },
        {"$sort": {"timestamp": 1}},
    ]
    return pipeline
+
257
+ def get_uniform_sample_pipeline (
258
+ response_model : Type [T ],
259
+ device_ids : List [str ], # Update the type of the device_id parameter to List[str]
260
+ device_name : str ,
261
+ start_date : datetime ,
262
+ end_date : datetime ,
263
+ resolution : int ,
264
+ ):
265
+ sampling_interval = timedelta (minutes = resolution )
266
+ match_clause = get_initial_match_clause (
267
+ device_ids , device_name , start_date , end_date , resolution
268
+ )
195
269
196
270
# Determine the $project
197
- if old_collection :
198
- project_projection = _get_old_project_projection (response_model )
199
- else :
200
- old_name = get_collection_name (response_model )
201
- new_collection_name = new_collections [old_name ]
202
- project_projection = _get_project_projection (response_model )
203
- match_clause [new_collection_name ] = {"$exists" : True }
271
+ old_name = get_collection_name (response_model )
272
+ new_collection_name = new_collections [old_name ]
273
+ project_projection = _get_project_projection (response_model )
274
+ match_clause [new_collection_name ] = {"$exists" : True }
204
275
205
276
# Query a uniform sample of documents within the timestamp range
206
277
pipeline = [
@@ -234,6 +305,7 @@ def get_uniform_sample_pipeline(
234
305
"co2" : CO2 ,
235
306
"readings" : Moisture ,
236
307
"pH" : PH ,
308
+ "VPD" : VPD ,
237
309
}
238
310
model_class_attributes = {v : k for k , v in model_classes .items ()}
239
311
@@ -248,38 +320,57 @@ def sample_and_paginate_collection(
248
320
page : int ,
249
321
size : int ,
250
322
unit : str ,
251
- old_collection : bool ,
252
323
):
253
324
api_keys , _ = get_api_keys_by_device_id (device_id )
254
325
device_ids , target_device_name = reduce_api_keys_to_device_ids (api_keys , device_id )
255
326
offset = (page - 1 ) * size
256
- pipeline = get_uniform_sample_pipeline (
257
- response_model ,
258
- device_ids ,
259
- target_device_name ,
260
- start_date ,
261
- end_date ,
262
- resolution ,
263
- old_collection ,
264
- )
327
+
328
+ # Determine the right pipeline to use based on the response model
329
+ if response_model is VPD :
330
+ pipeline = get_vpd_pipeline (
331
+ device_ids ,
332
+ target_device_name ,
333
+ start_date ,
334
+ end_date ,
335
+ resolution ,
336
+ )
337
+ else :
338
+ pipeline = get_uniform_sample_pipeline (
339
+ response_model ,
340
+ device_ids ,
341
+ target_device_name ,
342
+ start_date ,
343
+ end_date ,
344
+ resolution ,
345
+ )
346
+
265
347
pipeline .extend ([{"$skip" : offset }, {"$limit" : size }])
266
348
267
349
db = get_open_sensor_db ()
268
350
collection = db [collection_name ]
269
351
raw_data = list (collection .aggregate (pipeline ))
352
+
270
353
# Add UTC offset to timestamp field
271
354
for item in raw_data :
272
355
item ["timestamp" ] = item ["timestamp" ].replace (tzinfo = timezone .utc ).isoformat ()
273
- data_field = model_class_attributes [response_model ]
274
- model_class = model_classes [data_field ]
275
- data = [model_class (** item ) for item in raw_data ]
276
- if data_field == "temp" and unit :
277
- for item in data :
278
- convert_temperature (item , unit )
356
+
357
+ if response_model is VPD :
358
+ # If the response model is VPD, you already have VPD-related data from the pipeline.
359
+ # So, you can directly use it to create the response model instances.
360
+ data = [VPD (** item ) for item in raw_data ]
361
+ else :
362
+ data_field = model_class_attributes [response_model ]
363
+ model_class = model_classes [data_field ]
364
+ data = [model_class (** item ) for item in raw_data ]
365
+ if data_field == "temp" and unit :
366
+ for item in data :
367
+ convert_temperature (item , unit )
368
+
279
369
# Re-run for total page count
280
370
pipeline .append ({"$count" : "total" })
281
371
data_count = list (collection .aggregate (pipeline ))
282
372
total_count = data_count [0 ]["total" ] if data else 0
373
+
283
374
return Page (items = data , total = total_count , page = page , size = size )
284
375
285
376
@@ -308,9 +399,6 @@ async def historical_data_route(
308
399
collection_name = user .collection_name
309
400
else :
310
401
collection_name = "FreeTier"
311
- migration_finished = migration_complete (collection_name )
312
- if not migration_finished :
313
- collection_name = get_collection_name (entity )
314
402
315
403
return sample_and_paginate_collection (
316
404
entity ,
@@ -322,7 +410,6 @@ async def historical_data_route(
322
410
page = page ,
323
411
size = size ,
324
412
unit = unit ,
325
- old_collection = not migration_finished ,
326
413
)
327
414
328
415
return historical_data_route
@@ -355,6 +442,12 @@ async def historical_data_route(
355
442
response_model = Page [PH ],
356
443
methods = ["GET" ],
357
444
)
445
+ router .add_api_route (
446
+ "/VPD/{device_id}" ,
447
+ create_historical_data_route (VPD ),
448
+ response_model = Page [VPD ],
449
+ methods = ["GET" ],
450
+ )
358
451
359
452
360
453
@router .post ("/environment/" )
0 commit comments