1- import decimal
21import os
32from datetime import datetime
4- from typing import TYPE_CHECKING , Any , Dict , Iterator , List , Optional , Sequence
3+ from typing import TYPE_CHECKING , Iterator , List , Optional , Sequence
54
6- import boto3
7- import orjson
8- import pydantic
95from boto3 .dynamodb .types import TypeDeserializer
106from pypgstac .db import PgstacDB
11- from pypgstac .load import Methods
127
138from .dependencies import get_settings , get_table
149from .schemas import Ingestion , Status
15- from .vedaloader import VEDALoader
10+ from .utils import (
11+ IngestionType ,
12+ convert_decimals_to_float ,
13+ get_db_credentials ,
14+ load_into_pgstac ,
15+ )
1616
1717if TYPE_CHECKING :
1818 from aws_lambda_typing import context as context_
@@ -33,78 +33,6 @@ def get_queued_ingestions(records: List["DynamodbRecord"]) -> Iterator[Ingestion
3333 yield ingestion
3434
3535
36- class DbCreds (pydantic .BaseModel ):
37- username : str
38- password : str
39- host : str
40- port : int
41- dbname : str
42- engine : str
43-
44- @property
45- def dsn_string (self ) -> str :
46- return f"{ self .engine } ://{ self .username } :{ self .password } @{ self .host } :{ self .port } /{ self .dbname } " # noqa
47-
48-
49- def get_db_credentials (secret_arn : str ) -> DbCreds :
50- """
51- Load pgSTAC database credentials from AWS Secrets Manager.
52- """
53- print ("Fetching DB credentials..." )
54- session = boto3 .session .Session (region_name = secret_arn .split (":" )[3 ])
55- client = session .client (service_name = "secretsmanager" )
56- response = client .get_secret_value (SecretId = secret_arn )
57- return DbCreds .parse_raw (response ["SecretString" ])
58-
59-
60- def convert_decimals_to_float (item : Dict [str , Any ]) -> Dict [str , Any ]:
61- """
62- DynamoDB stores floats as Decimals. We want to convert them back to floats
63- before inserting them into pgSTAC to avoid any issues when the records are
64- converted to JSON by pgSTAC.
65- """
66-
67- def decimal_to_float (obj ):
68- if isinstance (obj , decimal .Decimal ):
69- return float (obj )
70- raise TypeError
71-
72- return orjson .loads (
73- orjson .dumps (
74- item ,
75- default = decimal_to_float ,
76- )
77- )
78-
79-
80- def load_into_pgstac (creds : DbCreds , ingestions : Sequence [Ingestion ]):
81- """
82- Bulk insert STAC records into pgSTAC.
83- """
84- with PgstacDB (dsn = creds .dsn_string , debug = True ) as db :
85- loader = VEDALoader (db = db )
86-
87- items = [
88- # NOTE: Important to deserialize values to convert decimals to floats
89- convert_decimals_to_float (i .item )
90- for i in ingestions
91- ]
92-
93- print (f"Ingesting { len (items )} items" )
94- loading_result = loader .load_items (
95- file = items ,
96- # use insert_ignore to avoid overwritting existing items or upsert to replace
97- insert_mode = Methods .upsert ,
98- )
99-
100- # Trigger update on summaries and extents
101- collections = set ([item .collection for item in items ])
102- for collection in collections :
103- loader .update_collection_summaries (collection )
104-
105- return loading_result
106-
107-
10836def update_dynamodb (
10937 ingestions : Sequence [Ingestion ],
11038 status : Status ,
@@ -136,14 +64,24 @@ def handler(event: "events.DynamoDBStreamEvent", context: "context_.Context"):
13664 print ("No queued ingestions to process" )
13765 return
13866
67+ items = [
68+ # NOTE: Important to deserialize values to convert decimals to floats
69+ convert_decimals_to_float (ingestion .item )
70+ for ingestion in ingestions
71+ ]
72+
73+ creds = get_db_credentials (os .environ ["DB_SECRET_ARN" ])
74+
13975 # Insert into PgSTAC DB
14076 outcome = Status .succeeded
14177 message = None
14278 try :
143- load_into_pgstac (
144- creds = get_db_credentials (os .environ ["DB_SECRET_ARN" ]),
145- ingestions = ingestions ,
146- )
79+ with PgstacDB (dsn = creds .dsn_string , debug = True ) as db :
80+ load_into_pgstac (
81+ db = db ,
82+ ingestions = items ,
83+ table = IngestionType .items ,
84+ )
14785 except Exception as e :
14886 print (f"Encountered failure loading items into pgSTAC: { e } " )
14987 outcome = Status .failed
0 commit comments