Skip to content

Commit 74c51d3

Browse files
author
Tom Augspurger
authored
[core]: Models, containers for items & events (#169)
* [core]: Models, containers for items & events In preparation for low-latency, we're adding / updating a handful of models, some of which are persisted to Cosmos DB. This adds the pydantic models for that.
1 parent 3c00c5d commit 74c51d3

File tree

10 files changed

+381
-3
lines changed

10 files changed

+381
-3
lines changed

deployment/terraform/resources/cosmosdb.tf

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,24 @@ resource "azurerm_cosmosdb_sql_container" "leases" {
7979
database_name = azurerm_cosmosdb_sql_database.pctasks.name
8080
partition_key_path = "/id"
8181
partition_key_version = 1
82-
}
82+
}
83+
84+
## Storage Events (for low-latency ingest)
85+
86+
resource "azurerm_cosmosdb_sql_container" "storage-events" {
87+
name = "storage-events"
88+
resource_group_name = data.azurerm_cosmosdb_account.pctasks.resource_group_name
89+
account_name = data.azurerm_cosmosdb_account.pctasks.name
90+
database_name = azurerm_cosmosdb_sql_database.pctasks.name
91+
partition_key_path = "/id"
92+
}
93+
94+
## Items (for source of truth and low-latency ingest)
95+
96+
resource "azurerm_cosmosdb_sql_container" "items" {
97+
name = "items"
98+
resource_group_name = data.azurerm_cosmosdb_account.pctasks.resource_group_name
99+
account_name = data.azurerm_cosmosdb_account.pctasks.name
100+
database_name = azurerm_cosmosdb_sql_database.pctasks.name
101+
partition_key_path = "/stac_id"
102+
}

pctasks/core/pctasks/core/cosmos/container.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
)
3636
from pctasks.core.cosmos.page import Page
3737
from pctasks.core.cosmos.settings import CosmosDBSettings
38-
from pctasks.core.models.run import Record
38+
from pctasks.core.models.record import Record
3939
from pctasks.core.models.utils import tzutc_now
4040
from pctasks.core.utils import grouped
4141
from pctasks.core.utils.backoff import BackoffStrategy, with_backoff, with_backoff_async
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from typing import Dict, Optional, Type, TypeVar
2+
3+
from pydantic import BaseModel
4+
5+
from pctasks.core.cosmos.container import (
6+
ContainerOperation,
7+
CosmosDBContainer,
8+
CosmosDBDatabase,
9+
TriggerType,
10+
)
11+
from pctasks.core.cosmos.settings import CosmosDBSettings
12+
from pctasks.core.models.item import ItemUpdatedRecord, StacItemRecord
13+
14+
# Records that this container can hold
15+
T = TypeVar(
16+
"T",
17+
StacItemRecord,
18+
ItemUpdatedRecord,
19+
)
20+
21+
PARTITION_KEY = "/stac_id"
22+
23+
STORED_PROCEDURES: Dict[ContainerOperation, Dict[Type[BaseModel], str]] = {}
24+
25+
TRIGGERS: Dict[ContainerOperation, Dict[TriggerType, str]] = {}
26+
27+
28+
class ItemsContainer(CosmosDBContainer[T]):
29+
def __init__(
30+
self,
31+
model_type: Type[T],
32+
db: Optional[CosmosDBDatabase] = None,
33+
settings: Optional[CosmosDBSettings] = None,
34+
) -> None:
35+
super().__init__(
36+
lambda settings: settings.get_items_container_name(),
37+
PARTITION_KEY,
38+
model_type=model_type, # type: ignore[arg-type]
39+
db=db,
40+
settings=settings,
41+
stored_procedures=STORED_PROCEDURES,
42+
triggers=TRIGGERS,
43+
)
44+
45+
def get_partition_key(self, model: T) -> str:
46+
return model.stac_id
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from typing import Dict, Optional, Type
2+
3+
from pydantic import BaseModel
4+
5+
from pctasks.core.cosmos.container import (
6+
ContainerOperation,
7+
CosmosDBContainer,
8+
CosmosDBDatabase,
9+
TriggerType,
10+
)
11+
from pctasks.core.cosmos.settings import CosmosDBSettings
12+
from pctasks.core.models.event import StorageEventRecord
13+
14+
T = StorageEventRecord
15+
16+
PARTITION_KEY = "/id"
17+
18+
STORED_PROCEDURES: Dict[ContainerOperation, Dict[Type[BaseModel], str]] = {}
19+
20+
TRIGGERS: Dict[ContainerOperation, Dict[TriggerType, str]] = {}
21+
22+
23+
class StorageEventsContainer(CosmosDBContainer[T]):
24+
def __init__(
25+
self,
26+
model_type: Type[T],
27+
db: Optional[CosmosDBDatabase] = None,
28+
settings: Optional[CosmosDBSettings] = None,
29+
) -> None:
30+
super().__init__(
31+
lambda settings: settings.get_storage_events_container_name(),
32+
PARTITION_KEY,
33+
model_type=model_type,
34+
db=db,
35+
settings=settings,
36+
stored_procedures=STORED_PROCEDURES,
37+
triggers=TRIGGERS,
38+
)
39+
40+
def get_partition_key(self, model: T) -> str:
41+
return model.id

