Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
infra/
34 changes: 34 additions & 0 deletions .github/workflows/aws-deploy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Deploys the CDK app in infra/ on every push to the feature branch.
# Uses youyo/aws-cdk-github-actions with long-lived AWS keys from repo secrets.
on:
  push:
    branches:
      - add-lambda-handler

jobs:
  aws_cdk:
    runs-on: ubuntu-latest
    steps:
      - name: Check out the repo
        uses: actions/checkout@v4

      # - name: cdk diff
      #   uses: youyo/aws-cdk-github-actions@v2
      #   with:
      #     cdk_subcommand: 'diff'
      #     actions_comment: true
      #     working_dir: 'infra'
      #   env:
      #     AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
      #     AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      #     AWS_DEFAULT_REGION: 'us-west-2'

      - name: cdk deploy
        uses: youyo/aws-cdk-github-actions@v2
        with:
          cdk_subcommand: 'deploy'
          # Non-interactive deploy: never prompt for IAM/security approval in CI.
          cdk_args: '--require-approval never'
          actions_comment: false
          working_dir: 'infra'
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          AWS_DEFAULT_REGION: 'us-west-2'
46 changes: 0 additions & 46 deletions .github/workflows/publish.yaml

This file was deleted.

4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ cython_debug/

# PyPI configuration file
.pypirc

# CDK asset staging directory
.cdk.staging
cdk.out
26 changes: 17 additions & 9 deletions fiboa_sda/ingest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import functools
import concurrent.futures
import json
import multiprocessing
import os
import tempfile

import boto3
from botocore import UNSIGNED
from botocore.config import Config
import geopandas as gpd
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery
from google.oauth2 import service_account

from fiboa_sda.logger import get_logger, TimerFunc
from fiboa_sda.metrics import calculate_geometry_metrics
Expand All @@ -17,7 +20,7 @@
settings = get_settings()
logger = get_logger(__name__)
BUCKET_NAME = "us-west-2.opendata.source.coop"
S3_CLIENT = boto3.client('s3', region_name="us-west-2")
S3_CLIENT = boto3.client('s3', region_name="us-west-2", config=Config(signature_version=UNSIGNED))

# pyarrow schema used when writing out parquet
# files to BQ
Expand Down Expand Up @@ -67,7 +70,11 @@ def get_s3_key_for_dataset(fiboa_id: str) -> list[str]:
def write_to_bq(
df: pd.DataFrame, project_name: str, dataset_name: str, table_name: str
) -> None:
client = bigquery.Client(project=project_name)
credentials = None
if service_account_file := os.getenv("SERVICE_ACCOUNT_FILE"):
credentials = service_account.Credentials.from_service_account_info(json.loads(service_account_file))

client = bigquery.Client(project=project_name, credentials=credentials)
job_config = bigquery.LoadJobConfig(source_format="PARQUET")

with tempfile.NamedTemporaryFile() as tmp:
Expand All @@ -90,12 +97,13 @@ def normalize_dataset(df: gpd.GeoDataFrame, repository_id: str, s3_path: str) ->

# Dump any fields that aren't part of fiboa into a JSON column.
available_external_fields = set(df.columns) - set(settings.FIBOA_FIELDS)
df["external_fields"] = df.apply(
lambda row: json.dumps(
{field: row[field] for field in available_external_fields}
),
axis=1,
)

def _coalesce(row):
d = row.to_dict()
data = {field: d[field] for field in available_external_fields}
return json.dumps(data)

df["external_fields"] = df.apply(_coalesce, axis=1)
df = df[list(settings.FIBOA_FIELDS) + ["external_fields"]]

# Drop the geometry-metrics fields, we'll recalculate these.
Expand Down
58 changes: 58 additions & 0 deletions infra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@

# Welcome to your CDK Python project!

This is a blank project for CDK development with Python.

The `cdk.json` file tells the CDK Toolkit how to execute your app.

This project is set up like a standard Python project. The initialization
process also creates a virtualenv within this project, stored under the `.venv`
directory. To create the virtualenv it assumes that there is a `python3`
(or `python` for Windows) executable in your path with access to the `venv`
package. If for any reason the automatic creation of the virtualenv fails,
you can create the virtualenv manually.

To manually create a virtualenv on macOS and Linux:

```
$ python3 -m venv .venv
```

After the init process completes and the virtualenv is created, you can use the following
step to activate your virtualenv.

```
$ source .venv/bin/activate
```

If you are on a Windows platform, you would activate the virtualenv like this:

```
% .venv\Scripts\activate.bat
```

Once the virtualenv is activated, you can install the required dependencies.

```
$ pip install -r requirements.txt
```

At this point you can now synthesize the CloudFormation template for this code.

```
$ cdk synth
```

To add additional dependencies, for example other CDK libraries, just add
them to your `setup.py` file and rerun the `pip install -r requirements.txt`
command.

## Useful commands

* `cdk ls` list all stacks in the app
* `cdk synth` emits the synthesized CloudFormation template
* `cdk deploy` deploy this stack to your default AWS account/region
* `cdk diff` compare deployed stack with current state
* `cdk docs` open CDK documentation

