Skip to content
This repository was archived by the owner on Apr 8, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 111 additions & 95 deletions packages/rise/rise/lib/covjson/covjson.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,105 @@
# Copyright 2025 Lincoln Institute of Land Policy
# SPDX-License-Identifier: MIT

from datetime import datetime
import logging
from typing import Any, Tuple
from typing import Any, Tuple, cast

from com.helpers import EDRFieldsMapping, await_
from rise.lib.covjson.template import COVJSON_TEMPLATE
from rise.lib.covjson.types import (
CoverageCollection,
Coverage,
CoverageRange,
Parameter,
)

from covjson_pydantic.coverage import Coverage, CoverageCollection
from covjson_pydantic.parameter import Parameter
from covjson_pydantic.unit import Unit
from covjson_pydantic.observed_property import ObservedProperty
from rise.lib.cache import RISECache
from rise.lib.add_results import DataNeededForCovjson
from rise.lib.add_results import DataNeededForCovjson, ParameterWithResults
from covjson_pydantic.domain import Domain, Axes, ValuesAxis, DomainType
from rise.lib.covjson.types import CoverageCollectionDict
from covjson_pydantic.ndarray import NdArrayFloat
from covjson_pydantic.reference_system import (
ReferenceSystemConnectionObject,
ReferenceSystem,
)

LOGGER = logging.getLogger(__name__)


def _generate_coverage_item(
location_type: str,
coords: list[Any] | Tuple[float, float],
times: list[str | None],
paramToCoverage: dict[str, CoverageRange],
) -> Coverage:
# if it is a point it will have different geometry

if location_type == "Point":
# z = location_feature["attributes"]["elevation"]
x, y = coords[0], coords[1]

coverage_item: Coverage = {
"type": "Coverage",
"domainType": "PointSeries",
"domain": {
"type": "Domain",
"axes": {
"x": {"values": [x]},
"y": {"values": [y]},
"t": {"values": times},
},
},
"ranges": paramToCoverage,
}

else:
coverage_item: Coverage = {
"type": "Coverage",
"domainType": "PolygonSeries",
"domain": {
"type": "Domain",
"axes": {
"composite": {
"dataType": location_type,
"coordinates": ["x", "y"],
"values": [coords],
},
"t": {"values": times},
},
},
"ranges": paramToCoverage,
}

return coverage_item


class CovJSONBuilder:
"""A helper class for building CovJSON from a Rise JSON Response"""

def __init__(self, cache: RISECache):
self._cache = cache

def _insert_parameter_metadata(
def _generate_coverage_item(
self,
location_type: str,
coords: list[Any] | Tuple[float, float],
times: list[datetime],
naturalLanguageName: str,
param: ParameterWithResults,
) -> Coverage:
# if it is a point it will have different geometry
isPoint = location_type == "Point"
if isPoint:
x, y = coords[0], coords[1]

cov = Coverage(
type="Coverage",
domain=Domain(
type="Domain",
domainType=DomainType.point_series if isPoint else "PolygonSeries", # type: ignore
axes=Axes(
t=ValuesAxis(values=times),
x=ValuesAxis(values=[x]), # type: ignore Pyright says it is possibly unbound but this isn't possible since if it is a point it will have coords
y=ValuesAxis(values=[y]), # type: ignore
)
if isPoint
else Axes(
composite=ValuesAxis(
values=[tuple(coords)], dataType=location_type
),
t=ValuesAxis(values=times),
),
referencing=[
ReferenceSystemConnectionObject(
coordinates=["x", "y"],
system=ReferenceSystem(
type="GeographicCRS",
id="http://www.opengis.net/def/crs/OGC/1.3/CRS84",
),
),
ReferenceSystemConnectionObject(
coordinates=["t"],
system=ReferenceSystem(
type="TemporalRS",
id="http://www.opengis.net/def/crs/OGC/1.3/CRS84",
),
),
],
),
ranges={
naturalLanguageName: NdArrayFloat(
shape=[len(param.timeseriesResults)],
values=param.timeseriesResults,
axisNames=["t"],
dataType="float",
type="NdArray",
)
},
)
return cov

def _get_parameters(
self,
paramsToGeoJsonOutput: EDRFieldsMapping,
location_response: list[DataNeededForCovjson],
):
) -> dict[str, Parameter]:
relevant_parameters = []
for location in location_response:
for p in location.parameters:
relevant_parameters.append(p.parameterId)

paramNameToMetadata: dict[str, Parameter] = {}