pctasks/core/pctasks/core/cosmos/settings.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
DEFAULT_DATABASE_NAME = "pctasks"
1515
DEFAULT_WORKFLOW_RUNS_CONTAINER = "workflow-runs"
1616
DEFAULT_WORKFLOWS_CONTAINER = "workflows"
17+
DEFAULT_ITEMS_CONTAINER = "items"
1718
DEFAULT_RECORDS_CONTAINER = "records"
19+
DEFAULT_STORAGE_EVENTS_CONTAINER_NAME = "storage-events"
1820

1921
DEFAULT_SINGLE_PARTITION_KEY_VALUE = "partition_key"
2022

@@ -37,7 +39,9 @@ def section_name(cls) -> str:
3739
database: str = DEFAULT_DATABASE_NAME
3840
workflows_container_name: str = DEFAULT_WORKFLOWS_CONTAINER
3941
workflow_runs_container_name: str = DEFAULT_WORKFLOW_RUNS_CONTAINER
42+
items_container_name: str = DEFAULT_ITEMS_CONTAINER
4043
records_container_name: str = DEFAULT_RECORDS_CONTAINER
44+
storage_events_container_name: str = DEFAULT_STORAGE_EVENTS_CONTAINER_NAME
4145

4246
max_bulk_put_size: int = 250
4347

@@ -54,6 +58,19 @@ def get_workflow_runs_container_name(self) -> str:
5458
)
5559
return self.workflow_runs_container_name
5660

61+
def get_storage_events_container_name(self) -> str:
62+
if self.test_container_suffix:
63+
return (
64+
f"tmp-{self.storage_events_container_name}"
65+
f"-{self.test_container_suffix}"
66+
)
67+
return self.storage_events_container_name
68+
69+
def get_items_container_name(self) -> str:
70+
if self.test_container_suffix:
71+
return f"tmp-{self.items_container_name}" f"-{self.test_container_suffix}"
72+
return self.items_container_name
73+
5774
def get_records_container_name(self) -> str:
5875
if self.test_container_suffix:
5976
return f"tmp-{self.records_container_name}-{self.test_container_suffix}"

pctasks/core/pctasks/core/models/event.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from pctasks.core.constants import MICROSOFT_OWNER
77
from pctasks.core.models.base import ForeachConfig, PCBaseModel, RunRecordId
8+
from pctasks.core.models.record import Record
89
from pctasks.core.models.utils import tzutc_now
910
from pctasks.core.utils import StrEnum
1011

@@ -20,6 +21,58 @@ class CloudEvent(PCBaseModel):
2021
data: Union[Dict[str, Any], BaseModel]
2122

2223

