Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a2606e4
Generate Product IDs
jameslinnell Nov 29, 2024
95f706f
Check if IDs exist in generated product IDS
jameslinnell Nov 29, 2024
7584fee
init
jameslinnell Nov 29, 2024
9d4eb59
Add a test to confirm generated ID doesn't exist in Generated list
jameslinnell Nov 29, 2024
86f5f2f
update with PR suggestion
jameslinnell Dec 3, 2024
7c0e54f
Cache existing Ids
jameslinnell Dec 4, 2024
bd2780d
Add a custom flag to behave clear_screen=true
jameslinnell Dec 4, 2024
6d43351
[feature/PI-618-bulk_etl] Layer updates
jaklinger Dec 2, 2024
d4ccf50
[feature/PI-618-bulk_etl] fix up tests after tag changes
jaklinger Dec 3, 2024
e9ef71a
[feature/PI-618-bulk_etl] Lambda updates
jaklinger Dec 2, 2024
db7286d
[feature/PI-618-bulk_etl_lambdas] add test data paths
jaklinger Dec 3, 2024
6d20856
[feature/PI-618-bulk_etl_lambdas] archive old etl
jaklinger Dec 3, 2024
8aadf5a
[feature/PI-618-bulk_etl_lambdas] isort
jaklinger Dec 3, 2024
b13ee6a
[feature/PI-618-bulk_etl_lambdas] read dummy update lambdas
jaklinger Dec 3, 2024
43c3db6
[feature/PI-618-bulk_etl] End-to-end bulk ETL
jaklinger Dec 2, 2024
be34c91
[feature/PI-618-bulk_etl_e2e] isort
jaklinger Dec 3, 2024
9110137
[feature/PI-618-bulk_etl_e2e] add trigger tests
jaklinger Dec 3, 2024
87e3e54
[feature/PI-618-bulk_etl_e2e] re-add triggers
jaklinger Dec 3, 2024
fd0e2cf
[feature/PI-618-bulk_etl_e2e] add placeholder test
jaklinger Dec 3, 2024
4174e8a
[feature/PI-618-bulk_etl_e2e] rm empty line
jaklinger Dec 4, 2024
a47fc58
trigger ci
jaklinger Dec 4, 2024
96c1c29
[feature/PI-618-bulk_etl_e2e] remove comment and rename placeholder file
jaklinger Dec 5, 2024
7ebc04d
Release 2024-12-05 initial commit
jameslinnell Dec 5, 2024
1099bde
Merge branch 'feature/PI-631-generate_product_ids' into release/2024-…
jameslinnell Dec 5, 2024
6141b85
Merge branch 'feature/PI-695-Feature_test_clear_screen' into release/…
jameslinnell Dec 5, 2024
ea7d5bb
trigger ci
jaklinger Dec 5, 2024
572ad26
[feature/PI-618-bulk_etl_e2e] add grace period to bulk test
jaklinger Dec 6, 2024
9071765
Merge branch 'feature/PI-618-bulk_etl_e2e' into release/2024-12-05
jaklinger Dec 6, 2024
17f55c4
[release/2024-12-05] update changelog
jaklinger Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ repos:
rev: v1.4.0
hooks:
- id: detect-secrets
exclude: ".pre-commit-config.yaml|infrastructure/localstack/provider.tf|src/etl/sds/tests/changelog"
exclude: ".pre-commit-config.yaml|infrastructure/localstack/provider.tf|src/etl/sds/tests/changelog|src/etl/sds/worker/bulk/transform_bulk/tests|src/etl/sds/worker/bulk/tests/stage_data"

- repo: https://github.com/prettier/pre-commit
rev: 57f39166b5a5a504d6808b87ab98d41ebf095b46
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 2024-12-05
- [PI-631] Generate Product Ids
- [PI-691] Allow devs to clear terminal after each Feature Test
- [PI-618] Bulk ETL

