1- from datetime import datetime
1+ from datetime import datetime , timezone , timedelta
22
33import pandas as pd
44
55from src .constants import AggregationMethod , LocationPolygon , TemporalResolution
66from src .gee .index import get_preprocessed_imagery
77from src .gee .ndvi import get_ndvi_info
8+ from src .gee .ndvi_cache import ndvi_daily_cache
9+ from typing import List , Dict , Union
10+ import math
811
912
10- def aggregate_time_series (
11- ndvi_info : list [dict ],
12- temporal_resolution ,
13- aggregation_method ,
14- start_date : datetime ,
15- end_date : datetime ,
16- ):
17- # Generate a complete date range based on temporal resolution
18- if temporal_resolution == "DAILY" :
19- date_range = pd .date_range (start = start_date , end = end_date , freq = "D" , tz = "UTC" )
20- elif temporal_resolution == "MONTHLY" :
21- date_range = pd .date_range (start = start_date , end = end_date , freq = "MS" , tz = "UTC" )
22- else :
23- raise ValueError ("Unsupported temporal resolution" )
24-
25- # Create a DataFrame with the full date range, initially filled with None
26- df = pd .DataFrame (index = date_range )
27- df .index .name = "timestamp"
28- df ["value" ] = None
29-
30- # Convert ndvi_info to a DataFrame and set the index
31- if temporal_resolution == "MONTHLY" :
32- for record in ndvi_info :
33- record ["timestamp" ] = record ["timestamp" ].replace (day = 1 )
34- info_df = pd .DataFrame (ndvi_info )
35- info_df ["timestamp" ] = pd .to_datetime (info_df ["timestamp" ], unit = "s" , utc = True )
36-
37- # Align info_df to the temporal resolution
38- if temporal_resolution == "DAILY" :
39- info_df ["timestamp" ] = info_df ["timestamp" ].dt .floor ("D" )
40- elif temporal_resolution == "MONTHLY" :
41- info_df ["timestamp" ] = info_df ["timestamp" ].dt .to_period ("M" ).dt .to_timestamp ()
42-
43- info_df .set_index ("timestamp" , inplace = True )
44-
45- # Update the full DataFrame with actual NDVI values
46- df .loc [info_df .index , "value" ] = info_df ["value" ]
47-
48- # Resample the DataFrame based on the temporal resolution
49- resampled_df = (
50- df .resample ("D" ) if temporal_resolution == "DAILY" else df .resample ("M" )
51- )
52-
53- # Aggregate the resampled DataFrame based on the aggregation method, ignoring None values
54- if aggregation_method == "MEAN" :
55- aggregated_df = resampled_df .mean ()
56- elif aggregation_method == "MIN" :
57- aggregated_df = resampled_df .min ()
58- elif aggregation_method == "MAX" :
59- aggregated_df = resampled_df .max ()
60- elif aggregation_method == "MEDIAN" :
61- aggregated_df = resampled_df .median ()
13+ def initialize_time_series (
14+ time_series : List [Dict [str , Union [int , float ]]],
15+ temporal_resolution : TemporalResolution ,
16+ aggregation_method : AggregationMethod
17+ ) -> pd .DataFrame :
18+ """
19+ Initializes a pandas DataFrame from a time series and applies temporal resolution and aggregation.
20+
21+ Parameters:
22+ time_series (List[Dict[str, Union[int, float]]]): List of dicts with 'timestamp' and 'value'.
23+ temporal_resolution (TemporalResolution): Temporal resolution, either DAILY or MONTHLY.
24+ aggregation_method (AggregationMethod): Aggregation method to use if resolution is MONTHLY.
25+
26+ Returns:
27+ pd.DataFrame: The resulting DataFrame with applied resolution and aggregation.
28+ """
29+ # Check if the time_series is empty
30+ if not time_series :
31+ # Return an empty DataFrame with a datetime index and 'value' column in UTC
32+ if temporal_resolution == TemporalResolution .MONTHLY :
33+ empty_index = pd .date_range (
34+ start = "1970-01-01" , periods = 0 , freq = 'MS' , tz = 'UTC' )
35+ else :
36+ empty_index = pd .date_range (
37+ start = "1970-01-01" , periods = 0 , freq = 'D' , tz = 'UTC' )
38+
39+ return pd .DataFrame (index = empty_index , columns = ['value' ])
40+
41+ # Convert timestamps to datetime in UTC and create DataFrame
42+ df = pd .DataFrame (time_series )
43+ df ['timestamp' ] = pd .to_datetime (df ['timestamp' ], unit = 's' , utc = True )
44+ df .set_index ('timestamp' , inplace = True )
45+
46+ # Resample based on temporal resolution and apply aggregation if needed
47+ if temporal_resolution == TemporalResolution .MONTHLY :
48+ if aggregation_method == AggregationMethod .MEAN :
49+ df = df .resample ('MS' ).mean ()
50+ elif aggregation_method == AggregationMethod .MEDIAN :
51+ df = df .resample ('MS' ).median ()
52+ elif aggregation_method == AggregationMethod .MAX :
53+ df = df .resample ('MS' ).max ()
54+ elif aggregation_method == AggregationMethod .MIN :
55+ df = df .resample ('MS' ).min ()
56+ # If DAILY, do nothing as time series is already in daily format
57+ return df
58+
59+
60+ def fill_missing_dates (
61+ df : pd .DataFrame ,
62+ start : datetime ,
63+ end : datetime ,
64+ temporal_resolution : TemporalResolution
65+ ) -> pd .DataFrame :
66+ """
67+ Fills missing entries in the time series, adding NaN for missing days or months.
68+
69+ Parameters:
70+ df (pd.DataFrame): Input DataFrame with timestamps as index.
71+ start (datetime): Start datetime for filling.
72+ end (datetime): End datetime for filling.
73+ temporal_resolution (TemporalResolution): Temporal resolution, either DAILY or MONTHLY.
74+
75+ Returns:
76+ pd.DataFrame: DataFrame with missing dates or months filled with NaN values.
77+ """
78+ # Ensure start and end are in UTC
79+ if start .tzinfo is None :
80+ start = start .replace (tzinfo = timezone .utc )
6281 else :
63- raise ValueError ("Unsupported aggregation method" )
64-
65- # Replace NaNs with None for final output consistency
66- aggregated_df = aggregated_df .where (pd .notnull (aggregated_df ), None )
82+ start = start .astimezone (timezone .utc )
6783
68- # Convert the aggregated DataFrame back to a list of dicts with ISO format timestamps
69- aggregated_info = [
70- {"timestamp" : record ["timestamp" ].isoformat (), "value" : record ["value" ]}
71- for record in aggregated_df .reset_index ().to_dict (orient = "records" )
72- ]
84+ if end .tzinfo is None :
85+ end = end .replace (tzinfo = timezone .utc )
86+ else :
87+ end = end .astimezone (timezone .utc )
88+
89+ # Generate the complete date range based on the temporal resolution
90+ if temporal_resolution == TemporalResolution .DAILY :
91+ date_range = pd .date_range (start = start , end = end , freq = 'D' , tz = 'UTC' )
92+ elif temporal_resolution == TemporalResolution .MONTHLY :
93+ date_range = pd .date_range (start = start , end = end , freq = 'MS' , tz = 'UTC' )
94+ # If the input DataFrame is empty, create a new one with NaNs for all dates in the range
95+ if df .empty :
96+ df = pd .DataFrame (index = date_range , columns = ['value' ])
97+ df ['value' ] = None
98+ else :
99+ # Reindex to the complete date range, filling missing dates with NaN
100+ df = df .reindex (date_range )
73101
74- return aggregated_info
102+ df .columns = ['value' ]
103+ return df
75104
76105
77106def ndvi_service (
@@ -81,15 +110,86 @@ def ndvi_service(
81110 start_date : datetime ,
82111 end_date : datetime ,
83112):
84- masked_images = get_preprocessed_imagery (
85- LocationPolygon [location .value ].value ,
86- start_date ,
87- end_date ,
88- )
89- NDVI_time_series = get_ndvi_info (
90- masked_images , LocationPolygon [location .value ].value
91- )
92- aggregated_data_dict = aggregate_time_series (
93- NDVI_time_series , temporal_resolution , aggregation_method , start_date , end_date
94- )
95- return aggregated_data_dict
113+ # Temporary implementation of GEE Caching strategy
114+ current_cache_end_date = datetime (
115+ 2024 , 9 , 29 , tzinfo = timezone .utc )
116+ if start_date < current_cache_end_date and end_date < current_cache_end_date : # current end of cache
117+ cache_start_date = start_date
118+ cache_end_date = end_date
119+ processing_start_date = None
120+ processing_end_date = None
121+
122+ elif start_date < current_cache_end_date and end_date > current_cache_end_date :
123+ cache_start_date = start_date
124+ cache_end_date = current_cache_end_date
125+ processing_start_date = current_cache_end_date + timedelta (days = 1 )
126+ processing_end_date = end_date
127+
128+ elif start_date > current_cache_end_date :
129+ cache_start_date = None
130+ cache_end_date = None
131+ processing_start_date = start_date
132+ processing_end_date = end_date
133+
134+ if processing_start_date :
135+
136+ masked_images = get_preprocessed_imagery (
137+ LocationPolygon [location .value ].value ,
138+ processing_start_date ,
139+ processing_end_date ,
140+ )
141+ NDVI_time_series = get_ndvi_info (
142+ masked_images , LocationPolygon [location .value ].value
143+ )
144+
145+ if cache_start_date :
146+ cached_data_subset = get_cache_subset (cache_start_date , cache_end_date )
147+
148+ if processing_start_date and cache_start_date :
149+ ndvi_data = cached_data_subset + NDVI_time_series
150+ else :
151+ ndvi_data = cached_data_subset if cache_start_date else NDVI_time_series
152+
153+ index_df = initialize_time_series (
154+ ndvi_data , temporal_resolution , aggregation_method )
155+
156+ filled_df = fill_missing_dates (
157+ index_df , start_date , end_date , temporal_resolution )
158+
159+ return convert_df_to_list (filled_df )
160+
161+
162+ def get_cache_subset (start_date : datetime , end_date : datetime ):
163+ subset : list [dict ] = []
164+ for entry in ndvi_daily_cache :
165+ if entry ["timestamp" ] >= int (start_date .timestamp ()) and entry ["timestamp" ] <= int (end_date .timestamp ()):
166+ subset .append (entry )
167+ return subset
168+
169+
170+ def convert_df_to_list (df : pd .DataFrame ) -> List [Dict [str , Union [int , float , None ]]]:
171+ """
172+ Converts a DataFrame with a datetime index back to a list of dictionaries in the original format.
173+
174+ Parameters:
175+ df (pd.DataFrame): Input DataFrame with datetime index and a 'value' column.
176+
177+ Returns:
178+ List[Dict[str, Union[int, float, None]]]: List of dictionaries with 'timestamp' in epoch format and 'value'.
179+ """
180+ # Convert the DataFrame index to epoch timestamps and reset index
181+ df_reset = df .reset_index ()
182+ df_reset ['timestamp' ] = df_reset ['index' ].astype (int ) // 10 ** 9
183+ df_reset = df_reset .rename (columns = {'value' : 'value' })
184+
185+ # Convert to list of dictionaries
186+ result = df_reset [['timestamp' , 'value' ]].to_dict (orient = 'records' )
187+
188+ # Convert NaN to None (needs to handle empty df as well)
189+ for entry in result :
190+ if entry ['value' ] is None :
191+ entry ['value' ] = None
192+ elif math .isnan (entry ['value' ]):
193+ entry ['value' ] = None
194+
195+ return result
0 commit comments