-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontroller.py
More file actions
111 lines (94 loc) · 5.73 KB
/
controller.py
File metadata and controls
111 lines (94 loc) · 5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Implemented Telnet functionality and introduced the AWS SES email notification feature. These enhancements
# apply to both MariaDB and MySQL databases and cater to single-user scenarios.
# Maintainer: Alon Shrestha.
import boto3
import logging
import core
import library
# Module-wide logger: the root logger, with its threshold set to INFO so that
# INFO-and-above records pass the logger's level check.
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
def _sendRotationEmail(subjectPrefix, metaData, arn, detail):
    """
    - Builds and sends one notification email about the rotation of a secret.
    # Parameters:
        - subjectPrefix: Text placed at the start of the subject (library.subjectSuccess or library.subjectFailed).
        - metaData: describe_secret response; only metaData['Name'] is read here.
        - arn: Secret identifier exactly as received in the event.
        - detail: Already-formatted sentence describing what happened; it is placed after "Arn: <arn> -> ".
    """
    subject = f"{subjectPrefix} {metaData['Name']} On : {library.todayDate}"
    message = f"Method: mainHandler -> Secret: {metaData['Name']}, Arn: {arn} -> {detail}" \
              f"\n\n<h4> <i>Automated 🚀 </i></h4>"
    library.sendEmail(subject, message)


def _failRotation(metaData, arn, detail, reason):
    """
    - Reports a fatal rotation problem and aborts the invocation. Never returns.
    # Functions:
        - Sends the failure email (subject prefixed with library.subjectFailed).
        - Logs `reason` at ERROR level.
        - Raises ValueError(reason).
    # Parameters:
        - metaData, arn, detail: Forwarded to _sendRotationEmail.
        - reason: Text used both for the log record and for the ValueError.
    """
    _sendRotationEmail(library.subjectFailed, metaData, arn, detail)
    # `reason` is already fully formatted, so it is passed as the message itself (no logging args).
    logger.error(reason)
    raise ValueError(reason)


def mainHandler(event, context):
    """
    - This is the main method which is being called when lambda gets triggered.
    # Functions:
        - Reads the required parameters from event: 'SecretId', 'ClientRequestToken' and 'Step'.
        - Fetches the secret's metadata with describe_secret and validates it:
            - If the metadata reports RotationEnabled as false -> email + ValueError.
              (A response without the RotationEnabled key is let through.)
            - If the token is not a key of VersionIdsToStages -> email + ValueError.
            - If the token's version already carries stage "AWSCURRENT" -> success email, then return.
            - If the token's version does not carry stage "AWSPENDING" -> email + ValueError.
        - Calls the core method matching Step: createSecret, setSecret, testSecret or finishSecret.
            - Any other Step -> email + ValueError.
    # Parameters:
        - event: Mapping providing 'SecretId', 'ClientRequestToken' and 'Step'.
        - context: Lambda context object; not used by this method.
    # Returns:
        - None.
    # Raises:
        - ValueError: for each failed validation listed above.
    # Secret format (as documented by the original author; the secret value is not parsed in this
    # method, so confirm the details against core):
        {
            'engine': <required: must be set to 'mariadb'>,
            'host': <required: instance host name>,
            'username': <required: username>,
            'password': <required: password>,
            'dbname': <optional: database name>,
            'port': <optional: if not specified, default port 3306 will be used>
        }
    """
    # Logging arguments are passed separately so the record is only formatted when it is emitted.
    logger.info("mainHandler -> Received Event: %s", event)
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']

    # Set up the client / Get SecretManager Session
    secretClient = boto3.client('secretsmanager')

    # Check if Rotation is Enabled for Secret.
    metaData = secretClient.describe_secret(SecretId=arn)
    if "RotationEnabled" in metaData and not metaData['RotationEnabled']:
        _failRotation(
            metaData, arn,
            "is not enabled for rotation.",
            f"mainHandler -> Secret {arn} is not enabled for rotation.",
        )

    # Get Versions from Secret
    versions = metaData['VersionIdsToStages']

    # Check if token(ClientRequestToken) exist in Version.
    if token not in versions:
        _failRotation(
            metaData, arn,
            f"ClientRequestToken: {token} not found in Secret VersionIdsToStages: {versions}.",
            f"mainHandler -> Secret version {token} has no stage for rotation of secret {arn}.",
        )

    stages = versions[token]

    # If "AWSCURRENT" Stage has token(ClientRequestToken). Secret is already set. Return.
    if "AWSCURRENT" in stages:
        _sendRotationEmail(
            library.subjectSuccess, metaData, arn,
            f"ClientRequestToken: {token} already found on Secret Stage: AWSCURRENT. "
            f"Secret VersionIdsToStages: {versions}",
        )
        logger.info("mainHandler -> Secret version %s already set as AWSCURRENT for secret %s.", token, arn)
        return

    # If "AWSPENDING" stage does not have token(ClientRequestToken). Error.
    if "AWSPENDING" not in stages:
        _failRotation(
            metaData, arn,
            f"ClientRequestToken: {token} not found in Secret Stage: AWSPENDING. VersionIdsToStages: {versions}.",
            f"mainHandler -> Secret version {token} not set as AWSPENDING for rotation of secret {arn}.",
        )

    # Call the appropriate step. setSecret and testSecret additionally receive the metadata.
    if step == "createSecret":
        core.createSecret(secretClient, arn, token)
    elif step == "setSecret":
        core.setSecret(secretClient, arn, token, metaData)
    elif step == "testSecret":
        core.testSecret(secretClient, arn, token, metaData)
    elif step == "finishSecret":
        core.finishSecret(secretClient, arn, token)
    else:
        _failRotation(
            metaData, arn,
            f"Invalid Step Parameter: {step}.",
            f"mainHandler -> Invalid step parameter {step} for secret {arn}",
        )