## 2024-12-02
- [PI-572] Create an AS Device
- [PI-582] AS Additional Interactions smoke test
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
4. [Workflow](#workflow)
5. [Swagger](#swagger)
6. [ETL](#etl)
7. [Administration](#administration)

---

Expand Down Expand Up @@ -429,6 +430,18 @@ and
make etl--clear-state WORKSPACE=dev SET_CHANGELOG_NUMBER=540210
```

## Administration

### Generating Ids

To generate a persistent list of Ids shared across environments, run the following (the example generates 100 ids):

```
make admin--generate-ids--product SET_GENERATOR_COUNT=100
```

Any previously generated ids will not be overwritten.

### Documentation

We have several locations for the Swagger to keep things as visible as possible
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2024.12.02
2024.12.05
3 changes: 3 additions & 0 deletions changelog/2024-12-05.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- [PI-631] Generate Product Ids
- [PI-691] Allow devs to clear terminal after each Feature Test
- [PI-618] Bulk ETL
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,43 @@
"BackoffRate": 2
}
],
"Next": "transform-result"
},
"transform-result": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.error_message",
"IsNull": true,
"Next": "load-fanout"
}
],
"Default": "pass"
},
"pass": {
"Type": "Pass",
"End": true
},
"load-fanout": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "${load_fanout_worker_arn}:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Next": "Map"
},
"Map": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"OutputPath": "$.Payload",
"Parameters": {
"Payload": {},
"FunctionName": "${extract_worker_arn}:$LATEST"
"FunctionName": "${extract_worker_bulk_arn}:$LATEST"
},
"Retry": [
{
Expand Down Expand Up @@ -151,7 +151,7 @@
"OutputPath": "$.Payload",
"Parameters": {
"Payload": {},
"FunctionName": "${extract_worker_arn}:$LATEST"
"FunctionName": "${extract_worker_update_arn}:$LATEST"
},
"Retry": [
{
Expand Down
113 changes: 104 additions & 9 deletions infrastructure/terraform/per_workspace/modules/etl/sds/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ module "sds_layer" {
}
}

module "worker_extract" {
source = "./worker/"
module "worker_extract_bulk" {
source = "./worker"

etl_stage = "extract"
etl_stage = "extract_bulk"
etl_type = "bulk"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
Expand Down Expand Up @@ -80,10 +81,47 @@ module "worker_extract" {

}

module "worker_extract_update" {
source = "./worker"

etl_stage = "extract_update"
etl_type = "update"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
python_version = var.python_version
etl_bucket_name = module.bucket.s3_bucket_id
layers = [var.event_layer_arn, var.third_party_core_layer_arn, module.etl_layer.lambda_layer_arn, module.sds_layer.lambda_layer_arn, var.domain_layer_arn]

policy_json = <<-EOT
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:PutObject",
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObjectVersionTagging"
],
"Effect": "Allow",
"Resource": ["${module.bucket.s3_bucket_arn}", "${module.bucket.s3_bucket_arn}/*"]
}
]
}
EOT

}


module "worker_transform_bulk" {
source = "./worker/"
source = "./worker"

etl_stage = "transform_bulk"
etl_type = "bulk"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
Expand Down Expand Up @@ -111,6 +149,13 @@ module "worker_transform_bulk" {
"Effect": "Allow",
"Resource": ["${module.bucket.s3_bucket_arn}", "${module.bucket.s3_bucket_arn}/*"]
},
{
"Action": [
"dynamodb:Query"
],
"Effect": "Allow",
"Resource": ["${var.table_arn}", "${var.table_arn}/*"]
},
{
"Action": [
"kms:Decrypt"
Expand All @@ -125,9 +170,10 @@ module "worker_transform_bulk" {
}

module "worker_transform_update" {
source = "./worker/"
source = "./worker"

etl_stage = "transform_update"
etl_type = "update"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
Expand Down Expand Up @@ -174,12 +220,55 @@ module "worker_transform_update" {
EOT

}
module "worker_load_bulk_fanout" {
source = "./worker"

etl_stage = "load_bulk_fanout"
etl_type = "bulk"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
python_version = var.python_version
etl_bucket_name = module.bucket.s3_bucket_id
layers = [var.event_layer_arn, var.third_party_core_layer_arn, module.etl_layer.lambda_layer_arn, module.sds_layer.lambda_layer_arn, var.domain_layer_arn]

policy_json = <<-EOT
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:PutObject",
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObjectVersionTagging"
],
"Effect": "Allow",
"Resource": ["${module.bucket.s3_bucket_arn}", "${module.bucket.s3_bucket_arn}/*"]
},
{
"Action": [
"kms:Decrypt"
],
"Effect": "Allow",
"Resource": ["*"]
}
]
}
EOT

}



module "worker_load_bulk" {
source = "./worker/"
source = "./worker"

etl_stage = "load_bulk"
etl_type = "bulk"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
Expand Down Expand Up @@ -227,9 +316,10 @@ module "worker_load_bulk" {
}

module "worker_load_update" {
source = "./worker/"
source = "./worker"

etl_stage = "load_update"
etl_type = "update"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
Expand Down Expand Up @@ -279,9 +369,10 @@ module "worker_load_update" {
}

module "worker_load_bulk_reduce" {
source = "./worker/"
source = "./worker"

etl_stage = "load_bulk_reduce"
etl_type = "bulk"
etl_name = local.etl_name
assume_account = var.assume_account
workspace_prefix = var.workspace_prefix
Expand Down Expand Up @@ -321,6 +412,7 @@ module "bulk_transform_and_load_step_function" {
"${path.module}/etl-diagram--bulk-transform-and-load.asl.json",
{
transform_worker_arn = module.worker_transform_bulk.arn
load_fanout_worker_arn = module.worker_load_bulk_fanout.arn
load_worker_arn = module.worker_load_bulk.arn
load_reduce_worker_arn = module.worker_load_bulk_reduce.arn
bulk_load_chunksize = var.bulk_load_chunksize
Expand All @@ -332,9 +424,11 @@ module "bulk_transform_and_load_step_function" {
lambda = {
lambda = [
module.worker_transform_bulk.arn,
module.worker_load_bulk_fanout.arn,
module.worker_load_bulk.arn,
module.worker_load_bulk_reduce.arn,
"${module.worker_transform_bulk.arn}:*",
"${module.worker_load_bulk_fanout.arn}:*",
"${module.worker_load_bulk.arn}:*",
"${module.worker_load_bulk_reduce.arn}:*"

Expand Down Expand Up @@ -404,7 +498,8 @@ resource "aws_sfn_state_machine" "state_machine" {
definition = templatefile(
"${path.module}/etl-diagram.asl.json",
{
extract_worker_arn = module.worker_extract.arn
extract_worker_bulk_arn = module.worker_extract_bulk.arn
extract_worker_update_arn = module.worker_extract_update.arn
notify_arn = module.notify.arn
etl_bucket = module.bucket.s3_bucket_id
changelog_key = var.changelog_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ output "etl_state_lock_enforcer" {
output "manual_trigger_arn" {
value = module.trigger_manual.arn
}

output "bulk_load_lambda_arn" {
value = module.worker_load_bulk.arn
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ locals {
lambda = {
actions = ["lambda:InvokeFunction"]
resources = [
module.worker_extract.arn,
module.worker_extract_bulk.arn,
module.worker_extract_update.arn,
module.worker_transform_bulk.arn,
module.worker_transform_update.arn,
module.worker_load_bulk.arn,
module.worker_load_update.arn,
module.notify.arn,
"${module.worker_extract.arn}:*",
"${module.worker_extract_bulk.arn}:*",
"${module.worker_extract_update.arn}:*",
"${module.worker_transform_bulk.arn}:*",
"${module.worker_transform_update.arn}:*",
"${module.worker_load_bulk.arn}:*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ variable "changelog_key" {
default = "changelog-number"
}
variable "bulk_transform_chunksize" {
default = 500
default = 100000
}

variable "bulk_load_chunksize" {
default = 1000 # needs to be larger than 'bulk_transform_chunksize' in the case of overflow
default = 1000000 # needs to be larger than 'bulk_transform_chunksize' in the case of overflow
}

variable "etl_state_lock_key" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module "lambda_function" {

function_name = "${var.workspace_prefix}--${var.etl_name}--${var.etl_stage}"
description = "${replace(var.workspace_prefix, "_", "-")} ${var.etl_name} (${var.etl_stage}) lambda function"
handler = "etl.sds.worker.${var.etl_stage}.${var.etl_stage}.handler"
handler = "etl.sds.worker.${var.etl_type}.${var.etl_stage}.${var.etl_stage}.handler"
runtime = var.python_version
timeout = 600
memory_size = 10240
Expand All @@ -26,7 +26,7 @@ module "lambda_function" {


create_package = false
local_existing_package = "${path.root}/../../../src/etl/sds/worker/${var.etl_stage}/dist/${var.etl_stage}.zip"
local_existing_package = "${path.root}/../../../src/etl/sds/worker/${var.etl_type}/${var.etl_stage}/dist/${var.etl_stage}.zip"

tags = {
Name = "${var.workspace_prefix}--${var.etl_name}--${var.etl_stage}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ variable "etl_name" {
type = string
}

variable "etl_type" {
type = string
}

variable "etl_stage" {
type = string
}
Expand Down
Loading
Loading