Skip to content

Commit 191ae64

Browse files
Fix ingestor regressions introduced by /pull/18. (#19)
* Refactor use of load_into_pgstac and move collection load logic. * Fix regression not included in be40af7 * Fix uncaught regression accessing collection property from an item dict. * Linting fixes.
1 parent da1cd77 commit 191ae64

File tree

3 files changed

+41
-83
lines changed

3 files changed

+41
-83
lines changed

lib/ingestor-api/runtime/src/collection.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
import os
22

33
from pypgstac.db import PgstacDB
4+
from pypgstac.load import Methods
45

56
from .schemas import StacCollection
6-
from .utils import (
7-
IngestionType,
8-
convert_decimals_to_float,
9-
get_db_credentials,
10-
load_into_pgstac,
11-
)
7+
from .utils import get_db_credentials
128
from .vedaloader import VEDALoader
139

1410

@@ -18,12 +14,13 @@ def ingest(collection: StacCollection):
1814
does necessary preprocessing,
1915
and loads into the PgSTAC collection table
2016
"""
21-
creds = get_db_credentials(os.environ["DB_SECRET_ARN"])
22-
collection = [
23-
convert_decimals_to_float(collection.dict(by_alias=True, exclude_unset=True))
24-
]
25-
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
26-
load_into_pgstac(db=db, ingestions=collection, table=IngestionType.collections)
17+
try:
18+
creds = get_db_credentials(os.environ["DB_SECRET_ARN"])
19+
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
20+
loader = VEDALoader(db=db)
21+
loader.load_collection(file=collection, insert_mode=Methods.upsert)
22+
except Exception as e:
23+
print(f"Encountered failure loading collection into pgSTAC: {e}")
2724

2825

2926
def delete(collection_id: str):

lib/ingestor-api/runtime/src/ingestor.py

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,11 @@
33
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence
44

55
from boto3.dynamodb.types import TypeDeserializer
6-
from pypgstac.db import PgstacDB
76

8-
from .dependencies import get_settings, get_table
7+
from .config import settings
8+
from .dependencies import get_table
99
from .schemas import Ingestion, Status
10-
from .utils import (
11-
IngestionType,
12-
convert_decimals_to_float,
13-
get_db_credentials,
14-
load_into_pgstac,
15-
)
10+
from .utils import get_db_credentials, load_items
1611

1712
if TYPE_CHECKING:
1813
from aws_lambda_typing import context as context_
@@ -43,7 +38,7 @@ def update_dynamodb(
4338
"""
4439
# Update records in DynamoDB
4540
print(f"Updating ingested items status in DynamoDB, marking as {status}...")
46-
table = get_table(get_settings())
41+
table = get_table(settings)
4742
with table.batch_writer(overwrite_by_pkeys=["created_by", "id"]) as batch:
4843
for ingestion in ingestions:
4944
batch.put_item(
@@ -64,24 +59,14 @@ def handler(event: "events.DynamoDBStreamEvent", context: "context_.Context"):
6459
print("No queued ingestions to process")
6560
return
6661

67-
items = [
68-
# NOTE: Important to deserialize values to convert decimals to floats
69-
convert_decimals_to_float(ingestion.item)
70-
for ingestion in ingestions
71-
]
72-
73-
creds = get_db_credentials(os.environ["DB_SECRET_ARN"])
74-
7562
# Insert into PgSTAC DB
7663
outcome = Status.succeeded
7764
message = None
7865
try:
79-
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
80-
load_into_pgstac(
81-
db=db,
82-
ingestions=items,
83-
table=IngestionType.items,
84-
)
66+
load_items(
67+
creds=get_db_credentials(os.environ["DB_SECRET_ARN"]),
68+
ingestions=ingestions,
69+
)
8570
except Exception as e:
8671
print(f"Encountered failure loading items into pgSTAC: {e}")
8772
outcome = Status.failed
Lines changed: 24 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,17 @@
11
import decimal
22
import json
3-
from enum import Enum
4-
from typing import Any, Dict, Sequence, Union
3+
from typing import Any, Dict, Sequence
54

65
import boto3
76
import orjson
87
import pydantic
98
from pypgstac.db import PgstacDB
109
from pypgstac.load import Methods
1110

12-
from .schemas import AccessibleItem, StacCollection
11+
from .schemas import Ingestion
1312
from .vedaloader import VEDALoader
1413

1514

16-
class IngestionType(str, Enum):
17-
collections = "collections"
18-
items = "items"
19-
20-
2115
class DbCreds(pydantic.BaseModel):
2216
username: str
2317
password: str
@@ -62,47 +56,29 @@ def decimal_to_float(obj):
6256
)
6357

6458

65-
def load_items(items: Sequence[AccessibleItem], loader):
59+
def load_items(creds: DbCreds, ingestions: Sequence[Ingestion]):
6660
"""
67-
Loads items into the PgSTAC database and
68-
updates the summaries and extent for the collections involved
69-
"""
70-
loading_result = loader.load_items(
71-
file=items,
72-
# use insert_ignore to avoid overwritting existing items or upsert to replace
73-
insert_mode=Methods.upsert,
74-
)
75-
76-
# Trigger update on summaries and extents
77-
collections = set([item["collection"] for item in items])
78-
for collection in collections:
79-
loader.update_collection_summaries(collection)
80-
81-
return loading_result
82-
83-
84-
def load_collection(collection: Sequence[StacCollection], loader):
85-
"""
86-
Loads the collection to the PgSTAC database
61+
Bulk insert STAC records into pgSTAC.
8762
"""
88-
return loader.load_collections(
89-
file=collection,
90-
# use insert_ignore to avoid overwritting existing items or upsert to replace
91-
insert_mode=Methods.upsert,
92-
)
63+
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
64+
loader = VEDALoader(db=db)
65+
66+
items = [
67+
# NOTE: Important to deserialize values to convert decimals to floats
68+
convert_decimals_to_float(i.item)
69+
for i in ingestions
70+
]
71+
72+
print(f"Ingesting {len(items)} items")
73+
loading_result = loader.load_items(
74+
file=items,
75+
# use insert_ignore to avoid overwritting existing items or upsert to replace
76+
insert_mode=Methods.upsert,
77+
)
9378

79+
# Trigger update on summaries and extents
80+
collections = set([item["collection"] for item in items])
81+
for collection in collections:
82+
loader.update_collection_summaries(collection)
9483

95-
def load_into_pgstac(
96-
db: "PgstacDB",
97-
ingestions: Union[Sequence[AccessibleItem], Sequence[StacCollection]],
98-
table: IngestionType,
99-
):
100-
"""
101-
Bulk insert STAC records into pgSTAC.
102-
The ingestion can be items or collection, determined by the `table` arg.
103-
"""
104-
loader = VEDALoader(db=db)
105-
loading_function = load_items
106-
if table == IngestionType.collections:
107-
loading_function = load_collection
108-
return loading_function(ingestions, loader)
84+
return loading_result

0 commit comments

Comments
 (0)