Skip to content
5 changes: 4 additions & 1 deletion backend/src/gee/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ def get_preprocessed_imagery(
image_collection = get_imagery(aoi, start_date, end_date)

# Preprocess the imagery
image_collection = image_collection.select(
"SCL", "B8", "B4") # Removing unused bands
mosaicked_collection = get_mosaicked_by_date_collection(image_collection)
clipped_collection = mosaicked_collection.map(lambda img: img.clip(aoi))
cloud_masked_collection = clipped_collection.map(lambda img: get_cloud_masked(img))
cloud_masked_collection = clipped_collection.map(
lambda img: get_cloud_masked(img))

return cloud_masked_collection

Expand Down
5 changes: 3 additions & 2 deletions backend/src/gee/ndvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ def get_ndvi_info(
).getInfo()

if len(timestamp_list) != len(index_value_list):
print(f"Timestamps: {len(timestamp_list)}, Values: {len(index_value_list)}")
raise ValueError(
"The lists of gee indexing values and timestamps do not have the same size"
"The lists of gee indexing values and timestamps do not have the same size" +
f"Timestamps: {len(timestamp_list)}, Values: {len(index_value_list)}"

)

# Convert the results to a list of dictionaries
Expand Down
539 changes: 539 additions & 0 deletions backend/src/gee/ndvi_cache.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions backend/src/routes/ndvi_router.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from datetime import datetime
from datetime import datetime, timezone

from fastapi import APIRouter, Query
from fastapi.responses import JSONResponse

from src.constants import (
AggregationMethod,
LocationName,
Expand All @@ -13,7 +12,7 @@
from src.utils.temporal import get_optimistic_rounding
from src.validation.models import NDVIResponse
from src.validation.utils import (
validate_timestamp_in_range,
validate_timestamp_in_range_of_S2_imagery,
validate_timestamp_start_date_before_end_date,
)

Expand All @@ -22,20 +21,22 @@

@ndvi_router.get("/ndvi", response_model=NDVIResponse)
async def get_temperature_data(
startDate: int = Query(..., description="Start date as UNIX timestamp in seconds"),
endDate: int = Query(..., description="End date as UNIX timestamp in seconds"),
startDate: int = Query(...,
description="Start date as UNIX timestamp in seconds"),
endDate: int = Query(...,
description="End date as UNIX timestamp in seconds"),
location: LocationName = Query(..., description="Location name"),
temporalResolution: TemporalResolution = Query(
..., description="Temporal resolution"
),
aggregation: AggregationMethod = Query(..., description="Aggregation method"),
aggregation: AggregationMethod = Query(...,
description="Aggregation method"),
):
validate_timestamp_in_range(startDate)
validate_timestamp_in_range(endDate)
validate_timestamp_start_date_before_end_date(startDate, endDate)

start_date_dt = datetime.utcfromtimestamp(startDate)
end_date_dt = datetime.utcfromtimestamp(endDate)
validate_timestamp_start_date_before_end_date(startDate, endDate)
validate_timestamp_in_range_of_S2_imagery(startDate, endDate)
start_date_dt = datetime.fromtimestamp(startDate, tz=timezone.utc)
end_date_dt = datetime.fromtimestamp(endDate, tz=timezone.utc)

rounded_start_date, rounded_end_date = get_optimistic_rounding(
start_date_dt, end_date_dt, temporalResolution
Expand All @@ -61,5 +62,4 @@ async def get_temperature_data(
"data": data,
}

print(response)
return JSONResponse(content=response)
250 changes: 175 additions & 75 deletions backend/src/service.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,106 @@
from datetime import datetime
from datetime import datetime, timezone, timedelta

import pandas as pd

from src.constants import AggregationMethod, LocationPolygon, TemporalResolution
from src.gee.index import get_preprocessed_imagery
from src.gee.ndvi import get_ndvi_info
from src.gee.ndvi_cache import ndvi_daily_cache
from typing import List, Dict, Union
import math


def aggregate_time_series(
ndvi_info: list[dict],
temporal_resolution,
aggregation_method,
start_date: datetime,
end_date: datetime,
):
# Generate a complete date range based on temporal resolution
if temporal_resolution == "DAILY":
date_range = pd.date_range(start=start_date, end=end_date, freq="D", tz="UTC")
elif temporal_resolution == "MONTHLY":
date_range = pd.date_range(start=start_date, end=end_date, freq="MS", tz="UTC")
else:
raise ValueError("Unsupported temporal resolution")

# Create a DataFrame with the full date range, initially filled with None
df = pd.DataFrame(index=date_range)
df.index.name = "timestamp"
df["value"] = None

# Convert ndvi_info to a DataFrame and set the index
if temporal_resolution == "MONTHLY":
for record in ndvi_info:
record["timestamp"] = record["timestamp"].replace(day=1)
info_df = pd.DataFrame(ndvi_info)
info_df["timestamp"] = pd.to_datetime(info_df["timestamp"], unit="s", utc=True)

# Align info_df to the temporal resolution
if temporal_resolution == "DAILY":
info_df["timestamp"] = info_df["timestamp"].dt.floor("D")
elif temporal_resolution == "MONTHLY":
info_df["timestamp"] = info_df["timestamp"].dt.to_period("M").dt.to_timestamp()

info_df.set_index("timestamp", inplace=True)

# Update the full DataFrame with actual NDVI values
df.loc[info_df.index, "value"] = info_df["value"]

# Resample the DataFrame based on the temporal resolution
resampled_df = (
df.resample("D") if temporal_resolution == "DAILY" else df.resample("M")
)

# Aggregate the resampled DataFrame based on the aggregation method, ignoring None values
if aggregation_method == "MEAN":
aggregated_df = resampled_df.mean()
elif aggregation_method == "MIN":
aggregated_df = resampled_df.min()
elif aggregation_method == "MAX":
aggregated_df = resampled_df.max()
elif aggregation_method == "MEDIAN":
aggregated_df = resampled_df.median()
def initialize_time_series(
    time_series: List[Dict[str, Union[int, float]]],
    temporal_resolution: TemporalResolution,
    aggregation_method: AggregationMethod
) -> pd.DataFrame:
    """
    Build a UTC-indexed DataFrame from a raw time series, aggregating to
    monthly resolution when requested.

    Parameters:
        time_series (List[Dict[str, Union[int, float]]]): List of dicts with
            'timestamp' (epoch seconds) and 'value'.
        temporal_resolution (TemporalResolution): DAILY (kept as-is) or
            MONTHLY (resampled and aggregated).
        aggregation_method (AggregationMethod): Aggregation applied when the
            resolution is MONTHLY.

    Returns:
        pd.DataFrame: DataFrame indexed by UTC timestamps with a 'value' column.
    """
    if not time_series:
        # No observations: return an empty frame whose index already carries
        # the right frequency and the UTC timezone.
        freq = 'MS' if temporal_resolution == TemporalResolution.MONTHLY else 'D'
        empty_index = pd.date_range(
            start="1970-01-01", periods=0, freq=freq, tz='UTC')
        return pd.DataFrame(index=empty_index, columns=['value'])

    # Epoch seconds -> tz-aware UTC datetime index.
    frame = pd.DataFrame(time_series)
    frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='s', utc=True)
    frame = frame.set_index('timestamp')

    if temporal_resolution == TemporalResolution.MONTHLY:
        # Dispatch each supported aggregation to the matching resampler
        # method; an unrecognized method leaves the frame untouched, exactly
        # as the per-branch version did.
        monthly = frame.resample('MS')
        dispatch = {
            AggregationMethod.MEAN: monthly.mean,
            AggregationMethod.MEDIAN: monthly.median,
            AggregationMethod.MAX: monthly.max,
            AggregationMethod.MIN: monthly.min,
        }
        aggregate = dispatch.get(aggregation_method)
        if aggregate is not None:
            frame = aggregate()
    # DAILY input is already at daily granularity; nothing to do.
    return frame


def fill_missing_dates(
    df: pd.DataFrame,
    start: datetime,
    end: datetime,
    temporal_resolution: TemporalResolution
) -> pd.DataFrame:
    """
    Fills missing entries in the time series, adding NaN for missing days or months.

    Parameters:
        df (pd.DataFrame): Input DataFrame with timestamps as index.
        start (datetime): Start datetime for filling.
        end (datetime): End datetime for filling.
        temporal_resolution (TemporalResolution): Temporal resolution, either DAILY or MONTHLY.

    Returns:
        pd.DataFrame: DataFrame with missing dates or months filled with NaN values.

    Raises:
        ValueError: If the temporal resolution is not DAILY or MONTHLY.
    """
    # Normalize both bounds to UTC: attach UTC to naive datetimes, convert
    # aware ones. Mixing naive and aware datetimes with pandas would raise.
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    else:
        start = start.astimezone(timezone.utc)

    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    else:
        end = end.astimezone(timezone.utc)

    # Generate the complete date range based on the temporal resolution.
    if temporal_resolution == TemporalResolution.DAILY:
        date_range = pd.date_range(start=start, end=end, freq='D', tz='UTC')
    elif temporal_resolution == TemporalResolution.MONTHLY:
        date_range = pd.date_range(start=start, end=end, freq='MS', tz='UTC')
    else:
        # Previously an unknown resolution fell through to a NameError on
        # date_range; fail explicitly instead.
        raise ValueError("Unsupported temporal resolution")

    if df.empty:
        # Nothing observed at all: every slot in the range is None.
        df = pd.DataFrame(index=date_range, columns=['value'])
        df['value'] = None
    else:
        # Reindex to the complete date range, filling missing dates with NaN.
        df = df.reindex(date_range)

    df.columns = ['value']
    return df


def ndvi_service(
    location,
    temporal_resolution,
    aggregation_method,
    start_date: datetime,
    end_date: datetime,
):
    """
    Produce an NDVI time series for a location between two dates.

    Data up to the cache cut-off is served from the precomputed daily cache;
    anything after it is computed live from GEE imagery. The merged series is
    aggregated to the requested resolution, gap-filled, and returned as a
    list of {'timestamp': epoch_seconds, 'value': float | None} dicts.

    NOTE(review): start_date/end_date are assumed to be timezone-aware (UTC);
    naive datetimes would raise on comparison with the aware cut-off — confirm
    callers always pass aware values.
    """
    # Temporary implementation of the GEE caching strategy: current end of cache.
    current_cache_end_date = datetime(2024, 9, 29, tzinfo=timezone.utc)

    # Initialize all window bounds so no branch combination can leave a name
    # unbound (the previous strict </> comparisons raised NameError when a
    # bound fell exactly on the cache cut-off).
    cache_start_date = cache_end_date = None
    processing_start_date = processing_end_date = None

    if end_date <= current_cache_end_date:
        # Entire window covered by the cache.
        cache_start_date, cache_end_date = start_date, end_date
    elif start_date <= current_cache_end_date:
        # Window straddles the cut-off: split into cached + live parts.
        cache_start_date, cache_end_date = start_date, current_cache_end_date
        processing_start_date = current_cache_end_date + timedelta(days=1)
        processing_end_date = end_date
    else:
        # Entire window after the cache.
        processing_start_date, processing_end_date = start_date, end_date

    NDVI_time_series: list = []
    if processing_start_date:
        masked_images = get_preprocessed_imagery(
            LocationPolygon[location.value].value,
            processing_start_date,
            processing_end_date,
        )
        NDVI_time_series = get_ndvi_info(
            masked_images, LocationPolygon[location.value].value
        )

    cached_data_subset: list = []
    if cache_start_date:
        cached_data_subset = get_cache_subset(cache_start_date, cache_end_date)

    # Cached entries always precede live ones chronologically.
    ndvi_data = cached_data_subset + NDVI_time_series

    index_df = initialize_time_series(
        ndvi_data, temporal_resolution, aggregation_method)

    filled_df = fill_missing_dates(
        index_df, start_date, end_date, temporal_resolution)

    return convert_df_to_list(filled_df)


def get_cache_subset(start_date: datetime, end_date: datetime):
    """
    Return the cached daily NDVI entries whose 'timestamp' (epoch seconds)
    falls inside [start_date, end_date], bounds inclusive.
    """
    # Hoist the bound conversions out of the scan; chained comparison keeps
    # the same inclusive semantics as the original two-sided check.
    lower = int(start_date.timestamp())
    upper = int(end_date.timestamp())
    return [
        entry for entry in ndvi_daily_cache
        if lower <= entry["timestamp"] <= upper
    ]


def convert_df_to_list(df: pd.DataFrame) -> List[Dict[str, Union[int, float, None]]]:
    """
    Converts a DataFrame with a tz-aware datetime index back to a list of
    dictionaries in the original API format.

    Parameters:
        df (pd.DataFrame): Input DataFrame with a datetime index and a
            'value' column.

    Returns:
        List[Dict[str, Union[int, float, None]]]: One dict per row with
        'timestamp' as epoch seconds and 'value' as a plain float or None
        (NaN is normalized to None so the payload is JSON-serializable).
    """
    result: List[Dict[str, Union[int, float, None]]] = []
    # Iterate the series directly instead of reset_index(): the old code
    # looked up df_reset['index'], which raises KeyError whenever the index
    # has a name (reset_index then emits that name as the column instead).
    for ts, value in df["value"].items():
        if value is None or (isinstance(value, float) and math.isnan(value)):
            normalized = None
        else:
            # Coerce numpy scalars to native float for clean serialization.
            normalized = float(value)
        # int(Timestamp.timestamp()) == nanosecond epoch // 10**9.
        result.append({"timestamp": int(ts.timestamp()), "value": normalized})
    return result
Loading
Loading