Skip to content

Commit bd7a1fa

Browse files
fix(ingestor-api): store STAC item as string in DynamoDB (#26)
* fix: stac item serialization in ingestion model * fix: use pydantic parsing and remove float conversion * test: add initial test for missing ingestor coverage * fix: fix updated psql function name changed in db bootstrapper * style: fix black errors * test: add utils.load_items test for new item serialization * style: black and isort style changes * Make use of jsonable_encoder, buildout tests for submitting STAC items --------- Co-authored-by: Anthony Lukach <[email protected]>
1 parent 3fbaf4d commit bd7a1fa

File tree

9 files changed

+253
-55
lines changed

9 files changed

+253
-55
lines changed

lib/ingestor-api/runtime/src/collection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from pypgstac.db import PgstacDB
44
from pypgstac.load import Methods
55

6+
from .loader import Loader
67
from .schemas import StacCollection
78
from .utils import get_db_credentials
8-
from .loader import Loader
99

1010

1111
def ingest(collection: StacCollection):

lib/ingestor-api/runtime/src/ingestor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def get_queued_ingestions(records: List["DynamodbRecord"]) -> Iterator[Ingestion
2323
k: deserializer.deserialize(v)
2424
for k, v in record["dynamodb"]["NewImage"].items()
2525
}
26-
ingestion = Ingestion.construct(**parsed)
26+
ingestion = Ingestion.parse_obj(parsed)
2727
if ingestion.status == Status.queued:
2828
yield ingestion
2929

lib/ingestor-api/runtime/src/loader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def update_collection_summaries(self, collection_id: str) -> None:
2727
)
2828
)
2929
cur.execute(
30-
"SELECT dashboard.update_collection_default_summaries(%s)",
30+
"SELECT dashboard.update_default_summaries(%s)",
3131
[collection_id],
3232
)
3333
logger.info("Updating bbox for collection: {}.".format(collection_id))

lib/ingestor-api/runtime/src/schemas.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,19 @@
33
import enum
44
import json
55
from datetime import datetime
6-
from decimal import Decimal
7-
from typing import TYPE_CHECKING, Dict, List, Optional
6+
from typing import TYPE_CHECKING, Dict, List, Optional, Union
87
from urllib.parse import urlparse
98

9+
from fastapi.encoders import jsonable_encoder
1010
from fastapi.exceptions import RequestValidationError
11-
from pydantic import BaseModel, PositiveInt, dataclasses, error_wrappers, validator
11+
from pydantic import (
12+
BaseModel,
13+
Json,
14+
PositiveInt,
15+
dataclasses,
16+
error_wrappers,
17+
validator,
18+
)
1219
from stac_pydantic import Collection, Item, shared
1320

1421
from . import validators
@@ -23,7 +30,7 @@ def is_accessible(cls, href):
2330
url = urlparse(href)
2431

