Skip to content

Commit d34c6cd

Browse files
committed
feat(ingestor-api): add StacIngestor construct
1 parent 5614d78 commit d34c6cd

File tree

16 files changed

+1175
-0
lines changed

16 files changed

+1175
-0
lines changed

lib/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from "./bastion-host";
22
export * from "./bootstrapper";
33
export * from "./database";
4+
export * from "./ingestor-api";
45
export * from "./stac-api";

lib/ingestor-api/index.ts

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import {
2+
aws_apigateway as apigateway,
3+
aws_dynamodb as dynamodb,
4+
aws_ec2 as ec2,
5+
aws_iam as iam,
6+
aws_lambda as lambda,
7+
aws_lambda_event_sources as events,
8+
aws_secretsmanager as secretsmanager,
9+
aws_ssm as ssm,
10+
Duration,
11+
RemovalPolicy,
12+
Stack,
13+
} from "aws-cdk-lib";
14+
import { PythonFunction } from "@aws-cdk/aws-lambda-python-alpha";
15+
import { Construct } from "constructs";
16+
17+
export class StacIngestor extends Construct {
18+
table: dynamodb.Table;
19+
20+
constructor(scope: Construct, id: string, props: StacIngestorProps) {
21+
super(scope, id);
22+
23+
this.table = this.buildTable();
24+
25+
const env: Record<string, string> = {
26+
DYNAMODB_TABLE: this.table.tableName,
27+
ROOT_PATH: `/${props.stage}`,
28+
NO_PYDANTIC_SSM_SETTINGS: "1",
29+
STAC_URL: props.stacUrl,
30+
DATA_ACCESS_ROLE: props.dataAccessRole.roleArn,
31+
...props.apiEnv,
32+
};
33+
34+
const handler = this.buildApiLambda({
35+
table: this.table,
36+
env,
37+
dataAccessRole: props.dataAccessRole,
38+
stage: props.stage,
39+
});
40+
41+
this.buildApiEndpoint({
42+
handler,
43+
stage: props.stage,
44+
endpointConfiguration: props.apiEndpointConfiguration,
45+
policy: props.apiPolicy,
46+
});
47+
48+
this.buildIngestor({
49+
table: this.table,
50+
env: env,
51+
dbSecret: props.stacDbSecret,
52+
dbVpc: props.vpc,
53+
dbSecurityGroup: props.stacDbSecurityGroup,
54+
subnetSelection: props.subnetSelection,
55+
});
56+
57+
this.registerSsmParameter({
58+
name: "dynamodb_table",
59+
value: this.table.tableName,
60+
description: "Name of table used to store ingestions",
61+
});
62+
}
63+
64+
private buildTable(): dynamodb.Table {
65+
const table = new dynamodb.Table(this, "ingestions-table", {
66+
partitionKey: { name: "created_by", type: dynamodb.AttributeType.STRING },
67+
sortKey: { name: "id", type: dynamodb.AttributeType.STRING },
68+
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
69+
removalPolicy: RemovalPolicy.DESTROY,
70+
stream: dynamodb.StreamViewType.NEW_IMAGE,
71+
});
72+
73+
table.addGlobalSecondaryIndex({
74+
indexName: "status",
75+
partitionKey: { name: "status", type: dynamodb.AttributeType.STRING },
76+
sortKey: { name: "created_at", type: dynamodb.AttributeType.STRING },
77+
});
78+
79+
return table;
80+
}
81+
82+
private buildApiLambda(props: {
83+
table: dynamodb.ITable;
84+
env: Record<string, string>;
85+
dataAccessRole: iam.IRole;
86+
stage: string;
87+
}): PythonFunction {
88+
const handler_role = new iam.Role(this, "execution-role", {
89+
description:
90+
"Role used by STAC Ingestor. Manually defined so that we can choose a name that is supported by the data access roles trust policy",
91+
roleName: `stac-ingestion-api-${props.stage}`,
92+
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
93+
managedPolicies: [
94+
iam.ManagedPolicy.fromAwsManagedPolicyName(
95+
"service-role/AWSLambdaBasicExecutionRole"
96+
),
97+
],
98+
});
99+
100+
const handler = new PythonFunction(this, "api-handler", {
101+
entry: `${__dirname}/runtime`,
102+
index: "src/handler.py",
103+
runtime: lambda.Runtime.PYTHON_3_9,
104+
environment: props.env,
105+
timeout: Duration.seconds(30),
106+
role: handler_role,
107+
memorySize: 2048,
108+
});
109+
110+
props.table.grantReadWriteData(handler);
111+
props.dataAccessRole.grant(handler.grantPrincipal, "sts:AssumeRole");
112+
113+
return handler;
114+
}
115+
116+
private buildIngestor(props: {
117+
table: dynamodb.ITable;
118+
env: Record<string, string>;
119+
dbSecret: secretsmanager.ISecret;
120+
dbVpc: ec2.IVpc;
121+
dbSecurityGroup: ec2.ISecurityGroup;
122+
subnetSelection: ec2.SubnetSelection;
123+
}): PythonFunction {
124+
const handler = new PythonFunction(this, "stac-ingestor", {
125+
entry: `${__dirname}/runtime`,
126+
index: "src/ingestor.py",
127+
runtime: lambda.Runtime.PYTHON_3_9,
128+
timeout: Duration.seconds(180),
129+
environment: { DB_SECRET_ARN: props.dbSecret.secretArn, ...props.env },
130+
vpc: props.dbVpc,
131+
vpcSubnets: props.subnetSelection,
132+
allowPublicSubnet: true,
133+
memorySize: 2048,
134+
});
135+
136+
// Allow handler to read DB secret
137+
props.dbSecret.grantRead(handler);
138+
139+
// Allow handler to connect to DB
140+
props.dbSecurityGroup.addIngressRule(
141+
handler.connections.securityGroups[0],
142+
ec2.Port.tcp(5432),
143+
"Allow connections from STAC Ingestor"
144+
);
145+
146+
// Allow handler to write results back to DBƒ
147+
props.table.grantWriteData(handler);
148+
149+
// Trigger handler from writes to DynamoDB table
150+
handler.addEventSource(
151+
new events.DynamoEventSource(props.table, {
152+
// Read when batches reach size...
153+
batchSize: 1000,
154+
// ... or when window is reached.
155+
maxBatchingWindow: Duration.seconds(10),
156+
// Read oldest data first.
157+
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
158+
retryAttempts: 1,
159+
})
160+
);
161+
162+
return handler;
163+
}
164+
165+
private buildApiEndpoint(props: {
166+
handler: lambda.IFunction;
167+
stage: string;
168+
policy?: iam.PolicyDocument;
169+
endpointConfiguration?: apigateway.EndpointConfiguration;
170+
}): apigateway.LambdaRestApi {
171+
return new apigateway.LambdaRestApi(
172+
this,
173+
`${Stack.of(this).stackName}-ingestor-api`,
174+
{
175+
handler: props.handler,
176+
proxy: true,
177+
178+
cloudWatchRole: true,
179+
deployOptions: { stageName: props.stage },
180+
endpointExportName: `ingestor-api-${props.stage}`,
181+
182+
endpointConfiguration: props.endpointConfiguration,
183+
policy: props.policy,
184+
}
185+
);
186+
}
187+
188+
private registerSsmParameter(props: {
189+
name: string;
190+
value: string;
191+
description: string;
192+
}): ssm.IStringParameter {
193+
const parameterNamespace = Stack.of(this).stackName;
194+
return new ssm.StringParameter(
195+
this,
196+
`${props.name.replace("_", "-")}-parameter`,
197+
{
198+
description: props.description,
199+
parameterName: `/${parameterNamespace}/${props.name}`,
200+
stringValue: props.value,
201+
}
202+
);
203+
}
204+
}
205+
206+
export interface StacIngestorProps {
207+
/**
208+
* ARN of AWS Role used to validate access to S3 data
209+
*/
210+
readonly dataAccessRole: iam.IRole;
211+
212+
/**
213+
* URL of STAC API
214+
*/
215+
readonly stacUrl: string;
216+
217+
/**
218+
* Stage of deployment (e.g. `dev`, `prod`)
219+
*/
220+
readonly stage: string;
221+
222+
/**
223+
* Secret containing pgSTAC DB connection information
224+
*/
225+
readonly stacDbSecret: secretsmanager.ISecret;
226+
227+
/**
228+
* VPC running pgSTAC DB
229+
*/
230+
readonly vpc: ec2.IVpc;
231+
232+
/**
233+
* Security Group used by pgSTAC DB
234+
*/
235+
readonly stacDbSecurityGroup: ec2.ISecurityGroup;
236+
237+
/**
238+
* Boolean indicating whether or not pgSTAC DB is in a public subnet
239+
*/
240+
readonly subnetSelection: ec2.SubnetSelection;
241+
242+
/**
243+
* Environment variables to be sent to Lambda.
244+
*/
245+
readonly apiEnv: Record<string, string>;
246+
247+
/**
248+
* API Endpoint Configuration, useful for creating private APIs.
249+
*/
250+
readonly apiEndpointConfiguration?: apigateway.EndpointConfiguration;
251+
252+
/**
253+
* API Policy Document, useful for creating private APIs.
254+
*/
255+
readonly apiPolicy?: iam.PolicyDocument;
256+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Authlib==1.0.1
2+
cachetools==5.1.0
3+
fastapi>=0.75.1
4+
mangum>=0.15.0
5+
orjson>=3.6.8
6+
psycopg[binary,pool]>=3.0.15
7+
pydantic_ssm_settings>=0.2.0
8+
pydantic>=1.9.0
9+
# Waiting for https://github.com/stac-utils/pgstac/pull/135
10+
# pypgstac==0.6.6
11+
pypgstac @ git+https://github.com/stac-utils/pgstac.git@main#egg=pygstac&subdirectory=pypgstac
12+
requests>=2.27.1
13+
# Waiting for https://github.com/stac-utils/stac-pydantic/pull/116
14+
stac-pydantic @ git+https://github.com/alukach/stac-pydantic.git@patch-1

lib/ingestor-api/runtime/src/__init__.py

Whitespace-only changes.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import os
2+
from getpass import getuser
3+
from typing import Optional
4+
5+
from pydantic import BaseSettings, Field, AnyHttpUrl, constr
6+
from pydantic_ssm_settings import AwsSsmSourceConfig
7+
8+
9+
AwsArn = constr(regex=r"^arn:aws:iam::\d{12}:role/.+")
10+
11+
12+
class Settings(BaseSettings):
13+
dynamodb_table: str
14+
15+
root_path: Optional[str] = Field(description="Path from where to serve this URL.")
16+
17+
jwks_url: Optional[AnyHttpUrl] = Field(
18+
description="URL of JWKS, e.g. https://cognito-idp.{region}.amazonaws.com/{userpool_id}/.well-known/jwks.json" # noqa
19+
)
20+
21+
stac_url: AnyHttpUrl = Field(description="URL of STAC API")
22+
23+
data_access_role: AwsArn = Field(
24+
description="ARN of AWS Role used to validate access to S3 data"
25+
)
26+
27+
class Config(AwsSsmSourceConfig):
28+
env_file = ".env"
29+
30+
@classmethod
31+
def from_ssm(cls, stack: str):
32+
return cls(_secrets_dir=f"/{stack}")
33+
34+
35+
settings = (
36+
Settings()
37+
if os.environ.get("NO_PYDANTIC_SSM_SETTINGS")
38+
else Settings.from_ssm(
39+
stack=os.environ.get(
40+
"STACK", f"veda-stac-ingestion-system-{os.environ.get('STAGE', getuser())}"
41+
),
42+
)
43+
)

0 commit comments

Comments
 (0)