Enjoy!
25 changes: 25 additions & 0 deletions infra/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env python3
"""CDK entry point: synthesizes the FiboaSdaStack testing stack."""
import os

import aws_cdk as cdk

from aws.fiboa_sda import FiboaSdaStack


cdk_app = cdk.App()

# No 'env' is supplied, so this stack is environment-agnostic:
# account/region-dependent features and context lookups are unavailable, but
# the single synthesized template can be deployed anywhere.
#
# To specialize the stack for the AWS account/region implied by the current
# CLI configuration:
#   env=cdk.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')),
# Or to pin an exact account and region:
#   env=cdk.Environment(account='123456789012', region='us-east-1'),
# For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html
FiboaSdaStack(cdk_app, "FiboaSdaStackTesting")

cdk_app.synth()
Empty file added infra/aws/__init__.py
Empty file.
117 changes: 117 additions & 0 deletions infra/aws/fiboa_sda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import os

from aws_cdk import (
aws_ec2 as ec2,
aws_ecs as ecs,
aws_ecr_assets as ecr_assets,
aws_lambda as _lambda,
aws_lambda_event_sources as lambda_events,
aws_iam as iam,
aws_s3 as s3,
aws_batch as batch,
aws_s3_notifications,
aws_sns as sns,
Stack,
Duration,
Size,
)
from constructs import Construct


class FiboaSdaStack(Stack):
    """Infrastructure for the fiboa ingest pipeline.

    Provisions an AWS Batch job queue backed by Fargate Spot capacity, a job
    definition that runs the repository's container image with the
    ``ingest-one`` command, and a container-image Lambda subscribed to the
    source.coop "new object" SNS topic (filtered to S3 keys prefixed
    ``fiboa``).
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create a VPC for the batch fargate cluster
        vpc = ec2.Vpc(self, "VPC")

        # Create AWS Batch Job Queue
        self.batch_queue = batch.JobQueue(self, "JobQueue")
        # Spot Fargate capacity placed in the VPC's private (NAT-egress)
        # subnets.
        fargate_spot_environment = batch.FargateComputeEnvironment(
            self,
            "FargateSpotEnv",
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT
            ),
            vpc=vpc,
            spot=True,
        )
        # Order 0: the queue tries this compute environment first.
        self.batch_queue.add_compute_environment(fargate_spot_environment, 0)

        # Task execution IAM role for Fargate
        task_execution_role = iam.Role(
            self,
            "TaskExecutionRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AmazonECSTaskExecutionRolePolicy"
                )
            ],
        )

        # image_asset = ecr_assets.DockerImageAsset(
        #     self, "MyImageAsset",
        #     directory=os.path.join(os.getcwd(), "..")
        # )

        # Create Job Definition to submit job in batch job queue.
        # The container image is built from the repository root (one directory
        # above the CDK app's working directory).
        batch.EcsJobDefinition(
            self,
            "MyJobDef",
            container=batch.EcsFargateContainerDefinition(
                self,
                "FargateCDKJobDef",
                image=ecs.ContainerImage.from_asset(
                    directory=os.path.join(os.getcwd(), "..")
                ),
                command=["ingest-one"],
                memory=Size.gibibytes(16),
                cpu=2,
                execution_role=task_execution_role,
            ),
        )

        # create lambda function
        # todo - create an IAM role with access to batch.
        # todo - inject environment
        # Runtime and handler are the FROM_IMAGE sentinels, i.e. taken from
        # the container image itself, which is built from infra/lambda-image.
        function = _lambda.Function(
            self,
            "fiboa-s3-listener",
            runtime=_lambda.Runtime.FROM_IMAGE,
            handler=_lambda.Handler.FROM_IMAGE,
            architecture=_lambda.Architecture.ARM_64,
            timeout=Duration.seconds(10),
            code=_lambda.EcrImageCode.from_asset_image(
                directory=os.path.join(os.getcwd(), "lambda-image"),
            ),
        )

        # add SNS event source: subscribe the Lambda to the (externally owned)
        # source.coop new-object topic, delivering only messages whose S3Key
        # attribute starts with 'fiboa'.
        topic = sns.Topic.from_topic_arn(
            self,
            "source-coop-topic",
            topic_arn="arn:aws:sns:us-west-2:417712557820:us-west-2-opendata-source-coop_new-object"
        )
        event_source = lambda_events.SnsEventSource(
            topic,
            filter_policy={
                "S3Key": sns.SubscriptionFilter.string_filter(match_prefixes=['fiboa'])
            }
        )
        function.add_event_source(event_source)

        # NOTE(review): alternative S3-notification wiring kept for reference;
        # the SNS subscription above is the active trigger path.
        # # create s3 bucket
        # # bucket = s3.Bucket(self, "fiboa-sda-testing")
        # bucket = s3.Bucket.from_bucket_name(self, "source-coop-bucket", bucket_name="us-west-2.opendata.source.coop")

        # # create s3 notification for lambda function
        # # notification = aws_s3_notifications.LambdaDestination(function)

        # # assign notification for the s3 event type (ex: OBJECT_CREATED)
        # bucket.add_event_notification(
        #     s3.EventType.OBJECT_CREATED,
        #     notification,
        #     s3.NotificationKeyFilter(prefix="fiboa/*"),
        # )
Loading