params = {}
for param_id in relevant_parameters:
if param_id not in paramsToGeoJsonOutput:
LOGGER.error(
Expand All @@ -92,19 +109,20 @@ def _insert_parameter_metadata(

associatedData = paramsToGeoJsonOutput[param_id]

_param: Parameter = {
"type": "Parameter",
"description": {"en": associatedData["description"]},
"unit": {"symbol": associatedData["x-ogc-unit"]},
"observedProperty": {
"id": param_id,
"label": {"en": associatedData["title"]},
},
}
natural_language_name = associatedData["title"]
paramNameToMetadata[natural_language_name] = _param
naturalLanguageName = associatedData["title"]

return paramNameToMetadata
params[naturalLanguageName] = Parameter(
type="Parameter",
id=param_id,
unit=Unit(symbol=associatedData["x-ogc-unit"]),
observedProperty=ObservedProperty(
label={"en": associatedData["title"]},
id=param_id,
description={"en": associatedData["description"]},
),
)

return params

def _get_coverages(
self,
Expand All @@ -127,41 +145,39 @@ def _get_coverages(
"title"
]

range: dict[str, CoverageRange] = {
naturalLanguageName: {
"axisNames": ["t"],
"dataType": "float",
"shape": [len(param.timeseriesResults)],
"values": param.timeseriesResults,
"type": "NdArray",
}
}

coverage_item = _generate_coverage_item(
datesAsDatetimeObjs = [
datetime.fromisoformat(d) for d in param.timeseriesDates if d
]
coverage_item = self._generate_coverage_item(
location_feature.locationType,
location_feature.geometry,
param.timeseriesDates,
range,
datesAsDatetimeObjs,
naturalLanguageName,
param,
)

coverages.append(coverage_item)

return coverages

def fill_template(
def render(
self, location_response: list[DataNeededForCovjson]
) -> CoverageCollection:
templated_covjson: CoverageCollection = COVJSON_TEMPLATE
) -> CoverageCollectionDict:
paramIdToMetadata: EDRFieldsMapping = await_(
self._cache.get_or_fetch_parameters()
)
templated_covjson["coverages"] = self._get_coverages(
location_response, paramIdToMetadata
)
templated_covjson["parameters"] = self._insert_parameter_metadata(
coverages = self._get_coverages(location_response, paramIdToMetadata)
parameters = self._get_parameters(
paramIdToMetadata, location_response=location_response
)

# don't actually care about the pydantic model, we just want to use it to validate
# PydanticCoverageCollection.model_validate(templated_covjson)
return templated_covjson
# we have to cast since pydantic model dump returns a generic dict
# but we want to narrow it based on our known schema
return cast(
CoverageCollectionDict,
CoverageCollection(
coverages=coverages,
domainType=DomainType.point_series,
parameters=parameters,
).model_dump(by_alias=True, exclude_none=True),
)
40 changes: 0 additions & 40 deletions packages/rise/rise/lib/covjson/template.py

This file was deleted.

21 changes: 14 additions & 7 deletions packages/rise/rise/lib/covjson/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,38 @@

from typing import Literal, TypedDict

"""
All of these typeddicts are used for type hinting the result of
a pydantic model dump. They are thus not used for validation and
just for dev ux. They are prefixed with _Dict to distinguish them
from the pydantic models of the same name
"""

class Parameter(TypedDict):

class ParameterDict(TypedDict):
type: str
description: dict[str, str]
unit: dict
observedProperty: dict


class CoverageRange(TypedDict):
class CoverageRangeDict(TypedDict):
type: Literal["NdArray"]
dataType: Literal["float"]
axisNames: list[str]
shape: list[int]
values: list[float | None]


class Coverage(TypedDict):
class CoverageDict(TypedDict):
type: Literal["Coverage"]
domain: dict
ranges: dict[str, CoverageRange]
ranges: dict[str, CoverageRangeDict]
domainType: Literal["PolygonSeries", "PointSeries"]


class CoverageCollection(TypedDict):
class CoverageCollectionDict(TypedDict):
type: str
parameters: dict[str, Parameter]
parameters: dict[str, ParameterDict]
referencing: list
coverages: list[Coverage]
coverages: list[CoverageDict]
6 changes: 3 additions & 3 deletions packages/rise/rise/rise_edr.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def locations(
# if we are returning covjson we need to fetch the results and fill in the json
builder = LocationResultBuilder(cache=self.cache, base_response=response)
response_with_results = builder.load_results(time_filter=datetime_)
return CovJSONBuilder(self.cache).fill_template(response_with_results)
return CovJSONBuilder(self.cache).render(response_with_results)

def get_fields(self):
"""Get the list of all parameters (i.e. fields) that the user can filter by"""
Expand Down Expand Up @@ -135,7 +135,7 @@ def cube(

builder = LocationResultBuilder(cache=self.cache, base_response=response)
response_with_results = builder.load_results(time_filter=datetime_)
return CovJSONBuilder(self.cache).fill_template(response_with_results)
return CovJSONBuilder(self.cache).render(response_with_results)

@TRACER.start_as_current_span("area")
@BaseEDRProvider.register()
Expand Down Expand Up @@ -178,7 +178,7 @@ def area(

builder = LocationResultBuilder(cache=self.cache, base_response=response)
response_with_results = builder.load_results(time_filter=datetime_)
return CovJSONBuilder(self.cache).fill_template(response_with_results)
return CovJSONBuilder(self.cache).render(response_with_results)

@BaseEDRProvider.register()
def items(self, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions packages/snotel/snotel/lib/covjson_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
self.triplesToGeometry = triplesToGeometry
self.fieldsMapper = fieldsMapper

def _generate_parameter(self, triple: str, datastream: DataDTO):
def _generate_parameter(self, datastream: DataDTO):
"""Given a triple and an associated datastream, generate a covjson parameter that describes its properties and unit"""
assert datastream.stationElement
assert datastream.stationElement.elementCode
Expand Down Expand Up @@ -124,7 +124,7 @@ def render(self):
assert len(cov.ranges) == 1

id = self.fieldsMapper[datastream.stationElement.elementCode]["title"]
parameters[id] = self._generate_parameter(triple, datastream)
parameters[id] = self._generate_parameter(datastream)

covCol = CoverageCollection(
coverages=coverages,
Expand Down
Loading