This repository was archived by the owner on Jun 18, 2024. It is now read-only.
1 change: 1 addition & 0 deletions .github/workflows/cicd.yml
@@ -5,6 +5,7 @@ permissions:
contents: read

on:
workflow_dispatch:
push:
branches:
- main
28 changes: 25 additions & 3 deletions API_usage.md
@@ -93,14 +93,21 @@ Once you've logically grouped the datasets into collections, create dataset definitions
],
"discovery_items": [
{
"discovery": "s3",
"cogify": false,
"upload": false,
"dry_run": false,
"prefix": "EIS/COG/LIS_GLOBAL_DA/Evap/",
"bucket": "veda-data-store-staging",
"filename_regex": "(.*)LIS_Evap_(.*).tif$",
"datetime_range": "day"
"id_regex": "li_global_da(?:x*)",
"datetime_range": "day",
"assets": {
"lis-evaporaevapotranspiration": {
"title": "Evapotranspiration - LIS 10km Global DA",
"description": "Gridded total evapotranspiration (in kg m-2 s-1) from 10km global LIS with assimilation",
"regex": ".*"
}
}
}
]
}
@@ -126,14 +133,29 @@ The following table describes what each of these fields means:
| `temporal_extent["start_date"]` | the `start_date` of the dataset | iso datetime that ends in `Z` | `2002-08-02T00:00:00Z` |
| `temporal_extent["end_date"]` | the `end_date` of the dataset | iso datetime that ends in `Z` | `2021-12-01T00:00:00Z` |
| `sample_files` | a list of s3 urls for the sample files that go into the collection | | `[ "s3://veda-data-store-staging/no2-diff/no2-diff_201506.tif", "s3://veda-data-store-staging/no2-diff/no2-diff_201507.tif"]` |
| `discovery_items["discovery"]` | where to discover the data from; currently supported are s3 buckets and cmr | `s3` \| `cmr` | `s3` |
| `discovery_items["cogify"]` | does the file need to be converted to a cloud optimized geotiff (COG)? `false` if it is already a COG | `true` \| `false` | `false` |
| `discovery_items["upload"]` | does it need to be uploaded to the veda s3 bucket? `false` if it already exists in `veda-data-store-staging` | `true` \| `false` | `false` |
| `discovery_items["dry_run"]` | if set to `true`, the items will go through the pipeline, but won't actually publish to the stac catalog; useful for testing purposes | `true` \| `false` | `false` |
| `discovery_items["bucket"]` | the s3 bucket where the data is uploaded to | any bucket that the data pipeline has access to, e.g. `veda-data-store-staging` \| `climatedashboard-data` \| `{any-public-bucket}` | `veda-data-store-staging` |
| `discovery_items["prefix"]`| within the s3 bucket, the prefix or path to the "folder" where the data files exist | any valid path within the bucket | `EIS/COG/LIS_GLOBAL_DA/Evap/` |
| `discovery_items["filename_regex"]` | a common filename pattern that all the files in the collection follow | a valid regex expression | `(.*)LIS_Evap_(.*).cog.tif$` |
| `discovery_items["datetime_range"]` | based on the naming convention in [STEP I](#step-i-prepare-the-data), the datetime range to be extracted from the filename | `year` \| `month` \| `day` | `year` |
| `discovery_items["id_regex"]` | a regex that must include a group; the regex should match the relevant files, and the group determines how files are grouped into a single item. Example: given the files `ch4-emissions-agriculture_2015` and `ch4-emissions-human_2015`, the `id_regex` `.*_(.*)$` groups them into the same item as different assets. | a valid regex expression with one group | `.*_(.*)$` |
| `discovery_items["assets"]` | a dictionary of assets, where each asset's `regex` determines which files are grouped into it. Example: given the files `ch4-emissions-agriculture_2015` and `ch4-emissions-human_2015`, to put the first in an `agriculture-emissions` asset and the second in a `human-emissions` asset, the `assets` would look like the following: | | |
```json
{
    "agriculture-emissions": {
        "title": "CH4 emissions from agriculture",
        "description": "CH4 emissions from agriculture",
        "regex": ".*-agriculture_"
    },
    "human-emissions": {
        "title": "CH4 emissions from human activities",
        "description": "CH4 emissions from human activities",
        "regex": ".*-human_"
    }
}
```
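The interplay of `id_regex` and per-asset `regex` described above can be sketched in plain Python. The `group_files` helper below is illustrative only, not the actual pipeline code:

```python
import re

def group_files(filenames, id_regex, assets):
    """Group discovered files into items (via id_regex's capture group)
    and, within each item, into named assets (via each asset's regex).
    Illustrative sketch, not the real discovery implementation."""
    items = {}
    for fname in filenames:
        match = re.search(id_regex, fname)
        if not match:
            continue
        item_id = match.group(1)  # the capture group becomes the item id
        item = items.setdefault(item_id, {})
        for asset_name, asset in assets.items():
            if re.search(asset["regex"], fname):
                item[asset_name] = fname
    return items

files = ["ch4-emissions-agriculture_2015", "ch4-emissions-human_2015"]
assets = {
    "agriculture-emissions": {"regex": ".*-agriculture_"},
    "human-emissions": {"regex": ".*-human_"},
}
grouped = group_files(files, r".*_(.*)$", assets)
# Both files share the item id "2015" and land in one item as two assets.
```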

> Note: The steps after this are technical, so at this point the scientists can send the json to the VEDA POC and they'll handle the publication process. The plan is to make this directly available to the scientists in the future.

3 changes: 3 additions & 0 deletions README.md
@@ -1,5 +1,8 @@
# GHGC STAC Ingestor

> [!IMPORTANT]
> The US GHG Center has started using the [veda-backend](https://github.com/NASA-IMPACT/veda-backend/) repository directly for its backend services, including the STAC ingestion services. This forked version of the veda-stac-ingestor repository is therefore no longer maintained and has been archived.

This service provides an entry-point for users/services to add new records to our STAC database. Its primary functions are to 1) validate STAC items before insertion and 2) batch the insertions to reduce load on our STAC database.

STAC items are validated to ensure that:
2 changes: 1 addition & 1 deletion api/requirements.txt
@@ -9,7 +9,7 @@ orjson>=3.6.8
psycopg[binary,pool]>=3.0.15
pydantic_ssm_settings>=0.2.0
pydantic>=1.9.0
pypgstac==0.7.9
pypgstac==0.7.10
python-multipart==0.0.5
Suggested change
python-multipart==0.0.5
python-multipart==0.0.7

requests>=2.27.1
s3fs==2023.3.0
14 changes: 6 additions & 8 deletions api/src/collection.py
@@ -125,14 +125,12 @@ def create_cog_collection(self, dataset: COGDataset) -> dict:
},
}
)
collection_stac["item_assets"] = {
"cog_default": {
"type": "image/tiff; application=geotiff; profile=cloud-optimized",
"roles": ["data", "layer"],
"title": "Default COG Layer",
"description": "Cloud optimized default layer to display on map",
}
}
collection_stac["item_assets"] = {}
for discovery in dataset.discovery_items:
for key, asset in discovery.assets.items():
collection_stac["item_assets"][key] = {
k: v for k, v in asset.dict().items() if k != "regex"
}
return collection_stac

def generate_stac(
13 changes: 1 addition & 12 deletions api/src/custom_loader.py
@@ -30,18 +30,7 @@ def update_collection_summaries(self, collection_id: str) -> None:
logger.info(f"Updating extents for collection: {collection_id}.")
cur.execute(
"""
UPDATE collections SET
content = content ||
jsonb_build_object(
'extent', jsonb_build_object(
'spatial', jsonb_build_object(
'bbox', collection_bbox(collections.id)
),
'temporal', jsonb_build_object(
'interval', collection_temporal_extent(collections.id)
)
)
)
UPDATE collections set content = content || pgstac.collection_extent(collections.id)

For veda-backend we are overriding the default pgstac collection extent with a new user-defined function that captures the max end datetime of items in a collection (this matters for annual items that have a nominal datetime of January 1 but whose collection extent should cover the full year, so that a November search will also intersect that item). https://github.com/NASA-IMPACT/veda-backend/blob/develop/database/runtime/handler.py#L260-L268

WHERE collections.id=%s;
""",
(collection_id,),
2 changes: 1 addition & 1 deletion api/src/doc.py
@@ -104,7 +104,7 @@

## Workflow Executions
The workflow execution API is used to start a new workflow execution. The workflow
execution API accepts discovery from s3 or cmr.
execution API accepts discovery from s3.
To run a workflow execution, the user must provide the following information:

**For s3 discovery:**
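
Judging from the `S3Input` model elsewhere in this diff, an s3 discovery request body might look like the following; the field values are illustrative only and reuse examples from `API_usage.md`:

```json
{
    "prefix": "EIS/COG/LIS_GLOBAL_DA/Evap/",
    "bucket": "veda-data-store-staging",
    "filename_regex": "(.*)LIS_Evap_(.*).tif$",
    "datetime_range": "day",
    "assets": {
        "cog_default": {
            "title": "Default COG Layer",
            "description": "Cloud optimized default layer to display on map",
            "regex": ".*"
        }
    }
}
```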
4 changes: 1 addition & 3 deletions api/src/main.py
@@ -178,9 +178,7 @@ def delete_collection(collection_id: str):
status_code=201,
)
async def start_workflow_execution(
input: Union[schemas.CmrInput, schemas.S3Input] = Body(
..., discriminator="discovery"
),
input: schemas.S3Input = Body(...),
) -> schemas.BaseResponse:
"""
Triggers the ingestion workflow
18 changes: 9 additions & 9 deletions api/src/schema_helpers.py
@@ -1,17 +1,10 @@
import enum
# Smaller utility models to support the larger models in schemas.py
from datetime import datetime
from typing import List, Union
from typing import List, Union, Optional

from pydantic import BaseModel, root_validator
from stac_pydantic.collection import Extent, TimeInterval

# Smaller utility models to support the larger models in schemas.py


class DiscoveryEnum(str, enum.Enum):
s3 = "s3"
cmr = "cmr"


class DatetimeInterval(TimeInterval):
# reimplement stac_pydantic's TimeInterval to leverage datetime types
@@ -54,3 +47,10 @@ def check_dates(cls, v):
if v["startdate"] >= v["enddate"]:
raise ValueError("Invalid extent - startdate must be before enddate")
return v


class DiscoveryItemAsset(BaseModel):
title: str
description: Optional[str]
roles: Optional[List[str]]
regex: str
37 changes: 16 additions & 21 deletions api/src/schemas.py
@@ -18,10 +18,14 @@
root_validator,
validator,
)
from src.schema_helpers import BboxExtent, SpatioTemporalExtent, TemporalExtent
from src.schema_helpers import (
BboxExtent,
SpatioTemporalExtent,
TemporalExtent,
DiscoveryItemAsset,
)
from stac_pydantic import Collection, Item, shared
from stac_pydantic.links import Link
from typing_extensions import Annotated

if TYPE_CHECKING:
from src import services
@@ -248,14 +252,16 @@ def exists(cls, collection):


class S3Input(WorkflowInputBase):
discovery: Literal["s3"]
prefix: str
bucket: str
filename_regex: str = r"[\s\S]*" # default to match all files in prefix
datetime_range: Optional[str]
start_datetime: Optional[datetime]
end_datetime: Optional[datetime]
single_datetime: Optional[datetime]
id_regex: Optional[str]
id_template: Optional[str]
assets: Dict[str, DiscoveryItemAsset]
zarr_store: Optional[str]

@root_validator
@@ -268,17 +274,11 @@ def object_is_accessible(cls, values):
)
return values


class CmrInput(WorkflowInputBase):
discovery: Literal["cmr"]
version: Optional[str]
include: Optional[str]
temporal: Optional[List[datetime]]
bounding_box: Optional[List[float]]


# allows the construction of models with a list of discriminated unions
ItemUnion = Annotated[Union[S3Input, CmrInput], Field(discriminator="discovery")]
@validator("assets", always=True, pre=True)
def item_assets_required(cls, assets):
if not assets:
raise ValueError("Specify at least one asset.")
return assets


class Dataset(BaseModel):
Expand All @@ -289,7 +289,7 @@ class Dataset(BaseModel):
is_periodic: Optional[bool] = False
time_density: Optional[str] = None
links: Optional[List[Link]] = []
discovery_items: List[ItemUnion]
discovery_items: List[S3Input]

# collection id must be all lowercase, with optional - delimiter
@validator("collection")
@@ -314,7 +314,7 @@ class DataType(str, enum.Enum):
class COGDataset(Dataset):
spatial_extent: BboxExtent
temporal_extent: TemporalExtent
sample_files: List[str] # unknown how this will work with CMR
sample_files: List[str]
data_type: Literal[DataType.cog]

@root_validator
@@ -325,17 +325,12 @@ def check_sample_files(cls, values):
if not (discovery_items := values.get("discovery_items")):
return

if "s3" not in [item.discovery for item in discovery_items]:
return values

# TODO cmr handling/validation
invalid_fnames = []
for fname in values.get("sample_files", []):
found_match = False
for item in discovery_items:
if all(
[
item.discovery == "s3",
re.search(item.filename_regex, fname.split("/")[-1]),
"/".join(fname.split("/")[3:]).startswith(item.prefix),
]
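The new `assets` requirement on `S3Input` can be exercised in isolation with a small pydantic sketch; `S3InputSketch` below is a cut-down, hypothetical stand-in for the real model, keeping just enough fields to show that at least one asset must be supplied:

```python
from typing import Dict, List, Optional

from pydantic import BaseModel, ValidationError, validator


class DiscoveryItemAsset(BaseModel):
    # Mirrors the model added in api/src/schema_helpers.py
    title: str
    description: Optional[str] = None
    roles: Optional[List[str]] = None
    regex: str


class S3InputSketch(BaseModel):
    # Cut-down stand-in for S3Input, not the full model.
    prefix: str
    bucket: str
    filename_regex: str = r"[\s\S]*"  # default: match all files in prefix
    assets: Dict[str, DiscoveryItemAsset]

    @validator("assets", always=True, pre=True)
    def item_assets_required(cls, assets):
        # Same check as the validator added in this diff.
        if not assets:
            raise ValueError("Specify at least one asset.")
        return assets


try:
    # An empty assets dict is rejected before the model is built.
    S3InputSketch(prefix="EIS/COG/", bucket="veda-data-store-staging", assets={})
except ValidationError as err:
    print("rejected:", err.errors()[0]["msg"])
```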
2 changes: 0 additions & 2 deletions api/tests/test_validators.py
@@ -18,7 +18,6 @@
"sample_files": ["s3://veda-data-store-staging/foo/bar.tif"],
"discovery_items": [
{
"discovery": "s3",
"cogify": False,
"upload": False,
"dry_run": True,
@@ -48,7 +47,6 @@
"sample_files": ["s3://veda-data-store-staging/foo/bar.tif"],
"discovery_items": [
{
"discovery": "s3",
"cogify": False,
"upload": False,
"dry_run": True,
4 changes: 2 additions & 2 deletions cdk/stack.py
@@ -210,7 +210,7 @@ def build_api_lambda(
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PUBLIC
if db_subnet_public
else ec2.SubnetType.PRIVATE_ISOLATED
else ec2.SubnetType.PRIVATE_WITH_EGRESS
),
allow_public_subnet=True,
memory_size=2048,
@@ -285,7 +285,7 @@ def build_ingestor(
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PUBLIC
if db_subnet_public
else ec2.SubnetType.PRIVATE_ISOLATED
else ec2.SubnetType.PRIVATE_WITH_EGRESS
),
allow_public_subnet=True,
memory_size=2048,
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,4 +1,4 @@
aws-cdk-lib==2.21.1
aws-cdk-lib==2.128.0
aws-cdk.aws-lambda-python-alpha==2.21.1a0
black
boto3-stubs[essential]
@@ -8,7 +8,7 @@ httpx
isort
moto[s3,dynamodb]
pytest
pydantic
pydantic==1.10.12
python-dotenv
python-multipart
Suggested change
python-multipart
python-multipart==0.0.7

pytest-mock