2532
if url.scheme in ["https", "http"]:
26-
validators.url_is_accessible(href)
33+
validators.url_is_accessible(href=href)
2734
elif url.scheme in ["s3"]:
2835
validators.s3_object_is_accessible(
2936
bucket=url.hostname, key=url.path.lstrip("/")
@@ -64,7 +71,7 @@ class Ingestion(BaseModel):
6471
created_at: datetime = None
6572
updated_at: datetime = None
6673

67-
item: Item
74+
item: Union[Item, Json[Item]]
6875

6976
@validator("created_at", pre=True, always=True, allow_reuse=True)
7077
@validator("updated_at", pre=True, always=True, allow_reuse=True)
@@ -84,9 +91,16 @@ def save(self, db: "services.Database"):
8491
db.write(self)
8592
return self
8693

87-
def dynamodb_dict(self, by_alias=True):
94+
def dynamodb_dict(self):
8895
"""DynamoDB-friendly serialization"""
89-
return json.loads(self.json(by_alias=by_alias), parse_float=Decimal)
96+
# convert to dictionary
97+
output = self.dict(exclude={"item"})
98+
99+
# add STAC item as string
100+
output["item"] = self.item.json()
101+
102+
# make JSON-friendly (will be able to do with Pydantic V2, https://github.com/pydantic/pydantic/issues/1409#issuecomment-1423995424)
103+
return jsonable_encoder(output)
90104

91105

92106
@dataclasses.dataclass

lib/ingestor-api/runtime/src/utils.py

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
import decimal
2-
import json
3-
from typing import Any, Dict, Sequence
1+
from typing import Sequence
42

53
import boto3
6-
import orjson
74
import pydantic
85
from pypgstac.db import PgstacDB
96
from pypgstac.load import Methods
7+
from fastapi.encoders import jsonable_encoder
108

11-
from .schemas import Ingestion
129
from .loader import Loader
10+
from .schemas import Ingestion
1311

1412

1513
class DbCreds(pydantic.BaseModel):
@@ -36,40 +34,15 @@ def get_db_credentials(secret_arn: str) -> DbCreds:
3634
return DbCreds.parse_raw(response["SecretString"])
3735

3836

39-
def convert_decimals_to_float(item: Dict[str, Any]) -> Dict[str, Any]:
40-
"""
41-
DynamoDB stores floats as Decimals. We want to convert them back to floats
42-
before inserting them into pgSTAC to avoid any issues when the records are
43-
converted to JSON by pgSTAC.
44-
"""
45-
46-
def decimal_to_float(obj):
47-
if isinstance(obj, decimal.Decimal):
48-
return float(obj)
49-
raise TypeError
50-
51-
return json.loads(
52-
orjson.dumps(
53-
item,
54-
default=decimal_to_float,
55-
)
56-
)
57-
58-
5937
def load_items(creds: DbCreds, ingestions: Sequence[Ingestion]):
6038
"""
6139
Bulk insert STAC records into pgSTAC.
6240
"""
6341
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
6442
loader = Loader(db=db)
6543

66-
items = [
67-
# NOTE: Important to deserialize values to convert decimals to floats
68-
convert_decimals_to_float(i.item)
69-
for i in ingestions
70-
]
71-
72-
print(f"Ingesting {len(items)} items")
44+
# serialize to JSON-friendly dicts (won't be necessary in Pydantic v2, https://github.com/pydantic/pydantic/issues/1409#issuecomment-1423995424)
45+
items = jsonable_encoder(i.item for i in ingestions)
7346
loading_result = loader.load_items(
7447
file=items,
7548
# use insert_ignore to avoid overwritting existing items or upsert to replace

lib/ingestor-api/runtime/tests/conftest.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def test_environ():
2020
os.environ["JWKS_URL"] = "https://test-jwks.url"
2121
os.environ["STAC_URL"] = "https://test-stac.url"
2222
os.environ["DATA_ACCESS_ROLE"] = "arn:aws:iam::123456789012:role/test-role"
23+
os.environ["DB_SECRET_ARN"] = "testing"
2324

2425

2526
@pytest.fixture
@@ -101,7 +102,10 @@ def example_stac_item():
101102
]
102103
],
103104
},
104-
"properties": {"datetime": "2020-12-11T22:38:32.125000Z"},
105+
"properties": {
106+
"datetime": "2020-12-11T22:38:32.125000Z",
107+
"eo:cloud_cover": 1,
108+
},
105109
"collection": "simple-collection",
106110
"links": [
107111
{
@@ -125,13 +129,13 @@ def example_stac_item():
125129
],
126130
"assets": {
127131
"visual": {
128-
"href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.tif", # noqa
132+
"href": "https://TEST_API.com/open-cogs/stac-examples/20201211_223832_CS2.tif", # noqa
129133
"type": "image/tiff; application=geotiff; profile=cloud-optimized",
130134
"title": "3-Band Visual",
131135
"roles": ["visual"],
132136
},
133137
"thumbnail": {
134-
"href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.jpg", # noqa
138+
"href": "https://TEST_API.com/open-cogs/stac-examples/20201211_223832_CS2.jpg", # noqa
135139
"title": "Thumbnail",
136140
"type": "image/jpeg",
137141
"roles": ["thumbnail"],
@@ -244,10 +248,7 @@ def client_authenticated(app):
244248
"""
245249
from src.dependencies import get_username
246250

247-
def skip_auth():
248-
pass
249-
250-
app.dependency_overrides[get_username] = skip_auth
251+
app.dependency_overrides[get_username] = lambda: 'test_user'
251252
return TestClient(app)
252253

253254

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from unittest.mock import patch
2+
3+
import pytest
4+
5+
6+
@pytest.fixture()
7+
def dynamodb_stream_event():
8+
return {"Records": None}
9+
10+
11+
@pytest.fixture()
12+
def get_queued_ingestions(example_ingestion):
13+
with patch(
14+
"src.ingestor.get_queued_ingestions",
15+
return_value=iter([example_ingestion]),
16+
autospec=True,
17+
) as m:
18+
yield m
19+
20+
21+
@pytest.fixture()
22+
def get_db_credentials():
23+
with patch("src.ingestor.get_db_credentials", return_value="", autospec=True) as m:
24+
yield m
25+
26+
27+
@pytest.fixture()
28+
def load_items():
29+
with patch("src.ingestor.load_items", return_value=0, autospec=True) as m:
30+
yield m
31+
32+
33+
@pytest.fixture()
34+
def get_table(mock_table):
35+
with patch("src.ingestor.get_table", return_value=mock_table, autospec=True) as m:
36+
yield m
37+
38+
39+
def test_handler(
40+
monkeypatch,
41+
test_environ,
42+
dynamodb_stream_event,
43+
example_ingestion,
44+
get_queued_ingestions,
45+
get_db_credentials,
46+
load_items,
47+
get_table,
48+
mock_table,
49+
):
50+
import src.ingestor as ingestor
51+
52+
ingestor.handler(dynamodb_stream_event, {})
53+
load_items.assert_called_once_with(
54+
creds="",
55+
ingestions=list([example_ingestion]),
56+
)
57+
response = mock_table.get_item(
58+
Key={"created_by": example_ingestion.created_by, "id": example_ingestion.id}
59+
)
60+
assert response["Item"]["status"] == "succeeded"

0 commit comments

Comments
 (0)