24+
class StorageEventType(StrEnum):
25+
# Doesn't have an update
26+
CREATED = "Microsoft.Storage.BlobCreated"
27+
DELETED = "Microsoft.Storage.BlobDeleted"
28+
29+
30+
class StorageEventData(PCBaseModel):
31+
"""
32+
A model for Blob Storage Events.
33+
34+
https://learn.microsoft.com/en-us/azure/event-grid/event-schema-blob-storage?tabs=cloud-event-schema
35+
"""
36+
37+
api: str
38+
clientRequestId: str
39+
requestId: str
40+
eTag: str
41+
contentType: str
42+
contentLength: Optional[int]
43+
contentOffset: Optional[int]
44+
blobType: str
45+
url: str
46+
sequencer: str
47+
storageDiagnostics: Dict[str, str]
48+
49+
50+
class StorageEvent(CloudEvent):
51+
type: StorageEventType
52+
data: StorageEventData
53+
54+
55+
class StorageEventRecord(StorageEvent, Record):
56+
"""
57+
A Cosmos DB record for storage events.
58+
59+
Cloud Events are valid Cosmos DB records, so we just inherit the
60+
additional methods from Record to satisfy mypy.
61+
"""
62+
63+
# We need to repeat this type definition to please mypy, which
64+
# didn't like the "conflicting" definitions for 'type', despite
65+
# them both being strings at the end of the day.
66+
type: StorageEventType
67+
68+
def get_id(self) -> str:
69+
return self.id
70+
71+
@staticmethod
72+
def migrate(item: Dict[str, Any]) -> Dict[str, Any]:
73+
return item
74+
75+
2376
class STACItemEventType(StrEnum):
2477
CREATED = "Microsoft.PlanetaryComputer.ItemCreated"
2578
UPDATED = "Microsoft.PlanetaryComputer.ItemUpdated"
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from datetime import datetime
2+
from typing import Any, Dict, Optional
3+
4+
import pystac
5+
from pydantic import Field, validator
6+
7+
from pctasks.core.models.record import Record
8+
from pctasks.core.utils import StrEnum
9+
10+
11+
class ItemRecordType(StrEnum):
12+
STAC_ITEM = "StacItem"
13+
ITEM_UPDATED = "ItemUpdated"
14+
15+
16+
class ItemRecord(Record):
17+
type: ItemRecordType
18+
stac_id: str
19+
20+
@validator("stac_id")
21+
def _stac_id_validator(cls, v: str) -> str:
22+
# Ensure a single forward slash in the string
23+
if v.count("/") != 1:
24+
raise ValueError("stac_id must contain a single forward slash")
25+
return v
26+
27+
def get_id(self) -> str:
28+
version = getattr(self, "version", "")
29+
return f"{self.collection_id}:{self.item_id}:{version}:{self.type}"
30+
31+
@property
32+
def collection_id(self) -> str:
33+
return self.stac_id.split("/")[0]
34+
35+
@property
36+
def item_id(self) -> str:
37+
return self.stac_id.split("/")[1]
38+
39+
40+
class StacItemRecord(ItemRecord):
41+
"""
42+
Record for STAC items.
43+
44+
These records are used in the items container of the Cosmos DB database.
45+
46+
Parameters
47+
----------
48+
type: ItemRecordType, 'StacItem'
49+
This is always 'StacItem'
50+
stac_id: str
51+
The "STAC identifier" which is the STAC collection ID and Item ID joined
52+
by a single ``/``
53+
version: str, optional
54+
The STAC version identifier.
55+
"""
56+
57+
type: ItemRecordType = Field(default=ItemRecordType.STAC_ITEM, const=True)
58+
item: Dict[str, Any]
59+
deleted: bool = False
60+
61+
@classmethod
62+
def from_item(cls, item: pystac.Item) -> "StacItemRecord":
63+
collection_id = item.collection_id
64+
item_id = item.id
65+
stac_id = f"{collection_id}/{item_id}"
66+
return cls(
67+
stac_id=stac_id, version=item.properties.get("version"), item=item.to_dict()
68+
)
69+
70+
@property
71+
def version(self) -> str:
72+
return self.item.get("properties", {}).get("version", "")
73+
74+
75+
class ItemUpdatedRecord(ItemRecord):
76+
"""Record that records an item update.
77+
78+
Does not specify if the item was created or updated.
79+
"""
80+
81+
type: ItemRecordType = Field(default=ItemRecordType.ITEM_UPDATED, const=True)
82+
83+
run_id: str
84+
"""The run ID of the workflow that updated this Item version"""
85+
86+
delete: bool = False
87+
"""True if the update was to delete this Item version"""
88+
89+
storage_event_time: Optional[datetime] = None
90+
message_inserted_time: Optional[datetime] = None
91+
version: Optional[str]
92+
93+
@validator("version")
94+
def _version_validator(cls, v: Optional[str]) -> str:
95+
return v or ""
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import datetime
2+
import typing
3+
4+
import pystac
5+
import pytest
6+
7+
import pctasks.core.cosmos.containers.items
8+
import pctasks.core.models.item
9+
10+
11+
@pytest.fixture
12+
def stac_item() -> typing.Generator[pystac.Item, None, None]:
13+
"""A simple STAC item for record tests."""
14+
item = pystac.Item(
15+
"my-item",
16+
None,
17+
None,
18+
datetime.datetime(2000, 1, 1),
19+
properties={},
20+
collection="my-collection",
21+
)
22+
yield item
23+
24+
25+
def test_from_item(stac_item: pystac.Item):
26+
result = pctasks.core.models.item.StacItemRecord.from_item(stac_item)
27+
assert result.stac_id == "my-collection/my-item"
28+
assert result.collection_id == "my-collection"
29+
assert result.item_id == "my-item"
30+
assert result.get_id() == "my-collection:my-item::StacItem"
31+
32+
33+
def test_id_validator(stac_item: pystac.Item):
34+
stac_item.id = "my/item"
35+
with pytest.raises(ValueError, match="stac_id must contain"):
36+
pctasks.core.models.item.StacItemRecord.from_item(stac_item)
37+
38+
39+
def test_get_version(stac_item: pystac.Item):
40+
stac_item.properties["version"] = "2"
41+
result = pctasks.core.models.item.StacItemRecord.from_item(stac_item)
42+
assert result.version == "2"
43+
44+
45+
@pytest.mark.parametrize(
46+
"version, expected",
47+
[
48+
(None, "my-collection:my-item::ItemUpdated"),
49+
("2", "my-collection:my-item:2:ItemUpdated"),
50+
],
51+
)
52+
def test_item_update_record(version, expected):
53+
record = pctasks.core.models.item.ItemUpdatedRecord(
54+
stac_id="my-collection/my-item", run_id="test", version=version
55+
)
56+
result = record.get_id()
57+
assert result == expected

0 commit comments

Comments
 (0)