22
33import pandas as pd
44
5- from src .constants import AggregationMethod , LocationPolygon , TemporalResolution , IndexType
5+ from src .constants import (
6+ AggregationMethod ,
7+ LocationPolygon ,
8+ TemporalResolution ,
9+ IndexType ,
10+ )
611from src .gee .image_preprocessing import get_preprocessed_imagery
712from src .gee .sat_index_info import get_sat_index_info
813from src .gee .ndvi_cache import ndvi_daily_cache
def initialize_time_series(
    time_series: List[Dict[str, Union[int, float]]],
    temporal_resolution: TemporalResolution,
    aggregation_method: AggregationMethod,
) -> pd.DataFrame:
    """
    Initializes a pandas DataFrame from a time series and applies temporal
    resolution and aggregation.

    Args:
        time_series: List of ``{"timestamp": <epoch seconds>, "value": <number>}``
            entries, one per daily observation.
        temporal_resolution: DAILY keeps entries as-is; MONTHLY resamples to
            month starts ("MS").
        aggregation_method: Aggregation applied per month when resampling.

    Returns:
        DataFrame indexed by UTC datetimes with a single 'value' column.

    Raises:
        ValueError: If a MONTHLY resample is requested with an aggregation
            method this function does not support.
    """
    # Single dispatch table instead of four near-identical if/elif arms.
    aggregators = {
        AggregationMethod.MEAN: "mean",
        AggregationMethod.MEDIAN: "median",
        AggregationMethod.MAX: "max",
        AggregationMethod.MIN: "min",
    }

    if not time_series:
        # Return an empty DataFrame with a datetime index and 'value' column
        # in UTC so downstream consumers always see the same shape.
        freq = "MS" if temporal_resolution == TemporalResolution.MONTHLY else "D"
        empty_index = pd.date_range(start="1970-01-01", periods=0, freq=freq, tz="UTC")
        return pd.DataFrame(index=empty_index, columns=["value"])

    # Convert epoch-second timestamps to UTC datetimes and index by them.
    df = pd.DataFrame(time_series)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
    df.set_index("timestamp", inplace=True)

    # Resample based on temporal resolution and apply aggregation if needed.
    # If DAILY, do nothing as the time series is already in daily format.
    if temporal_resolution == TemporalResolution.MONTHLY:
        agg_name = aggregators.get(aggregation_method)
        if agg_name is None:
            # Previously an unknown method silently returned UNaggregated
            # monthly data; fail loudly instead.
            raise ValueError(f"Unsupported aggregation method: {aggregation_method}")
        df = df.resample("MS").agg(agg_name)
    return df
5865
def fill_missing_dates(
    df: pd.DataFrame,
    start: datetime,
    end: datetime,
    temporal_resolution: TemporalResolution,
) -> pd.DataFrame:
    """
    Fills missing entries in the time series, adding NaN for missing days or
    months.

    Args:
        df: DataFrame indexed by UTC datetimes with a single value column
            (may be empty).
        start: Inclusive start of the range to cover.
        end: Inclusive end of the range to cover.
        temporal_resolution: DAILY fills day slots, MONTHLY fills month-start
            slots.

    Returns:
        DataFrame reindexed to the complete UTC date range, missing slots NaN,
        with the column normalized to 'value'.

    Raises:
        ValueError: If the temporal resolution is not DAILY or MONTHLY.
    """
    # Generate the complete date range based on the temporal resolution.
    if temporal_resolution == TemporalResolution.DAILY:
        date_range = pd.date_range(start=start, end=end, freq="D", tz="UTC")
    elif temporal_resolution == TemporalResolution.MONTHLY:
        date_range = pd.date_range(start=start, end=end, freq="MS", tz="UTC")
    else:
        # Previously an unmatched resolution fell through to an unbound
        # `date_range` and raised a confusing NameError; fail explicitly.
        raise ValueError(f"Unsupported temporal resolution: {temporal_resolution}")

    if df.empty:
        # No observations at all: every slot in the range is missing.
        df = pd.DataFrame(index=date_range, columns=["value"])
        df["value"] = None
    else:
        # Reindex to the complete date range, filling missing dates with NaN.
        df = df.reindex(date_range)

    # Normalize the (single) column name for downstream consumers.
    df.columns = ["value"]
    return df
