diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 447eb8c..79d83a4 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -5,6 +5,7 @@ permissions: contents: read on: + workflow_dispatch: push: branches: - main diff --git a/API_usage.md b/API_usage.md index b6282af..3389047 100644 --- a/API_usage.md +++ b/API_usage.md @@ -93,14 +93,21 @@ One you've logically grouped the datasets into collectionss, create dataset defi ], "discovery_items": [ { - "discovery": "s3", "cogify": false, "upload": false, "dry_run": false, "prefix": "EIS/COG/LIS_GLOBAL_DA/Evap/", "bucket": "veda-data-store-staging", "filename_regex": "(.*)LIS_Evap_(.*).tif$", - "datetime_range": "day" + "id_regex": "li_global_da(?:x*)", + "datetime_range": "day", + "assets": { + "lis-evaporaevapotranspiration": { + "title": "Evapotranspiration - LIS 10km Global DA", + "description": "Gridded total evapotranspiration (in kg m-2 s-1) from 10km global LIS with assimilation", + "regex": ".*" + } + } } ] } @@ -126,7 +133,6 @@ The following table describes what each of these fields mean: | `temporal_extent["start_date"]` | the `start_date` of the dataset | iso datetime that ends in `Z` | `2002-08-02T00:00:00Z` | | `temporal_extent["end_date"]` | the `end_date` of the dataset | iso datetime that ends in `Z` | `2021-12-01T00:00:00Z` | | `sample_files` | a list of s3 urls for the sample files that go into the collection | | `[ "s3://veda-data-store-staging/no2-diff/no2-diff_201506.tif", "s3://veda-data-store-staging/no2-diff/no2-diff_201507.tif"]` | -| `discovery_items["discovery"]` | where to discover the data from; currently supported are s3 buckets and cmr | `s3` \| `cmr` | `s3` | | `discovery_items["cogify"]` | does the file need to be converted to a cloud optimized geptiff (COG)? `false` if it is already a COG | `true` \| `false` | `false` | | `discovery_items["upload"]` | does it need to be uploaded to the veda s3 bucket? 
`false` if it already exists in `veda-data-store-staging` | `true` \| `false` | `false` | | `discovery_items["dry_run"]` | if set to `true`, the items will go through the pipeline, but won't actually publish to the stac catalog; useful for testing purposes | `true` \| `false` | `false` | @@ -134,6 +140,22 @@ The following table describes what each of these fields mean: | `discovery_items["prefix"]`| within the s3 bucket, the prefix or path to the "folder" where the data files exist | any valid path winthin the bucket | `EIS/COG/LIS_GLOBAL_DA/Evap/` | | `discovery_items["filename_regex"]` | a common filename pattern that all the files in the collection follow | a valid regex expression | `(.*)LIS_Evap_(.*).cog.tif$` | | `discovery_items["datetime_range"]` | based on the naming convention in [STEP I](#STEP I: Prepare the data), the datetime range to be extracted from the filename | `year` \| `month` \| `day` | `year` | +| `discovery_items["id_regex"]` | a regex that must include a group, this regex should match the relevant files and the group is how the files are grouped into a single item; example: if there are multiple files `ch4-emissions-agriculture_2015`, `ch4-emissions-human_2015`, providing `id_regex` `.*_(.*)$` will group these two files into the same item as different assets. 
+| `discovery_items["assets"]` | a dictionary of assets, where the `regex` determines the pattern of grouping into assets; example: if we have two files: `ch4-emissions-agriculture_2015`, `ch4-emissions-human_2015` and we want to add the first to the `agriculture-emissions` asset and second to `human-emissions` asset, the `assets` would look like the following: +```json +{ + "agriculture-emissions": { + "title": "CH4 emissions from agriculture", + "description": "CH4 emissions from agriculture", + "regex": ".*-agriculture_" + }, + "human-emissions": { + "title": "CH4 emissions from human activities", + "description": "CH4 emissions from human activities", + "regex": ".*-human_" + } +} +``` > Note: The steps after this are technical, so at this point the scientists can send the json to the VEDA POC and they'll handle the publication process. The plan is to make this directly available to the scientists in the future. diff --git a/README.md b/README.md index 6cbe994..7fc57e0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,8 @@ # GHGC STAC Ingestor +> [!IMPORTANT] +> The US GHG Center has started using [veda-backend](https://github.com/NASA-IMPACT/veda-backend/) repository directly for its backend services, which includes the STAC ingestion services. Hence, this forked version of the veda-stac-ingestor repository is no longer maintained and so the repository is now archived. + This service provides an entry-point for users/services to add new records to our STAC database. Its primary functions are to 1) validate the STAC Items before insertion, 2) batch the insertions to reduce load on our STAC database. 
STAC items are validated to ensure that: diff --git a/api/requirements.txt b/api/requirements.txt index adeb2ca..55dc3bf 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -9,7 +9,7 @@ orjson>=3.6.8 psycopg[binary,pool]>=3.0.15 pydantic_ssm_settings>=0.2.0 pydantic>=1.9.0 -pypgstac==0.7.9 +pypgstac==0.7.10 python-multipart==0.0.5 requests>=2.27.1 s3fs==2023.3.0 diff --git a/api/src/collection.py b/api/src/collection.py index a1dd55f..a0389c3 100644 --- a/api/src/collection.py +++ b/api/src/collection.py @@ -125,14 +125,12 @@ def create_cog_collection(self, dataset: COGDataset) -> dict: }, } ) - collection_stac["item_assets"] = { - "cog_default": { - "type": "image/tiff; application=geotiff; profile=cloud-optimized", - "roles": ["data", "layer"], - "title": "Default COG Layer", - "description": "Cloud optimized default layer to display on map", - } - } + collection_stac["item_assets"] = {} + for discovery in dataset.discovery_items: + for key, asset in discovery.assets.items(): + collection_stac["item_assets"][key] = { + k: v for k, v in asset.dict().items() if k != "regex" + } return collection_stac def generate_stac( diff --git a/api/src/custom_loader.py b/api/src/custom_loader.py index 0acc707..92d7e83 100644 --- a/api/src/custom_loader.py +++ b/api/src/custom_loader.py @@ -30,18 +30,7 @@ def update_collection_summaries(self, collection_id: str) -> None: logger.info(f"Updating extents for collection: {collection_id}.") cur.execute( """ - UPDATE collections SET - content = content || - jsonb_build_object( - 'extent', jsonb_build_object( - 'spatial', jsonb_build_object( - 'bbox', collection_bbox(collections.id) - ), - 'temporal', jsonb_build_object( - 'interval', collection_temporal_extent(collections.id) - ) - ) - ) + UPDATE collections set content = content || pgstac.collection_extent(collections.id) WHERE collections.id=%s; """, (collection_id,), diff --git a/api/src/doc.py b/api/src/doc.py index 76108bf..1e9cfe6 100644 --- a/api/src/doc.py +++ 
b/api/src/doc.py @@ -104,7 +104,7 @@ ## Workflow Executions The workflow execution API is used to start a new workflow execution. The workflow - execution API accepts discovery from s3 or cmr. + execution API accepts discovery from s3. To run a workflow execution, the user must provide the following information: **For s3 discovery:** diff --git a/api/src/main.py b/api/src/main.py index 886a93a..9c4e17e 100644 --- a/api/src/main.py +++ b/api/src/main.py @@ -178,9 +178,7 @@ def delete_collection(collection_id: str): status_code=201, ) async def start_workflow_execution( - input: Union[schemas.CmrInput, schemas.S3Input] = Body( - ..., discriminator="discovery" - ), + input: schemas.S3Input = Body(...), ) -> schemas.BaseResponse: """ Triggers the ingestion workflow diff --git a/api/src/schema_helpers.py b/api/src/schema_helpers.py index d66a10a..ada941d 100644 --- a/api/src/schema_helpers.py +++ b/api/src/schema_helpers.py @@ -1,17 +1,10 @@ -import enum +# Smaller utility models to support the larger models in schemas.py from datetime import datetime -from typing import List, Union +from typing import List, Union, Optional from pydantic import BaseModel, root_validator from stac_pydantic.collection import Extent, TimeInterval -# Smaller utility models to support the larger models in schemas.py - - -class DiscoveryEnum(str, enum.Enum): - s3 = "s3" - cmr = "cmr" - class DatetimeInterval(TimeInterval): # reimplement stac_pydantic's TimeInterval to leverage datetime types @@ -54,3 +47,10 @@ def check_dates(cls, v): if v["startdate"] >= v["enddate"]: raise ValueError("Invalid extent - startdate must be before enddate") return v + + +class DiscoveryItemAsset(BaseModel): + title: str + description: Optional[str] + roles: Optional[List[str]] + regex: str diff --git a/api/src/schemas.py b/api/src/schemas.py index b026728..4b718a1 100644 --- a/api/src/schemas.py +++ b/api/src/schemas.py @@ -18,10 +18,14 @@ root_validator, validator, ) -from src.schema_helpers import BboxExtent, 
SpatioTemporalExtent, TemporalExtent +from src.schema_helpers import ( + BboxExtent, + SpatioTemporalExtent, + TemporalExtent, + DiscoveryItemAsset, +) from stac_pydantic import Collection, Item, shared from stac_pydantic.links import Link -from typing_extensions import Annotated if TYPE_CHECKING: from src import services @@ -248,7 +252,6 @@ def exists(cls, collection): class S3Input(WorkflowInputBase): - discovery: Literal["s3"] prefix: str bucket: str filename_regex: str = r"[\s\S]*" # default to match all files in prefix @@ -256,6 +259,9 @@ class S3Input(WorkflowInputBase): start_datetime: Optional[datetime] end_datetime: Optional[datetime] single_datetime: Optional[datetime] + id_regex: Optional[str] + id_template: Optional[str] + assets: Dict[str, DiscoveryItemAsset] zarr_store: Optional[str] @root_validator @@ -268,17 +274,11 @@ def object_is_accessible(cls, values): ) return values - -class CmrInput(WorkflowInputBase): - discovery: Literal["cmr"] - version: Optional[str] - include: Optional[str] - temporal: Optional[List[datetime]] - bounding_box: Optional[List[float]] - - -# allows the construction of models with a list of discriminated unions -ItemUnion = Annotated[Union[S3Input, CmrInput], Field(discriminator="discovery")] + @validator("assets", always=True, pre=True) + def item_assets_required(cls, assets): + if not assets: + raise ValueError("Specify at least one asset.") + return assets class Dataset(BaseModel): @@ -289,7 +289,7 @@ class Dataset(BaseModel): is_periodic: Optional[bool] = False time_density: Optional[str] = None links: Optional[List[Link]] = [] - discovery_items: List[ItemUnion] + discovery_items: List[S3Input] # collection id must be all lowercase, with optional - delimiter @validator("collection") @@ -314,7 +314,7 @@ class DataType(str, enum.Enum): class COGDataset(Dataset): spatial_extent: BboxExtent temporal_extent: TemporalExtent - sample_files: List[str] # unknown how this will work with CMR + sample_files: List[str] data_type: 
Literal[DataType.cog] @root_validator @@ -325,17 +325,12 @@ def check_sample_files(cls, values): if not (discovery_items := values.get("discovery_items")): return - if "s3" not in [item.discovery for item in discovery_items]: - return values - - # TODO cmr handling/validation invalid_fnames = [] for fname in values.get("sample_files", []): found_match = False for item in discovery_items: if all( [ - item.discovery == "s3", re.search(item.filename_regex, fname.split("/")[-1]), "/".join(fname.split("/")[3:]).startswith(item.prefix), ] diff --git a/api/tests/test_validators.py b/api/tests/test_validators.py index e4a1884..68cc68f 100644 --- a/api/tests/test_validators.py +++ b/api/tests/test_validators.py @@ -18,7 +18,6 @@ "sample_files": ["s3://veda-data-store-staging/foo/bar.tif"], "discovery_items": [ { - "discovery": "s3", "cogify": False, "upload": False, "dry_run": True, @@ -48,7 +47,6 @@ "sample_files": ["s3://veda-data-store-staging/foo/bar.tif"], "discovery_items": [ { - "discovery": "s3", "cogify": False, "upload": False, "dry_run": True, diff --git a/cdk/stack.py b/cdk/stack.py index eccaa5f..e0d2624 100644 --- a/cdk/stack.py +++ b/cdk/stack.py @@ -210,7 +210,7 @@ def build_api_lambda( vpc_subnets=ec2.SubnetSelection( subnet_type=ec2.SubnetType.PUBLIC if db_subnet_public - else ec2.SubnetType.PRIVATE_ISOLATED + else ec2.SubnetType.PRIVATE_WITH_EGRESS ), allow_public_subnet=True, memory_size=2048, @@ -285,7 +285,7 @@ def build_ingestor( vpc_subnets=ec2.SubnetSelection( subnet_type=ec2.SubnetType.PUBLIC if db_subnet_public - else ec2.SubnetType.PRIVATE_ISOLATED + else ec2.SubnetType.PRIVATE_WITH_EGRESS ), allow_public_subnet=True, memory_size=2048, diff --git a/requirements.txt b/requirements.txt index dd349b2..04a9a51 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aws-cdk-lib==2.21.1 +aws-cdk-lib==2.128.0 aws-cdk.aws-lambda-python-alpha==2.21.1a0 black boto3-stubs[essential] @@ -8,7 +8,7 @@ httpx isort moto[s3,dynamodb] pytest 
-pydantic +pydantic==1.10.12 python-dotenv python-multipart pytest-mock