
Commit 0de54db

[sdlf-stage-lambda] remove pDataset parameter
it was wrongly used in place of pBucketPrefix and unnecessarily used in resource names
1 parent 6987197 commit 0de54db
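
In practice this means two substitutions, both visible in the diff below: resource names drop the ${pDataset} segment and are keyed on ${pDeploymentInstance} alone, and S3 object paths switch from ${pDataset} to ${pBucketPrefix} (or to the dataset's rS3Prefix SSM parameter when resolved through SSM). Abbreviated before/after, quoted from the template changes:

Before:
  FunctionName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-routing
  Resource: !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/${pDataset}/*

After:
  FunctionName: !Sub sdlf-${pDeploymentInstance}-routing
  Resource: !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/${pBucketPrefix}/*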

File tree

6 files changed: +41 -57 lines changed


sdlf-stage-lambda/src/awslambda.yaml

Lines changed: 29 additions & 34 deletions
@@ -64,10 +64,6 @@ Parameters:
     Description: Analytics bucket
     Type: String
     Default: "" # if not provided, pStorageDeploymentInstance must be specified
-  pDataset:
-    Description: The name of the dataset (all lowercase, no symbols or spaces)
-    Type: String
-    AllowedPattern: "[a-z0-9]{2,14}"
   pStageEnabled:
     Description: Whether the stage is enabled or not
     Type: String
@@ -146,7 +142,7 @@ Globals:
       - "{{resolve:ssm:/SDLF/Lambda/LatestDatalakeLibraryLayer}}"
     Environment:
       Variables:
-        DATASET: !Ref pDataset
+        S3_PREFIX: !If [FetchFromDatasetSsm, !Sub "{{resolve:ssm:/sdlf/dataset/rS3Prefix/${pDatasetDeploymentInstance}}}", !Ref pBucketPrefix]
         DEPLOYMENT_INSTANCE: !Ref pDeploymentInstance
         STORAGE_DEPLOYMENT_INSTANCE: !Ref pStorageDeploymentInstance
         DATASET_DEPLOYMENT_INSTANCE: !Ref pDatasetDeploymentInstance
@@ -170,7 +166,6 @@ Resources:
         pInfraKmsKey: !Ref pInfraKmsKey
         pEventBus: !Ref pEventBus
         pScheduleGroup: !Ref pScheduleGroup
-        pDataset: !Ref pDataset
         pStageEnabled: !Ref pStageEnabled
         pTriggerType: !Ref pTriggerType
         pSchedule: !Ref pSchedule
@@ -181,7 +176,7 @@ Resources:
   rLambdaCommonPolicy:
     Type: AWS::IAM::ManagedPolicy
     Properties:
-      Path: !Sub /sdlf-${pDataset}/
+      Path: !Sub /sdlf-${pDeploymentInstance}/
       PolicyDocument:
         Version: "2012-10-17"
         Statement:
@@ -191,7 +186,7 @@ Resources:
               - logs:CreateLogStream
               - logs:PutLogEvents
             Resource:
-              - !Sub arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/sdlf-${pDataset}-${pDeploymentInstance}-*
+              - !Sub arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/sdlf-${pDeploymentInstance}-*
           - Effect: Allow
             Action:
               - ssm:GetParameter
@@ -227,7 +222,7 @@ Resources:
   rRoleLambdaExecutionProcessingStep:
     Type: AWS::IAM::Role
     Properties:
-      Path: !Sub /sdlf-${pDataset}/
+      Path: !Sub /sdlf-${pDeploymentInstance}/
       # PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pDataset}/TeamPermissionsBoundary}}"
       ManagedPolicyArns:
         - !Ref rLambdaCommonPolicy
@@ -243,7 +238,7 @@ Resources:
               Service: lambda.amazonaws.com
             Action: sts:AssumeRole
       Policies:
-        - PolicyName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-process
+        - PolicyName: !Sub sdlf-${pDeploymentInstance}-process
           PolicyDocument:
             Version: "2012-10-17"
             Statement:
@@ -259,15 +254,15 @@ Resources:
               Action:
                 - s3:GetObject
               Resource:
-                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rRawBucket/${pStorageDeploymentInstance}}}/${pDataset}/*", !Sub "arn:${AWS::Partition}:s3:::${pRawBucket}/${pDataset}/*"]
-                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rStageBucket/${pStorageDeploymentInstance}}}/${pDataset}/*", !Sub "arn:${AWS::Partition}:s3:::${pStageBucket}/${pDataset}/*"]
-                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rAnalyticsBucket/${pStorageDeploymentInstance}}}/${pDataset}/*", !Sub "arn:${AWS::Partition}:s3:::${pAnalyticsBucket}/${pDataset}/*"]
+                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rRawBucket/${pStorageDeploymentInstance}}}/{{resolve:ssm:/sdlf/dataset/rS3Prefix/${pDatasetDeploymentInstance}}}/*", !Sub "arn:${AWS::Partition}:s3:::${pRawBucket}/${pBucketPrefix}/*"]
+                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rStageBucket/${pStorageDeploymentInstance}}}/{{resolve:ssm:/sdlf/dataset/rS3Prefix/${pDatasetDeploymentInstance}}}/*", !Sub "arn:${AWS::Partition}:s3:::${pStageBucket}/${pBucketPrefix}/*"]
+                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rAnalyticsBucket/${pStorageDeploymentInstance}}}/{{resolve:ssm:/sdlf/dataset/rS3Prefix/${pDatasetDeploymentInstance}}}/*", !Sub "arn:${AWS::Partition}:s3:::${pAnalyticsBucket}/${pBucketPrefix}/*"]
             - Effect: Allow
               Action:
                 - s3:PutObject
               Resource:
-                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rStageBucket/${pStorageDeploymentInstance}}}/${pDataset}/*", !Sub "arn:${AWS::Partition}:s3:::${pStageBucket}/${pDataset}/*"]
-                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rAnalyticsBucket/${pStorageDeploymentInstance}}}/${pDataset}/*", !Sub "arn:${AWS::Partition}:s3:::${pAnalyticsBucket}/${pDataset}/*"]
+                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rStageBucket/${pStorageDeploymentInstance}}}/{{resolve:ssm:/sdlf/dataset/rS3Prefix/${pDatasetDeploymentInstance}}}/*", !Sub "arn:${AWS::Partition}:s3:::${pStageBucket}/${pBucketPrefix}/*"]
+                - !If [FetchFromStorageSsm, !Sub "arn:${AWS::Partition}:s3:::{{resolve:ssm:/sdlf/storage/rAnalyticsBucket/${pStorageDeploymentInstance}}}/{{resolve:ssm:/sdlf/dataset/rS3Prefix/${pDatasetDeploymentInstance}}}/*", !Sub "arn:${AWS::Partition}:s3:::${pAnalyticsBucket}/${pBucketPrefix}/*"]
             - Effect: Allow
              Action:
                 - kms:DescribeKey
@@ -285,7 +280,7 @@ Resources:
   rRoleLambdaExecutionRoutingStep:
     Type: AWS::IAM::Role
     Properties:
-      Path: !Sub /sdlf-${pDataset}/
+      Path: !Sub /sdlf-${pDeploymentInstance}/
       # PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pDataset}/TeamPermissionsBoundary}}"
       ManagedPolicyArns:
         - !Ref rLambdaCommonPolicy
@@ -301,7 +296,7 @@ Resources:
               Service: lambda.amazonaws.com
             Action: sts:AssumeRole
       Policies:
-        - PolicyName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-routing
+        - PolicyName: !Sub sdlf-${pDeploymentInstance}-routing
           PolicyDocument:
             Version: "2012-10-17"
             Statement:
@@ -321,8 +316,8 @@ Resources:
                 - sqs:ReceiveMessage
                 - sqs:SendMessage
               Resource:
-                - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDataset}-${pDeploymentInstance}-queue-*
-                - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDataset}-${pDeploymentInstance}-dlq-*
+                - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDeploymentInstance}-queue.fifo
+                - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDeploymentInstance}-dlq.fifo

   rLambdaRoutingStep:
     Type: AWS::Serverless::Function
@@ -333,7 +328,7 @@ Resources:
             reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
     Properties:
       CodeUri: ./lambda/routing/src
-      FunctionName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-routing
+      FunctionName: !Sub sdlf-${pDeploymentInstance}-routing
       Description: Checks if items are to be processed and route them to state machine
       Environment:
         Variables:
@@ -357,7 +352,7 @@ Resources:
       Name: !Sub /sdlf/pipeline/rLambdaRoutingStep/${pDeploymentInstance}
       Type: String
       Value: !GetAtt rLambdaRoutingStep.Arn
-      Description: !Sub "ARN of the ${pDataset} ${pDeploymentInstance} Routing Lambda" # TODO
+      Description: !Sub "ARN of the ${pDeploymentInstance} Routing Lambda" # TODO

   rLambdaRedriveStep:
     Type: AWS::Serverless::Function
@@ -368,7 +363,7 @@ Resources:
             reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
     Properties:
       CodeUri: ./lambda/redrive/src
-      FunctionName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-redrive
+      FunctionName: !Sub sdlf-${pDeploymentInstance}-redrive
       Description: Redrives Failed messages to the routing queue
       MemorySize: 192
       Timeout: 300
@@ -392,7 +387,7 @@ Resources:
             reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
     Properties:
       CodeUri: ./lambda/process-object/src
-      FunctionName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-process
+      FunctionName: !Sub sdlf-${pDeploymentInstance}-process
       Description: Processing pipeline
       MemorySize: 1536
       Timeout: 600
@@ -411,7 +406,7 @@ Resources:
   rRoleLambdaExecutionMetadataStep:
     Type: AWS::IAM::Role
     Properties:
-      Path: !Sub /sdlf-${pDataset}/
+      Path: !Sub /sdlf-${pDeploymentInstance}/
       # PermissionsBoundary: "{{resolve:ssm:/SDLF/IAM/${pDataset}/TeamPermissionsBoundary}}"
       ManagedPolicyArns:
         - !Ref rLambdaCommonPolicy
@@ -436,7 +431,7 @@ Resources:
             reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
     Properties:
       CodeUri: ./lambda/postupdate-metadata/src
-      FunctionName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-postupdate
+      FunctionName: !Sub sdlf-${pDeploymentInstance}-postupdate
       Description: Post-Update the metadata in the DynamoDB Catalog table
       MemorySize: 192
       Timeout: 300
@@ -455,7 +450,7 @@ Resources:
   rRoleLambdaExecutionErrorStep:
     Type: AWS::IAM::Role
     Properties:
-      Path: !Sub /sdlf-${pDataset}/
+      Path: !Sub /sdlf-${pDeploymentInstance}/
       # PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pDataset}/TeamPermissionsBoundary}}"
       ManagedPolicyArns:
         - !Ref rLambdaCommonPolicy
@@ -471,7 +466,7 @@ Resources:
               Service: lambda.amazonaws.com
             Action: sts:AssumeRole
       Policies:
-        - PolicyName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-error
+        - PolicyName: !Sub sdlf-${pDeploymentInstance}-error
           PolicyDocument:
             Version: "2012-10-17"
             Statement:
@@ -486,7 +481,7 @@ Resources:
                 - sqs:ReceiveMessage
                 - sqs:SendMessage
               Resource:
-                - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDataset}-${pDeploymentInstance}-dlq-*
+                - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pDeploymentInstance}-dlq.fifo

   rLambdaErrorStep:
     Type: AWS::Serverless::Function
@@ -497,7 +492,7 @@ Resources:
             reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
     Properties:
       CodeUri: ./lambda/error/src
-      FunctionName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-error
+      FunctionName: !Sub sdlf-${pDeploymentInstance}-error
       Description: Fallback lambda to handle messages which failed processing
       MemorySize: 192
       Timeout: 300
@@ -521,7 +516,7 @@ Resources:
           - id: W11
             reason: The actions with "*" are all ones that do not have resource limitations associated with them
     Properties:
-      Path: !Sub /sdlf-${pDataset}/
+      Path: !Sub /sdlf-${pDeploymentInstance}/
       # PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pDataset}/TeamPermissionsBoundary}}"
       AssumeRolePolicyDocument:
         Version: "2012-10-17"
@@ -535,14 +530,14 @@ Resources:
             StringEquals:
               "aws:SourceAccount": !Sub ${AWS::AccountId}
       Policies:
-        - PolicyName: !Sub sdlf-${pDataset}-${pDeploymentInstance}-sm
+        - PolicyName: !Sub sdlf-${pDeploymentInstance}-sm
           PolicyDocument:
             Version: "2012-10-17"
             Statement:
               - Effect: Allow
                 Action:
                   - lambda:InvokeFunction
-                Resource: !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:sdlf-${pDataset}-* # TODO explicit ARNs
+                Resource: !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:sdlf-${pDeploymentInstance}-* # TODO explicit ARNs
               - Effect: Allow
                 Action:
                   - xray:PutTraceSegments # W11 exception
@@ -571,7 +566,7 @@ Resources:
   rStateMachine:
     Type: AWS::Serverless::StateMachine
     Properties:
-      Name: !Sub sdlf-${pDataset}-${pDeploymentInstance}-sm
+      Name: !Sub sdlf-${pDeploymentInstance}-sm
       DefinitionUri: ./state-machine/stage-lambda.asl.json
       DefinitionSubstitutions:
         lPostMetadata: !GetAtt rLambdaPostMetadataStep.Arn
@@ -587,7 +582,7 @@ Resources:
       Name: !Sub /sdlf/pipeline/rStateMachine/${pDeploymentInstance}
       Type: String
       Value: !Ref rStateMachine
-      Description: !Sub "ARN of the ${pDataset} ${pDeploymentInstance} State Machine" # TODO
+      Description: !Sub "ARN of the ${pDeploymentInstance} State Machine" # TODO

 Outputs:
   oPipelineReference:

sdlf-stage-lambda/src/lambda/error/src/lambda_function.py

File mode changed from 100755 to 100644.
Lines changed: 0 additions & 3 deletions
@@ -6,9 +6,6 @@
 from datalake_library.sdlf import SQSConfiguration

 logger = init_logger(__name__)
-dataset = os.environ["DATASET"]
-pipeline = os.environ["PIPELINE"]
-pipeline_stage = os.environ["PIPELINE_STAGE"]
 deployment_instance = os.environ["DEPLOYMENT_INSTANCE"]

sdlf-stage-lambda/src/lambda/postupdate-metadata/src/lambda_function.py

File mode changed from 100755 to 100644.
Lines changed: 2 additions & 5 deletions
@@ -4,9 +4,6 @@
 from datalake_library.sdlf import PipelineExecutionHistoryAPI

 logger = init_logger(__name__)
-dataset = os.environ["DATASET"]
-pipeline = os.environ["PIPELINE"]
-pipeline_stage = os.environ["PIPELINE_STAGE"]
 deployment_instance = os.environ["DEPLOYMENT_INSTANCE"]
 peh_table_instance = os.environ["DATASET_DEPLOYMENT_INSTANCE"]
 manifests_table_instance = os.environ["DATASET_DEPLOYMENT_INSTANCE"]
@@ -42,7 +39,7 @@ def lambda_handler(event, context):

         if not partial_failure:
             pipeline_execution.update_pipeline_execution(
-                status=f"{pipeline}-{pipeline_stage} {component} Processing", component=component
+                status=f"{deployment_instance} {component} Processing", component=component
             )
             pipeline_execution.end_pipeline_execution_success()
         else:
@@ -51,6 +48,6 @@ def lambda_handler(event, context):
     except Exception as e:
         logger.error("Fatal error", exc_info=True)
         pipeline_execution.end_pipeline_execution_failed(
-            component=component, issue_comment=f"{pipeline}-{pipeline_stage} {component} Error: {repr(e)}"
+            component=component, issue_comment=f"{deployment_instance} {component} Error: {repr(e)}"
         )
         raise e

sdlf-stage-lambda/src/lambda/process-object/src/lambda_function.py

File mode changed from 100755 to 100644.
Lines changed: 3 additions & 4 deletions
@@ -10,9 +10,8 @@
 )

 logger = init_logger(__name__)
-dataset = os.environ["DATASET"]
-pipeline = os.environ["PIPELINE"]
-pipeline_stage = os.environ["PIPELINE_STAGE"]
+s3_prefix = os.environ["S3_PREFIX"]
+deployment_instance = os.environ["DEPLOYMENT_INSTANCE"]
 storage_deployment_instance = os.environ["STORAGE_DEPLOYMENT_INSTANCE"]
@@ -53,7 +52,7 @@ def parse(json_data):

     # Uploading file to Stage bucket at appropriate path
     # IMPORTANT: Build the output s3_path without the s3://stage-bucket/
-    s3_path = f"{dataset}/{pipeline}/{pipeline_stage}/{PurePath(output_path).name}"
+    s3_path = f"{s3_prefix}/{deployment_instance}/{PurePath(output_path).name}"
     # IMPORTANT: Notice "stage_bucket" not "bucket"
     kms_key = KMSConfiguration(instance=storage_deployment_instance).data_kms_key
     s3_interface.upload_object(output_path, stage_bucket, s3_path, kms_key=kms_key)
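
The resulting stage-bucket key layout is easy to check with a runnable sketch of the new path construction (all values below are hypothetical, for illustration only):

from pathlib import PurePath

# Hypothetical stand-ins for the S3_PREFIX and DEPLOYMENT_INSTANCE
# environment variables read at module load
s3_prefix = "legislators"
deployment_instance = "dev-stage-a"
output_path = "/tmp/persons_parsed.json"

# Same construction as the new line above: <s3 prefix>/<deployment instance>/<filename>
s3_path = f"{s3_prefix}/{deployment_instance}/{PurePath(output_path).name}"
print(s3_path)  # legislators/dev-stage-a/persons_parsed.json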

sdlf-stage-lambda/src/lambda/redrive/src/lambda_function.py

File mode changed from 100755 to 100644; no content changes.

sdlf-stage-lambda/src/lambda/routing/src/lambda_function.py

File mode changed from 100755 to 100644.
Lines changed: 7 additions & 11 deletions
@@ -12,12 +12,8 @@
 )

 logger = init_logger(__name__)
-dataset = os.environ["DATASET"]
-pipeline = os.environ["PIPELINE"]
-pipeline_stage = os.environ["PIPELINE_STAGE"]
 deployment_instance = os.environ["DEPLOYMENT_INSTANCE"]
-peh_table_instance = os.environ["DATASET_DEPLOYMENT_INSTANCE"]
-manifests_table_instance = os.environ["DATASET_DEPLOYMENT_INSTANCE"]
+dataset_deployment_instance = peh_table_instance = manifests_table_instance = os.environ["DATASET_DEPLOYMENT_INSTANCE"]


 def serializer(obj):
@@ -31,8 +27,8 @@ def serializer(obj):

 def pipeline_start(pipeline_execution, event):
     peh_id = pipeline_execution.start_pipeline_execution(
-        pipeline_name=f"{pipeline}-{pipeline_stage}",
-        dataset_name=f"{dataset}",
+        pipeline_name=deployment_instance,
+        dataset_name=dataset_deployment_instance,
         comment=event, # TODO test maximum size
     )
     logger.info(f"peh_id: {peh_id}")
@@ -53,14 +49,14 @@ def get_source_records(event):
         logger.info("Stage trigger: event-schedule")
         min_items_to_process = 1
         max_items_to_process = 100
-    logger.info(f"Pipeline is {pipeline}, stage is {pipeline_stage}")
+    logger.info(f"Pipeline stage is {deployment_instance}")
     logger.info(
         f"Pipeline stage configuration: min_items_to_process {min_items_to_process}, max_items_to_process {max_items_to_process}"
     )

     sqs_config = SQSConfiguration(instance=deployment_instance)
     queue_interface = SQSInterface(sqs_config.stage_queue)
-    logger.info(f"Querying {dataset} {pipeline}-{pipeline_stage} objects waiting for processing")
+    logger.info(f"Querying {deployment_instance} objects waiting for processing")
     messages = queue_interface.receive_min_max_messages(min_items_to_process, max_items_to_process)
     logger.info(f"{len(messages)} Objects ready for processing")
@@ -115,7 +111,7 @@ def lambda_handler(event, context):
                 state_config.stage_state_machine, json.dumps(records, default=serializer)
             )
             pipeline_execution.update_pipeline_execution(
-                status=f"{pipeline}-{pipeline_stage} Transform Processing", component="Transform"
+                status=f"{deployment_instance} Transform Processing", component="Transform"
             )
         else:
             logger.info("Nothing to process, exiting pipeline")
@@ -126,6 +122,6 @@ def lambda_handler(event, context):
         component = context.function_name.split("-")[-2].title()
         pipeline_execution.end_pipeline_execution_failed(
             component=component,
-            issue_comment=f"{pipeline}-{pipeline_stage} {component} Error: {repr(e)}",
+            issue_comment=f"{deployment_instance} {component} Error: {repr(e)}",
         )
         raise e
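
Net effect on pipeline execution history: records are now keyed by deployment instances rather than by dataset/pipeline/stage names. A minimal sketch with a stubbed API (the real PipelineExecutionHistoryAPI lives in datalake_library; every name and value here is hypothetical):

class StubPipelineExecution:
    # Stand-in for the datalake_library PEH API, for illustration only
    def start_pipeline_execution(self, pipeline_name, dataset_name, comment):
        return f"peh::{pipeline_name}::{dataset_name}"

deployment_instance = "dev-stage-a"          # hypothetical DEPLOYMENT_INSTANCE
dataset_deployment_instance = "dev-dataset"  # hypothetical DATASET_DEPLOYMENT_INSTANCE

peh_id = StubPipelineExecution().start_pipeline_execution(
    pipeline_name=deployment_instance,         # was f"{pipeline}-{pipeline_stage}"
    dataset_name=dataset_deployment_instance,  # was f"{dataset}"
    comment={"source": "example-event"},
)
print(peh_id)  # peh::dev-stage-a::dev-dataset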