104111
105112
def sat_index_service(
    # NOTE(review): the leading parameters were not visible in this diff view;
    # `location` and `temporal_resolution` are inferred from the body — confirm
    # names and order against the original signature before merging.
    location,
    temporal_resolution: TemporalResolution,
    aggregation_method: AggregationMethod,
    start_date: datetime,
    end_date: datetime,
    index_type: IndexType,
):
    """
    Builds a satellite-index time series for a location and date range,
    serving cached dates from the local NDVI cache and processing the
    remainder through GEE.

    Returns:
        List of ``{"timestamp": <epoch seconds>, "value": <float or None>}``
        records covering [start_date, end_date] at the requested resolution.
    """
    # Temporary implementation of GEE caching strategy.
    current_cache_end_date = datetime(2024, 9, 29, tzinfo=timezone.utc)  # current end of cache

    # Default all four bounds so every branch below leaves them defined.
    # The original strict </> comparisons left them unbound (NameError) when
    # start_date or end_date fell exactly on current_cache_end_date.
    cache_start_date = cache_end_date = None
    processing_start_date = processing_end_date = None

    if end_date <= current_cache_end_date:
        # Entire range is within the cache:
        # get entire range from cache, process nothing.
        cache_start_date = start_date
        cache_end_date = end_date
    elif start_date <= current_cache_end_date:
        # Partial overlap with the cache:
        # get cached part from cache, process the rest until end of range.
        cache_start_date = start_date
        cache_end_date = current_cache_end_date
        processing_start_date = current_cache_end_date + timedelta(days=1)
        processing_end_date = end_date
    else:
        # Entire range is outside the cache:
        # get nothing from cache, process entire range.
        processing_start_date = start_date
        processing_end_date = end_date

    # Get and process uncached range.
    if processing_start_date:
        print(f'Getting {processing_start_date.date()} to {processing_end_date.date()} from GEE.')
        masked_images = get_preprocessed_imagery(
            LocationPolygon[location.value].value,
            processing_start_date,
            processing_end_date,
        )
        NDVI_time_series = get_sat_index_info(
            masked_images, LocationPolygon[location.value].value, index_type
        )

    # Get cached range.
    if cache_start_date:
        print(f'Getting {cache_start_date.date()} to {cache_end_date.date()} from cache.')
        cached_data_subset = get_cache_subset(cache_start_date, cache_end_date)

    if processing_start_date and cache_start_date:
        # NOTE(review): the combining statement was hidden in this diff view;
        # concatenating cached entries with freshly processed ones is assumed
        # — confirm against the original.
        ndvi_data = cached_data_subset + NDVI_time_series
    else:
        ndvi_data = cached_data_subset if cache_start_date else NDVI_time_series

    index_df = initialize_time_series(ndvi_data, temporal_resolution, aggregation_method)
    filled_df = fill_missing_dates(index_df, start_date, end_date, temporal_resolution)

    return convert_df_to_list(filled_df)
161179
162180
def get_cache_subset(start_date: datetime, end_date: datetime):
    """Return cached NDVI entries whose timestamps fall within
    [start_date, end_date], bounds inclusive, compared in epoch seconds."""
    lower = int(start_date.timestamp())
    upper = int(end_date.timestamp())
    return [item for item in ndvi_daily_cache if lower <= item["timestamp"] <= upper]
169189
def convert_df_to_list(df: pd.DataFrame) -> List[Dict[str, Union[int, float, None]]]:
    """
    Converts a time-series DataFrame into a JSON-friendly list of records.

    Args:
        df: DataFrame with a (possibly empty) UTC DatetimeIndex and a single
            'value' column.

    Returns:
        List of ``{"timestamp": <epoch seconds>, "value": <float or None>}``
        dicts, one per row, with NaN values mapped to None.
    """
    # Convert the DataFrame index to epoch timestamps and reset index.
    # astype("int64") yields nanoseconds since the epoch for datetime data;
    # integer-divide down to whole seconds. (The previous
    # rename(columns={'value': 'value'}) was a no-op and has been removed.)
    df_reset = df.reset_index()
    df_reset["timestamp"] = df_reset["index"].astype("int64") // 10**9

    # Convert to list of dictionaries.
    result = df_reset[["timestamp", "value"]].to_dict(orient="records")

    # Convert missing values to None for JSON serialization. pd.isna covers
    # both NaN and None in one check (and an empty df yields an empty list,
    # making the loop a no-op).
    for entry in result:
        if pd.isna(entry["value"]):
            entry["value"] = None

    return result
0 commit comments