diff --git a/.amazonq/rules/structure.md b/.amazonq/rules/structure.md index e529e977..1c277de6 100644 --- a/.amazonq/rules/structure.md +++ b/.amazonq/rules/structure.md @@ -5,7 +5,7 @@ - **app.py** - Main CDK application entry point for Jira integration - **app_service_now.py** - CDK application entry point for ServiceNow integration - **app_slack.py** - CDK application entry point for Slack integration -- **deploy-integrations-solution.py** - CLI deployment script with integration parameters +- **deploy_integrations_solution.py** - CLI deployment script with integration parameters - **cdk.json** - CDK configuration and feature flags - **requirements.txt** - Python dependencies (includes asset-specific requirements) - **requirements-dev.txt** - Development and testing dependencies diff --git a/.amazonq/rules/tech.md b/.amazonq/rules/tech.md index faafbf90..142c6898 100644 --- a/.amazonq/rules/tech.md +++ b/.amazonq/rules/tech.md @@ -34,9 +34,9 @@ pip install -r requirements-dev.txt # optional dev dependencies ### Deployment ```bash -./deploy-integrations-solution.py jira --email user@example.com --url https://example.atlassian.net --token TOKEN --project-key PROJ -./deploy-integrations-solution.py service-now --instance-id INSTANCE --username USER --password PASS -./deploy-integrations-solution.py slack --bot-token xoxb-TOKEN --signing-secret SECRET --workspace-id WORKSPACE +./deploy_integrations_solution.py jira --email user@example.com --url https://example.atlassian.net --token TOKEN --project-key PROJ +./deploy_integrations_solution.py service-now --instance-id INSTANCE --username USER --password PASS +./deploy_integrations_solution.py slack --bot-token xoxb-TOKEN --signing-secret SECRET --workspace-id WORKSPACE ``` ### Testing and Quality diff --git a/.kiro/steering/structure.md b/.kiro/steering/structure.md index 0e8e5f66..a4fc069e 100644 --- a/.kiro/steering/structure.md +++ b/.kiro/steering/structure.md @@ -4,7 +4,7 @@ - **app.py** - Main CDK application entry point for Jira integration - **app_service_now.py** - CDK application entry point for ServiceNow integration -- **deploy-integrations-solution.py** - CLI deployment script with integration-specific parameters +- **deploy_integrations_solution.py** - CLI deployment script with integration-specific parameters - **cdk.json** - CDK configuration and feature flags - **requirements.txt** - Python dependencies (includes asset-specific requirements) - **requirements-dev.txt** - Development and testing dependencies diff --git a/.kiro/steering/tech.md b/.kiro/steering/tech.md index 748ee54c..c7930408 100644 --- a/.kiro/steering/tech.md +++ b/.kiro/steering/tech.md @@ -42,10 +42,10 @@ pip install -r requirements-dev.txt ```bash # Deploy Jira integration -./deploy-integrations-solution.py jira --email user@example.com --url https://example.atlassian.net --token TOKEN --project-key PROJ +./deploy_integrations_solution.py jira --email user@example.com --url https://example.atlassian.net --token TOKEN --project-key PROJ # Deploy ServiceNow integration (under development) -./deploy-integrations-solution.py service-now --instance-id INSTANCE --username USER --password PASS +./deploy_integrations_solution.py service-now --instance-id INSTANCE --username USER --password PASS ``` ### Testing diff --git a/assets/security_ir_client/requirements.txt b/assets/security_ir_client/requirements.txt index 352983a8..7599c71a 100644 --- a/assets/security_ir_client/requirements.txt +++ b/assets/security_ir_client/requirements.txt @@ -3,4 +3,4 @@ aws-lambda-powertools>=2.30.2 requests>=2.31.0 pysnc PyJWT==2.10.1 -cryptography==46.0.3 \ No newline at end of file +cryptography>=46.0.3 \ No newline at end of file diff --git a/assets/service_now_client/requirements.txt b/assets/service_now_client/requirements.txt index 88ab7af0..59351f3a 100644 --- a/assets/service_now_client/requirements.txt +++ b/assets/service_now_client/requirements.txt @@ -1,4 +1,4 @@ boto3>=1.37.7 pysnc PyJWT==2.10.1 -cryptography==46.0.3 \ No newline at end of file +cryptography>=46.0.3 \ No newline at end of file diff --git a/assets/service_now_notifications_handler/requirements.txt b/assets/service_now_notifications_handler/requirements.txt index 62f8ae74..64cf03e1 100644 --- a/assets/service_now_notifications_handler/requirements.txt +++ b/assets/service_now_notifications_handler/requirements.txt @@ -2,4 +2,4 @@ boto3>=1.37.7 pysnc aws-lambda-powertools>=2.30.0 PyJWT==2.10.1 -cryptography==46.0.3 \ No newline at end of file +cryptography>=46.0.3 \ No newline at end of file diff --git a/assets/service_now_resource_setup_handler/requirements.txt b/assets/service_now_resource_setup_handler/requirements.txt index 5ab55e11..3164f3f6 100644 --- a/assets/service_now_resource_setup_handler/requirements.txt +++ b/assets/service_now_resource_setup_handler/requirements.txt @@ -1,4 +1,4 @@ requests==2.32.4 -boto3==1.34.0 -PyJWT==2.8.0 -cryptography==41.0.7 \ No newline at end of file +boto3>=1.37.7 +PyJWT==2.10.1 +cryptography>=46.0.3 \ No newline at end of file diff --git a/assets/slack_client/requirements.txt b/assets/slack_client/requirements.txt index 53b6e5ae..a523bdd1 100644 --- a/assets/slack_client/requirements.txt +++ b/assets/slack_client/requirements.txt @@ -1,4 +1,4 @@ -boto3>=1.26.0 +boto3>=1.37.7 slack-bolt>=1.18.0 slack-sdk>=3.23.0 requests>=2.28.0 \ No newline at end of file diff --git a/aws_security_incident_response_sample_integrations/aws_security_incident_response_sample_integrations_common_stack.py b/aws_security_incident_response_sample_integrations/aws_security_incident_response_sample_integrations_common_stack.py index 58108e1f..2125cb59 100644 --- a/aws_security_incident_response_sample_integrations/aws_security_incident_response_sample_integrations_common_stack.py +++ b/aws_security_incident_response_sample_integrations/aws_security_incident_response_sample_integrations_common_stack.py @@ -14,6 +14,8 @@ aws_events as events, aws_logs, ) +from aws_cdk.aws_events import CfnRule + from .event_bus_logger_construct import EventBusLoggerConstruct from cdk_nag import NagSuppressions from constructs import Construct @@ -180,7 +182,7 @@ def __init__( self.poller_rule = aws_events.Rule( self, - "SecurityIncidentResponsePollerRule", + "SecurityIncidentResponsePollerRuleDisabled", schedule=aws_events.Schedule.rate(duration=Duration.minutes(1)), targets=[aws_events_targets.LambdaFunction(self.poller)], enabled=False, # Start disabled @@ -561,4 +563,13 @@ def update_security_ir_client_env(self, service_now_params: dict) -> None: }, ], True, + ) + + def enable_polling(self): + self.poller_rule = aws_events.Rule( + self, + "SecurityIncidentResponsePollerRuleEnable", + schedule=aws_events.Schedule.rate(duration=Duration.minutes(1)), + targets=[aws_events_targets.LambdaFunction(self.poller)], + enabled=True, # Start disabled ) \ No newline at end of file diff --git a/aws_security_incident_response_sample_integrations/aws_security_incident_response_service_now_integration_stack.py b/aws_security_incident_response_sample_integrations/aws_security_incident_response_service_now_integration_stack.py index dfc95353..394a111e 100644 --- a/aws_security_incident_response_sample_integrations/aws_security_incident_response_service_now_integration_stack.py +++ b/aws_security_incident_response_sample_integrations/aws_security_incident_response_service_now_integration_stack.py @@ -1,8 +1,13 @@ from os import path from aws_cdk import ( + Aspects, + CfnCondition, CfnOutput, CfnParameter, + CfnResource, Duration, + Fn, + IAspect, Stack, Aws, RemovalPolicy, @@ -13,6 +18,7 @@ aws_lambda, aws_lambda_python_alpha as py_lambda, aws_logs, + aws_s3 as s3, aws_secretsmanager, aws_ssm, aws_s3 as s3, @@ -20,19 +26,33 @@ CustomResource, custom_resources as cr, ) -from cdk_nag import NagSuppressions +import jsii from constructs import Construct from .constants import ( SECURITY_IR_EVENT_SOURCE, SERVICE_NOW_EVENT_SOURCE, + PYTHON_LAMBDA_RUNTIME, SECRET_ROTATION_LAMBDA_TIMEOUT, API_GATEWAY_AUTHORIZOR_TIMEOUT, + API_GATEWAY_LAMBDA_HANDLER_TIMEOUT, DEFAULT_LAMBDA_TIMEOUT, LAMBDA_MEMORY_SIZE, - LAMBDA_TIMEOUT_MINUTES, + LAMBDA_TIMEOUT_MINUTES, TOKEN_ROTATION_PERIOD, ) from .aws_security_incident_response_sample_integrations_common_stack import ( AwsSecurityIncidentResponseSampleIntegrationsCommonStack, ) +@jsii.implements(IAspect) +class ApplyCondition: + """Aspect that applies a CfnCondition to all CfnResources in a construct tree.""" + + def __init__(self, condition: CfnCondition): + self.condition = condition + + def visit(self, node): + if isinstance(node, CfnResource): + node.cfn_options.condition = self.condition + + class AwsSecurityIncidentResponseServiceNowIntegrationStack(Stack): """AWS CDK Stack for ServiceNow integration with Security Incident Response.""" @@ -54,142 +74,221 @@ def __init__( super().__init__(scope, construct_id, **kwargs) # Reference common resources - table = common_stack.table - event_bus = common_stack.event_bus - event_bus_logger = common_stack.event_bus_logger - domain_layer = common_stack.domain_layer - mappers_layer = common_stack.mappers_layer - wrappers_layer = common_stack.wrappers_layer - log_level_param = common_stack.log_level_param + self.__table = common_stack.table + self.__event_bus = common_stack.event_bus + self.__event_bus_logger = common_stack.event_bus_logger + self.__domain_layer = common_stack.domain_layer + self.__mappers_layer = common_stack.mappers_layer + self.__wrappers_layer = common_stack.wrappers_layer + self.__log_level_param = common_stack.log_level_param - """ - cdk for setting Service Now Client parameters - """ - # Create Service Now Client parameters - service_now_instance_id_param = CfnParameter( + self.__setup_cfn_parameters() + self.__setup_ssm_parameters() + + self.__private_key_bucket = s3.Bucket.from_bucket_name( self, - "serviceNowInstanceId", - type="String", - description="The instance id that will be used with the Service Now API.", - no_echo=True, + "ServiceNowPrivateKeyBucket", + bucket_name=self.__private_key_bucket_param.value_as_string ) - # Store Service Now Client ID parameter - service_now_client_id_param = CfnParameter( + # Create CfnCondition for deploy-time evaluation of OAuth setting + # Note: OAuth is not yet implemented - this condition is for future use + self.use_oauth_condition = CfnCondition( self, - "serviceNowClientId", - type="String", - description="The OAuth client ID for the ServiceNow API.", + "UseOAuthCondition", + expression=Fn.condition_equals(self.__use_oauth_param.value_as_string, "true"), ) - # Store Service Now Client Secret parameter - service_now_client_secret_param = CfnParameter( + # Condition for token-based auth (when OAuth is NOT enabled) + # This is the default authentication method + self.use_token_auth_condition = CfnCondition( self, - "serviceNowClientSecret", - type="String", - description="The OAuth client secret that will be used with the Service Now API.", - no_echo=True, + "UseTokenAuthCondition", + expression=Fn.condition_equals(self.__use_oauth_param.value_as_string, "false"), ) - # Store Service Now User ID parameter - service_now_user_id_param = CfnParameter( + combined_condition = CfnCondition(self, 'TemporaryOrCondition', + expression=Fn.condition_or( + self.use_oauth_condition, + self.use_token_auth_condition + ) + ) + + # Create System -> ServiceNow Connectivity + self.service_now_client = self._create_service_now_client() + common_stack.enable_polling() + common_stack.poller_rule.node.add_dependency(self.service_now_client) # Ensures poller doesn't start polling until the client is there to process the events. + + ## Create Event Bridge rule for ServiceNow Client Lambda function + service_now_client_rule = aws_events.Rule( self, - "serviceNowUserId", - type="String", - description="The ServiceNow user ID for JWT authentication.", + "service-now-client-rule", + description="Rule to send all events to Service Now Lambda function", + event_pattern=aws_events.EventPattern(source=[SECURITY_IR_EVENT_SOURCE]), + event_bus=self.__event_bus, ) + service_now_client_rule.add_target(aws_events_targets.LambdaFunction( + self.service_now_client + )) - # Private key bucket parameter (from deploy script) - private_key_bucket_param = CfnParameter( + # Create ServiceNow -> System Connectivity + self.service_now_notification_handler = self._create_service_now_notifications_handler() + ## Add a specific rule for ServiceNow notification events + service_now_notifications_rule = aws_events.Rule( self, - "privateKeyBucket", - type="String", - description="S3 bucket name containing the private key file.", + "ServiceNowNotificationsRule", + description="Rule to capture events from ServiceNow notifications handler", + event_pattern=aws_events.EventPattern(source=[SERVICE_NOW_EVENT_SOURCE]), + event_bus=self.__event_bus, ) + ## Use the same log group as the event bus logger + service_now_notifications_rule.add_target(aws_events_targets.CloudWatchLogGroup( + log_group=self.__event_bus_logger.log_group + )) - # Integration module parameter - self.integration_module_param = CfnParameter( + # Create Api Gateway + self.__api_gateway = self.__create_api_gateway() + webhook_resource = self.__api_gateway.root.add_resource("webhook") + self.__create_token_based_authn_authz_endpoint( + api_gateway=self.__api_gateway, + api_gateway_resource=webhook_resource, + handler=self.service_now_notification_handler, + http_method='POST', # OPTIONS method is automatically added by CORS configuration, no need to add it manually + apply_condition=combined_condition # TODO: Swap for self.__use_token_auth_condition + ) + + # Output API Gateway URL + CfnOutput( self, - "integrationModule", - type="String", - description="ServiceNow integration module: 'itsm' for IT Service Management or 'ir' for Incident Response", - allowed_values=["itsm", "ir"], - default="itsm", + "ServiceNowWebhookUrl", + value=f"{self.__api_gateway.url.rstrip('/')}/webhook", + description="ServiceNow Webhook API Gateway URL", ) - # Create SSM parameters - service_now_client_secret_ssm_param = aws_ssm.StringParameter( + def __setup_ssm_parameters(self): + """Creates SSM Parameters, these store static secrets that we wouldn't the values exposed as environment variables in Lambda Functions""" + self.__service_now_client_secret_ssm_param = aws_ssm.StringParameter( self, "serviceNowClientSecretSSM", parameter_name="/SecurityIncidentResponse/serviceNowClientSecret", - string_value=service_now_client_secret_param.value_as_string, + string_value=self.__service_now_client_secret_param.value_as_string, description="Service Now OAuth client secret", ) - service_now_client_secret_ssm_param.apply_removal_policy(RemovalPolicy.DESTROY) + self.__service_now_client_secret_ssm_param.apply_removal_policy(RemovalPolicy.DESTROY) - service_now_client_id_ssm = aws_ssm.StringParameter( + self.__service_now_client_id_ssm = aws_ssm.StringParameter( self, "serviceNowClientIdSSM", parameter_name="/SecurityIncidentResponse/serviceNowClientId", - string_value=service_now_client_id_param.value_as_string, + string_value=self.__service_now_client_id_param.value_as_string, description="Service Now OAuth client ID", ) - service_now_client_id_ssm.apply_removal_policy(RemovalPolicy.DESTROY) + self.__service_now_client_id_ssm.apply_removal_policy(RemovalPolicy.DESTROY) - service_now_user_id_ssm = aws_ssm.StringParameter( + self.__service_now_user_id_ssm = aws_ssm.StringParameter( self, "serviceNowUserIdSSM", parameter_name="/SecurityIncidentResponse/serviceNowUserId", - string_value=service_now_user_id_param.value_as_string, + string_value=self.__service_now_user_id_param.value_as_string, description="Service Now user ID", ) - service_now_user_id_ssm.apply_removal_policy(RemovalPolicy.DESTROY) + self.__service_now_user_id_ssm.apply_removal_policy(RemovalPolicy.DESTROY) # Use existing S3 bucket from deploy script - # Reference the AWS-managed S3 KMS key - s3_kms_key = kms.Alias.from_alias_name( - self, - "AWSS3ManagedKey", - "alias/aws/s3" - ) - - private_key_bucket = s3.Bucket.from_bucket_name( - self, - "ServiceNowPrivateKeyBucket", - private_key_bucket_param.value_as_string - ) # Create SSM parameters for S3 bucket location - private_key_asset_bucket_ssm = aws_ssm.StringParameter( + self.__private_key_asset_bucket_ssm = aws_ssm.StringParameter( self, "PrivateKeyAssetBucketSSM", parameter_name="/SecurityIncidentResponse/privateKeyAssetBucket", - string_value=private_key_bucket.bucket_name, + string_value=self.__private_key_bucket_param.value_as_string, description="S3 bucket for private key asset", ) - private_key_asset_bucket_ssm.apply_removal_policy(RemovalPolicy.DESTROY) + self.__private_key_asset_bucket_ssm.apply_removal_policy(RemovalPolicy.DESTROY) - private_key_asset_key_ssm = aws_ssm.StringParameter( + self.__private_key_asset_key_ssm = aws_ssm.StringParameter( self, "PrivateKeyAssetKeySSM", parameter_name="/SecurityIncidentResponse/privateKeyAssetKey", string_value="private.key", description="S3 object key for private key asset", ) - private_key_asset_key_ssm.apply_removal_policy(RemovalPolicy.DESTROY) + self.__private_key_asset_key_ssm.apply_removal_policy(RemovalPolicy.DESTROY) - service_now_instance_id_ssm = aws_ssm.StringParameter( + self.__service_now_instance_id_ssm = aws_ssm.StringParameter( self, "serviceNowInstanceIdSSM", parameter_name="/SecurityIncidentResponse/serviceNowInstanceId", - string_value=service_now_instance_id_param.value_as_string, + string_value=self.__service_now_instance_id_param.value_as_string, description="Service Now instance id", ) - service_now_instance_id_ssm.apply_removal_policy(RemovalPolicy.DESTROY) + self.__service_now_instance_id_ssm.apply_removal_policy(RemovalPolicy.DESTROY) - """ - cdk for assets/service_now_client - """ + def __setup_cfn_parameters(self): + """"Creates CfnParameters. We use CfnParameters to get data about ServiceNow from engineer deploying the system""" + self.__service_now_instance_id_param = CfnParameter( + self, + "serviceNowInstanceId", + type="String", + description="The instance id that will be used with the Service Now API.", + no_echo=True, + ) + + # Store Service Now Client ID parameter + self.__service_now_client_id_param = CfnParameter( + self, + "serviceNowClientId", + type="String", + description="The OAuth client ID for the ServiceNow API.", + ) + + # Store Service Now Client Secret parameter + self.__service_now_client_secret_param = CfnParameter( + self, + "serviceNowClientSecret", + type="String", + description="The OAuth client secret that will be used with the Service Now API.", + no_echo=True, + ) + + # Store Service Now User ID parameter + self.__service_now_user_id_param = CfnParameter( + self, + "serviceNowUserId", + type="String", + description="The ServiceNow user ID for JWT authentication.", + ) + + # Private key bucket parameter (from deploy script) + self.__private_key_bucket_param = CfnParameter( + self, + "privateKeyBucket", + type="String", + description="S3 bucket name containing the private key file.", + ) + + # Integration module parameter + self.integration_module_param = CfnParameter( + self, + "integrationModule", + type="String", + description="ServiceNow integration module: 'itsm' for IT Service Management or 'ir' for Incident Response", + allowed_values=["itsm", "ir"], + default="itsm", + ) + + # API Gateway authentication type parameter + self.__use_oauth_param = CfnParameter( + self, + "useOAuth", + type="String", + description="Use OAuth for API Gateway authentication instead of token-based auth. Set to 'true' to enable OAuth (not yet implemented).", + allowed_values=["true", "false"], + default="false", + ) + + def _create_service_now_client(self) -> py_lambda.PythonFunction: + """The purpose of the ServiceNow Client is to issue commands to ServiceNow from the System""" # Create a custom role for the ServiceNow Client Lambda function service_now_client_role = aws_iam.Role( self, @@ -198,127 +297,160 @@ def __init__( description="Custom role for Security Incident Response Service Now Client Lambda function", ) - # Add custom policy for CloudWatch Logs permissions service_now_client_role.add_to_policy( aws_iam.PolicyStatement( effect=aws_iam.Effect.ALLOW, actions=[ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ], - resources=[ - f"arn:{Aws.PARTITION}:logs:{self.region}:{self.account}:log-group:/aws/lambda/*" + "security-ir:GetCaseAttachmentDownloadUrl", + "security-ir:ListComments", ], + resources=["*"], ) ) - # create Lambda function for Service Now with custom role + # Create Lambda function service_now_client = py_lambda.PythonFunction( self, "SecurityIncidentResponseServiceNowClient", entry=path.join(path.dirname(__file__), "..", "assets/service_now_client"), - runtime=aws_lambda.Runtime.PYTHON_3_13, - timeout=Duration.minutes(LAMBDA_TIMEOUT_MINUTES), - memory_size=LAMBDA_MEMORY_SIZE, - layers=[domain_layer, mappers_layer, wrappers_layer], + runtime=PYTHON_LAMBDA_RUNTIME, + timeout=API_GATEWAY_LAMBDA_HANDLER_TIMEOUT, + layers=[self.__domain_layer, self.__mappers_layer, self.__wrappers_layer], environment={ - "SERVICE_NOW_INSTANCE_ID": service_now_instance_id_ssm.parameter_name, - "SERVICE_NOW_CLIENT_ID": service_now_client_id_ssm.parameter_name, - "SERVICE_NOW_USER_ID": service_now_user_id_ssm.parameter_name, - "PRIVATE_KEY_ASSET_BUCKET": private_key_asset_bucket_ssm.parameter_name, - "PRIVATE_KEY_ASSET_KEY": private_key_asset_key_ssm.parameter_name, - "INCIDENTS_TABLE_NAME": table.table_name, - "SERVICE_NOW_CLIENT_SECRET_PARAM": service_now_client_secret_ssm_param.parameter_name, "EVENT_SOURCE": SECURITY_IR_EVENT_SOURCE, "INTEGRATION_MODULE": self.integration_module_param.value_as_string, - "LOG_LEVEL": log_level_param.value_as_string, + "LOG_LEVEL": self.__log_level_param.value_as_string, }, role=service_now_client_role, ) + self.__add_service_now_to_environment(service_now_client) + self.__add_private_key_to_environment(service_now_client) + self.__add_incident_table_to_environment(service_now_client) + return service_now_client - # create Event Bridge rule for Service Now Client Lambda function - service_now_client_rule = aws_events.Rule( + def _create_service_now_notifications_handler(self) -> py_lambda.PythonFunction: + """The purpose of the ServiceNow Notification Handler is to process events coming from ServiceNow""" + # Create Service Now notifications handler and related resources + service_now_notifications_handler_role = aws_iam.Role( self, - "service-now-client-rule", - description="Rule to send all events to Service Now Lambda function", - event_pattern=aws_events.EventPattern(source=[SECURITY_IR_EVENT_SOURCE]), - event_bus=event_bus, - ) - - # Add target - service_now_client_target = aws_events_targets.LambdaFunction( - service_now_client + "ServiceNowNotificationsHandlerRole", + assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"), + description="Custom role for Service Now Notifications Handler Lambda function", ) - service_now_client_rule.add_target(service_now_client_target) - # grant permissions to DynamoDB table and security-ir - service_now_client_role.add_to_policy( + # Grant permission to publish events to EventBridge + # TODO: Swap with grant + service_now_notifications_handler_role.add_to_policy( aws_iam.PolicyStatement( effect=aws_iam.Effect.ALLOW, - actions=[ - "security-ir:GetCaseAttachmentDownloadUrl", - "security-ir:ListComments", - ], - resources=["*"], + actions=["events:PutEvents"], + resources=[self.__event_bus.event_bus_arn], ) ) - - # allow adding SSM values - service_now_client_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=["ssm:GetParameter", "ssm:PutParameter"], - resources=["*"], - ) + # Create Lambda function for Service Now Notifications handler with custom role + service_now_notifications_handler = py_lambda.PythonFunction( + self, + "ServiceNowNotificationsHandler", + entry=path.join( + path.dirname(__file__), "..", "assets/service_now_notifications_handler" + ), + runtime=PYTHON_LAMBDA_RUNTIME, + timeout=API_GATEWAY_LAMBDA_HANDLER_TIMEOUT, + layers=[self.__domain_layer, self.__mappers_layer, self.__wrappers_layer], + environment={ + "EVENT_BUS_NAME": self.__event_bus.event_bus_name, + "EVENT_SOURCE": SERVICE_NOW_EVENT_SOURCE, + "INTEGRATION_MODULE": self.integration_module_param.value_as_string, + "LOG_LEVEL": self.__log_level_param.value_as_string, + }, + role=service_now_notifications_handler_role, ) + self.__add_service_now_to_environment(service_now_notifications_handler) + self.__add_private_key_to_environment(service_now_notifications_handler) + self.__add_incident_table_to_environment(service_now_notifications_handler) + return service_now_notifications_handler - # Grant S3 permissions to read private key - private_key_bucket.grant_read(service_now_client_role) - - # Grant KMS permissions to decrypt S3 objects using specific key - s3_kms_key.grant_decrypt(service_now_client_role) + def __create_service_now_setup_custom_resource(self, api_gateway: aws_apigateway.RestApi, secret: aws_secretsmanager.Secret) -> CustomResource: + """Creates Business Rule and Outbound REST API. These ServiceNow resources automate the pushing of events from ServiceNow to API Gateway""" + service_now_resource_setup_role = aws_iam.Role( + self, + "ServiceNowResourceSetupRole", + assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"), + description="Role for ServiceNow Resource setup Lambda", + ) + secret.grant_read(service_now_resource_setup_role) + secret.grant_write(service_now_resource_setup_role) - # Grant specific DynamoDB permissions instead of full access - table.grant_read_write_data(service_now_client_role) + service_now_resource_setup_handler = py_lambda.PythonFunction( + self, + "ServiceNowResourceSetupLambda", + entry=path.join( + path.dirname(__file__), + "..", + "assets/service_now_resource_setup_handler", + ), + layers=[self.__domain_layer, self.__mappers_layer, self.__wrappers_layer], + runtime=PYTHON_LAMBDA_RUNTIME, + timeout=DEFAULT_LAMBDA_TIMEOUT, + environment={ + "SERVICE_NOW_RESOURCE_PREFIX": api_gateway.rest_api_id, + "WEBHOOK_URL": f"{api_gateway.url.rstrip('/')}/webhook", + "API_AUTH_SECRET": secret.secret_arn, + "INTEGRATION_MODULE": self.integration_module_param.value_as_string, + "LOG_LEVEL": self.__log_level_param.value_as_string, + }, + role=service_now_resource_setup_role, + ) + self.__add_service_now_to_environment(service_now_resource_setup_handler) + self.__add_private_key_to_environment(service_now_resource_setup_handler) - # Enable the poller rule after ServiceNow client is ready - enable_poller_cr = cr.AwsCustomResource( + service_now_cr_provider = cr.Provider( self, - "EnablePollerRule", - on_create=cr.AwsSdkCall( - service="EventBridge", - action="enableRule", - parameters={ - "Name": common_stack.poller_rule.rule_name, - }, - physical_resource_id=cr.PhysicalResourceId.of( - f"enable-poller-{common_stack.poller_rule.rule_name}" - ), - ), - policy=cr.AwsCustomResourcePolicy.from_sdk_calls( - resources=[common_stack.poller_rule.rule_arn] - ), + "ServiceNowResourceSetupProvider", + on_event_handler=service_now_resource_setup_handler, ) - enable_poller_cr.node.add_dependency(service_now_client) - - # Add suppressions for IAM5 findings related to wildcard resources - NagSuppressions.add_resource_suppressions( - service_now_client_role, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resources are required for security-ir and SSM actions", - "applies_to": ["Resource::*"], - } - ], - True, + + # Create custom resource + return CustomResource( + self, + "ServiceNowResourceSetupCr", + service_token=service_now_cr_provider.service_token, + properties={ + "WebhookUrl": f"{api_gateway.url.rstrip('/')}/webhook", + "IntegrationModule": self.integration_module_param.value_as_string, + }, ) - """ - cdk for API Gateway to receive events from ServiceNow - """ - # Create IAM role for API Gateway CloudWatch logging + def __add_service_now_to_environment(self, py_function: py_lambda.PythonFunction) -> None: + """Adds ServiceNow specific environment variables and grants permissions""" + py_function.add_environment('SERVICE_NOW_INSTANCE_ID', self.__service_now_instance_id_ssm.parameter_name) + py_function.add_environment('SERVICE_NOW_CLIENT_ID', self.__service_now_client_id_ssm.parameter_name) + py_function.add_environment('SERVICE_NOW_USER_ID', self.__service_now_user_id_ssm.parameter_name) + py_function.add_environment('SERVICE_NOW_CLIENT_SECRET_PARAM', self.__service_now_client_secret_ssm_param.parameter_name) + + # SSM Grants + for ssm_param in [self.__service_now_instance_id_ssm, self.__service_now_client_id_ssm, + self.__service_now_user_id_ssm, self.__service_now_client_secret_ssm_param]: + ssm_param.grant_read(py_function.role) + ssm_param.grant_write(py_function.role) + + def __add_private_key_to_environment(self, py_function: py_lambda.PythonFunction) -> None: + """Adds private key environment and grants permissions""" + py_function.add_environment('PRIVATE_KEY_ASSET_BUCKET', self.__private_key_asset_bucket_ssm.parameter_name) + py_function.add_environment('PRIVATE_KEY_ASSET_KEY', self.__private_key_asset_key_ssm.parameter_name) + ## SSM Grants + for ssm_param in [self.__private_key_asset_bucket_ssm, self.__private_key_asset_key_ssm]: + ssm_param.grant_read(py_function.role) + ssm_param.grant_write(py_function.role) + + self.__private_key_bucket.grant_read(py_function.role) + + def __add_incident_table_to_environment(self, py_function: py_lambda.PythonFunction) -> None: + py_function.add_environment('INCIDENTS_TABLE_NAME', self.__table.table_name) + self.__table.grant_read_write_data(py_function.role) + + def __create_api_gateway(self) -> aws_apigateway.RestApi: + """Create API Gateway""" api_gateway_logging_role = aws_iam.Role( self, "ApiGatewayLoggingRole", @@ -326,32 +458,20 @@ def __init__( description="Role for API Gateway to write logs to CloudWatch", managed_policies=[ aws_iam.ManagedPolicy.from_aws_managed_policy_name( - "service-role/AmazonAPIGatewayPushToCloudWatchLogs" + "service-role/AmazonAPIGatewayPushToCloudWatchLogs" # This includes Read / Write permission to CW Logs. So we don't need to explicitly grant ) ], ) - - # Add CloudWatch Logs permissions to the role - api_gateway_logging_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:DescribeLogGroups", - "logs:DescribeLogStreams", - "logs:PutLogEvents", - "logs:GetLogEvents", - "logs:FilterLogEvents", - ], - resources=[f"arn:{Aws.PARTITION}:logs:{self.region}:{self.account}:*"], - ) + api_gateway_logs = aws_logs.LogGroup( + self, + "ServiceNowApiGatewayLogs", + log_group_name=f"/aws/apigateway/ServiceNowWebhookApi-{self.node.addr}", + retention=aws_logs.RetentionDays.ONE_WEEK, + removal_policy=RemovalPolicy.DESTROY, ) - - # Create API Gateway - service_now_api_gateway = aws_apigateway.RestApi( + api_gateway = aws_apigateway.RestApi( self, - "ServiceNowWebhookApi", + "ServiceNowWebhookApiToken", rest_api_name="ServiceNow Webhook API", description="API Gateway to receive events from ServiceNow", default_cors_preflight_options=aws_apigateway.CorsOptions( @@ -363,33 +483,26 @@ def __init__( logging_level=aws_apigateway.MethodLoggingLevel.INFO, data_trace_enabled=True, metrics_enabled=True, - access_log_destination=aws_apigateway.LogGroupLogDestination( - aws_logs.LogGroup( - self, - "ServiceNowApiGatewayLogs", - log_group_name=f"/aws/apigateway/ServiceNowWebhookApi-{self.node.addr}", - retention=aws_logs.RetentionDays.ONE_WEEK, - removal_policy=RemovalPolicy.DESTROY, - ) - ), access_log_format=aws_apigateway.AccessLogFormat.clf(), - ), + access_log_destination=aws_apigateway.LogGroupLogDestination(api_gateway_logs) + ) ) - - # Create account-level setting for API Gateway CloudWatch role api_gateway_account = aws_apigateway.CfnAccount( self, "ApiGatewayAccount", cloud_watch_role_arn=api_gateway_logging_role.role_arn, ) - - # Add dependency to ensure the role is created before the account uses it api_gateway_account.node.add_dependency(api_gateway_logging_role) - """ - cdk for Secrets Manager secret with rotation for API Gateway authorization - """ - # Create rotation Lambda role + return api_gateway + + def __create_token_based_authn_authz_endpoint(self, + api_gateway: aws_apigateway.RestApi, + api_gateway_resource: aws_apigateway.Resource, + handler: py_lambda.PythonFunction, + http_method: str, + apply_condition: CfnCondition) -> aws_apigateway.Method: + # Create Lambda Function to rotate token service_now_secret_rotation_handler_role = aws_iam.Role( self, "ServiceNowSecretRotationHandlerRole", @@ -397,33 +510,6 @@ def __init__( description="Role for ServiceNow secret rotation Lambda function", ) - service_now_secret_rotation_handler_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ], - resources=[ - f"arn:{Aws.PARTITION}:logs:{self.region}:{self.account}:log-group:/aws/lambda/*" - ], - ) - ) - - service_now_secret_rotation_handler_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "secretsmanager:GetSecretValue", - "secretsmanager:PutSecretValue", - "secretsmanager:UpdateSecretVersionStage", - ], - resources=["*"], - ) - ) - - # Create rotation Lambda function service_now_secret_rotation_handler = py_lambda.PythonFunction( self, "SecretRotationLambda", @@ -432,14 +518,13 @@ def __init__( "..", "assets/service_now_secret_rotation_handler", ), - runtime=aws_lambda.Runtime.PYTHON_3_13, - timeout=Duration.minutes(LAMBDA_TIMEOUT_MINUTES), role=service_now_secret_rotation_handler_role, + runtime=PYTHON_LAMBDA_RUNTIME, + timeout=SECRET_ROTATION_LAMBDA_TIMEOUT, ) - # Create the secret with rotation secret_template = '{"token": ""}' # nosec B105 - api_auth_secret = aws_secretsmanager.Secret( + token_secret = aws_secretsmanager.Secret( self, "ApiAuthSecret", description="API Gateway authorization token for ServiceNow webhook", @@ -450,167 +535,22 @@ def __init__( password_length=32, ), ) - - # Configure rotation - api_auth_secret.add_rotation_schedule( + token_secret.grant_read(service_now_secret_rotation_handler_role) + token_secret.grant_write(service_now_secret_rotation_handler_role) + token_secret.add_rotation_schedule( "RotationSchedule", rotation_lambda=service_now_secret_rotation_handler, - automatically_after=Duration.days(30), - ) - - # Add suppression for rotation role - NagSuppressions.add_resource_suppressions( - service_now_secret_rotation_handler_role, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resources are required for Secrets Manager rotation", - "applies_to": ["Resource::*"], - } - ], - True, - ) - - """ - cdk for assets/service_now_notifications_handler - """ - # Create Service Now notifications handler and related resources - service_now_notifications_handler_role = aws_iam.Role( - self, - "ServiceNowNotificationsHandlerRole", - assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"), - description="Custom role for Service Now Notifications Handler Lambda function", - ) - - # Add custom policy for CloudWatch Logs permissions - service_now_notifications_handler_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ], - resources=[ - f"arn:{Aws.PARTITION}:logs:{self.region}:{self.account}:log-group:/aws/lambda/*" - ], - ) + automatically_after=TOKEN_ROTATION_PERIOD, ) - # Grant permission to publish events to EventBridge - service_now_notifications_handler_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=["events:PutEvents"], - resources=[event_bus.event_bus_arn], - ) - ) - - # Grant permission to access SSM parameters - service_now_notifications_handler_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=["ssm:GetParameter"], - resources=["*"], - ) - ) - - # Add suppressions for IAM5 findings related to wildcard resources - NagSuppressions.add_resource_suppressions( - service_now_notifications_handler_role, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resources are required for SSM parameters", - "applies_to": ["Resource::*"], - } - ], - True, - ) - - # Create Lambda function for Service Now Notifications handler with custom role - service_now_notifications_handler = py_lambda.PythonFunction( - self, - "ServiceNowNotificationsHandler", - entry=path.join( - path.dirname(__file__), "..", "assets/service_now_notifications_handler" - ), - runtime=aws_lambda.Runtime.PYTHON_3_13, - timeout=Duration.minutes(LAMBDA_TIMEOUT_MINUTES), - memory_size=LAMBDA_MEMORY_SIZE, - layers=[domain_layer, mappers_layer, wrappers_layer], - environment={ - "EVENT_BUS_NAME": event_bus.event_bus_name, - "SERVICE_NOW_INSTANCE_ID": service_now_instance_id_ssm.parameter_name, - "SERVICE_NOW_CLIENT_ID": service_now_client_id_ssm.parameter_name, - "SERVICE_NOW_USER_ID": service_now_user_id_ssm.parameter_name, - "PRIVATE_KEY_ASSET_BUCKET": private_key_asset_bucket_ssm.parameter_name, - "PRIVATE_KEY_ASSET_KEY": private_key_asset_key_ssm.parameter_name, - "SERVICE_NOW_CLIENT_SECRET_PARAM": service_now_client_secret_ssm_param.parameter_name, - "INCIDENTS_TABLE_NAME": table.table_name, - "EVENT_SOURCE": SERVICE_NOW_EVENT_SOURCE, - "INTEGRATION_MODULE": self.integration_module_param.value_as_string, - "LOG_LEVEL": log_level_param.value_as_string, - }, - role=service_now_notifications_handler_role, - ) - - # Add a specific rule for ServiceNow notification events - service_now_notifications_rule = aws_events.Rule( - self, - "ServiceNowNotificationsRule", - description="Rule to capture events from ServiceNow notifications handler", - event_pattern=aws_events.EventPattern(source=[SERVICE_NOW_EVENT_SOURCE]), - event_bus=event_bus, - ) - - # Use the same log group as the event bus logger - service_now_notifications_target = aws_events_targets.CloudWatchLogGroup( - log_group=event_bus_logger.log_group - ) - service_now_notifications_rule.add_target(service_now_notifications_target) - - # Grant S3 permissions to read private key - private_key_bucket.grant_read(service_now_notifications_handler_role) - - # Grant KMS permissions to decrypt S3 objects using specific key - s3_kms_key.grant_decrypt(service_now_notifications_handler_role) - - # Grant specific DynamoDB permissions instead of full access - table.grant_read_write_data(service_now_notifications_handler_role) - - """ - cdk for Service Now API Gateway Authorizer - """ - # Create Lambda authorizer for service_now_api_gateway + # Create Lambda authorizer to enforce Token based AuthN/AuthZ service_now_api_gateway_authorizer_role = aws_iam.Role( self, "ServiceNowApiGatewayAuthorizerRole", assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"), description="Role for ServiceNow API Gateway authorizer Lambda function", ) - - service_now_api_gateway_authorizer_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ], - resources=[ - f"arn:{Aws.PARTITION}:logs:{self.region}:{self.account}:log-group:/aws/lambda/*" - ], - ) - ) - - service_now_api_gateway_authorizer_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=["secretsmanager:GetSecretValue"], - resources=[api_auth_secret.secret_arn], - ) - ) + token_secret.grant_read(service_now_api_gateway_authorizer_role) service_now_api_gateway_authorizer_lambda = py_lambda.PythonFunction( self, @@ -620,16 +560,14 @@ def __init__( "..", "assets/service_now_api_gateway_authorizer", ), - runtime=aws_lambda.Runtime.PYTHON_3_13, - timeout=Duration.minutes(LAMBDA_TIMEOUT_MINUTES), + runtime=PYTHON_LAMBDA_RUNTIME, + timeout=API_GATEWAY_AUTHORIZOR_TIMEOUT, environment={ - "API_AUTH_SECRET": api_auth_secret.secret_arn, - "LOG_LEVEL": log_level_param.value_as_string, + "API_AUTH_SECRET": token_secret.secret_arn, + "LOG_LEVEL": self.__log_level_param.value_as_string, }, role=service_now_api_gateway_authorizer_role, ) - - # Create API Gateway authorizer service_now_api_gateway_token_authorizer = aws_apigateway.TokenAuthorizer( self, "ServiceNowTokenAuthorizer", @@ -637,220 +575,21 @@ def __init__( identity_source="method.request.header.Authorization", ) - # Create webhook resource and methods - webhook_resource = service_now_api_gateway.root.add_resource("webhook") - webhook_integration = aws_apigateway.LambdaIntegration( - service_now_notifications_handler - ) - - webhook_resource.add_method( - "POST", - webhook_integration, + token_based_method = api_gateway_resource.add_method( + http_method, + aws_apigateway.LambdaIntegration(handler), # As a side effect, API Gateway is automatically granted invoke permissions authorizer=service_now_api_gateway_token_authorizer, ) - # OPTIONS method is automatically added by CORS configuration, no need to add it manually - # Grant API Gateway permission to invoke the Lambda function - service_now_notifications_handler.grant_invoke( - aws_iam.ServicePrincipal("apigateway.amazonaws.com") + # Setup ServiceNow Business Rule and REST API + service_now_setup_cr = self.__create_service_now_setup_custom_resource( + api_gateway=api_gateway, + secret=token_secret ) - # Add suppression for authorizer role - NagSuppressions.add_resource_suppressions( - service_now_api_gateway_authorizer_role, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resources are required for CloudWatch Logs permissions", - "applies_to": ["Resource::arn:*:logs:*:*:*"], - } - ], - True, - ) - - # Add suppressions for IAM5 findings related to wildcard resources - NagSuppressions.add_resource_suppressions( - service_now_notifications_handler, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resources are required for security-ir, events, lambda, and SSM actions", - "applies_to": ["Resource::*"], - } - ], - True, - ) - - # Add suppressions for API Gateway logging role - NagSuppressions.add_resource_suppressions( - api_gateway_logging_role, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resources are required for CloudWatch Logs permissions", - "applies_to": ["Resource::arn:*:logs:*:*:*"], - } - ], - True, - ) - - """ - Custom Lambda resource for creating ServiceNow resources (Business Rule and Outbound REST API). These Service Now resources will automate the event processing for Incident related updates in AWS Security IR - """ - # Create role for ServiceNow API setup Lambda - service_now_resource_setup_role = aws_iam.Role( - self, - "ServiceNowResourceSetupRole", - assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"), - description="Role for ServiceNow Resource setup Lambda", - ) - - # Add CloudWatch Logs permissions - service_now_resource_setup_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - ], - resources=[ - f"arn:{Aws.PARTITION}:logs:{self.region}:{self.account}:log-group:/aws/lambda/*" - ], - ) - ) - - # Add SSM permissions with full access to resolve permission issues - service_now_resource_setup_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "ssm:GetParameter", - "ssm:GetParameters", - "ssm:DescribeParameters", - ], - resources=["*"], - ) - ) - - # Add Secrets Manager permissions for rotation and secret access - service_now_resource_setup_role.add_to_policy( - aws_iam.PolicyStatement( - effect=aws_iam.Effect.ALLOW, - actions=[ - "secretsmanager:RotateSecret", - "secretsmanager:GetSecretValue", - "secretsmanager:PutSecretValue", - "secretsmanager:UpdateSecretVersionStage", - ], - resources=[api_auth_secret.secret_arn], - ) - ) + for resource in [service_now_secret_rotation_handler_role, service_now_secret_rotation_handler, token_secret, + service_now_api_gateway_authorizer_role, service_now_api_gateway_authorizer_lambda, + service_now_api_gateway_token_authorizer, token_based_method, service_now_setup_cr]: + Aspects.of(resource).add(ApplyCondition(apply_condition)) - # Grant S3 permissions to read private key - private_key_bucket.grant_read(service_now_resource_setup_role) - - # Grant KMS permissions to decrypt S3 objects using specific key - s3_kms_key.grant_decrypt(service_now_resource_setup_role) - - # Add suppression for wildcard resource in SSM and Secrets Manager policies - NagSuppressions.add_resource_suppressions( - service_now_resource_setup_role, - [ - { - "id": "AwsSolutions-IAM5", - "reason": "Wildcard resource is required for SSM parameter access and Secrets Manager rotation", - "applies_to": ["Resource::*"], - } - ], - True, - ) - - # Use the API Gateway's physical ID as the resource prefix - # This will be available after deployment and used for naming ServiceNow resources - - # Create Lambda function for ServiceNow API setup - service_now_resource_setup_handler = py_lambda.PythonFunction( - self, - "ServiceNowResourceSetupLambda", - entry=path.join( - path.dirname(__file__), - "..", - "assets/service_now_resource_setup_handler", - ), - layers=[domain_layer, mappers_layer, wrappers_layer], - runtime=aws_lambda.Runtime.PYTHON_3_13, - timeout=Duration.minutes(LAMBDA_TIMEOUT_MINUTES), - memory_size=LAMBDA_MEMORY_SIZE, - environment={ - "SERVICE_NOW_INSTANCE_ID": service_now_instance_id_ssm.parameter_name, - "SERVICE_NOW_CLIENT_ID": service_now_client_id_ssm.parameter_name, - "SERVICE_NOW_USER_ID": service_now_user_id_ssm.parameter_name, - "PRIVATE_KEY_ASSET_BUCKET": private_key_asset_bucket_ssm.parameter_name, - "PRIVATE_KEY_ASSET_KEY": private_key_asset_key_ssm.parameter_name, - "SERVICE_NOW_CLIENT_SECRET_PARAM": service_now_client_secret_ssm_param.parameter_name, - "SERVICE_NOW_RESOURCE_PREFIX": service_now_api_gateway.rest_api_id, - "WEBHOOK_URL": f"{service_now_api_gateway.url.rstrip('/')}/webhook", - "API_AUTH_SECRET": api_auth_secret.secret_arn, - "INTEGRATION_MODULE": self.integration_module_param.value_as_string, - "LOG_LEVEL": log_level_param.value_as_string, - }, - role=service_now_resource_setup_role, - ) - - # Create custom resource provider - service_now_cr_provider = cr.Provider( - self, - "ServiceNowResourceSetupProvider", - on_event_handler=service_now_resource_setup_handler, - ) - - # Create custom resource - CustomResource( - self, - "ServiceNowResourceSetupCr", - service_token=service_now_cr_provider.service_token, - properties={ - "WebhookUrl": f"{service_now_api_gateway.url.rstrip('/')}/webhook", - "IntegrationModule": self.integration_module_param.value_as_string, - }, - ) - - # Add stack-level suppression - NagSuppressions.add_stack_suppressions( - self, - [ - { - "id": "AwsSolutions-IAM4", - "reason": "Built-in LogRetention Lambda role requires AWSLambdaBasicExecutionRole managed policy", - }, - { - "id": "AwsSolutions-IAM5", - "reason": "Built-in LogRetention Lambda needs these permissions to manage log retention", - }, - {"id": "AwsSolutions-SQS3", "reason": "SQS is used as DLQ"}, - { - "id": "AwsSolutions-L1", - "reason": "CDK-generated Lambda functions may use older runtimes which we cannot directly control", - }, - ], - ) - - """ - cdk to output the generated name of CFN resources - """ - # Output ServiceNow client ARN - CfnOutput( - self, - "ServiceNowClientLambdaArn", - value=service_now_client.function_arn, - description="ServiceNow Client Lambda Function ARN", - ) - - # Output API Gateway URL - CfnOutput( - self, - "ServiceNowWebhookUrl", - value=f"{service_now_api_gateway.url.rstrip('/')}/webhook", - description="ServiceNow Webhook API Gateway URL", - ) \ No newline at end of file + return token_based_method diff --git a/aws_security_incident_response_sample_integrations/constants.py b/aws_security_incident_response_sample_integrations/constants.py index cf3810c1..2c000a89 100644 --- a/aws_security_incident_response_sample_integrations/constants.py +++ b/aws_security_incident_response_sample_integrations/constants.py @@ -4,6 +4,19 @@ including AWS account IDs, event sources, and integration-specific constants. """ +from aws_cdk.aws_lambda import Runtime +from aws_cdk import Duration + +# Generic Lambda Constants +PYTHON_LAMBDA_RUNTIME = Runtime.PYTHON_3_13 +DEFAULT_LAMBDA_TIMEOUT = Duration.minutes(15) # Max Timeout for Lambda + +# API Gateway Constants +API_GATEWAY_LAMBDA_HANDLER_TIMEOUT = DEFAULT_LAMBDA_TIMEOUT +SECRET_ROTATION_LAMBDA_TIMEOUT = Duration.minutes(5) # Secrets Rotation may need to connect to an integration target to persist secret and persist it locally. +API_GATEWAY_AUTHORIZOR_TIMEOUT = Duration.seconds(29) # The default timeout for a regional API Gateway Endpoint is 29 seconds. + + # JIRA Account ID/Service Principal for creating an SNS topic that receives notifications/events from JIRA # see the detailed documentation here - https://support.atlassian.com/cloud-automation/docs/configure-aws-sns-for-jira-automation/ JIRA_AWS_ACCOUNT_ID = "815843069303" @@ -71,6 +84,8 @@ # ServiceNow automation constants +TOKEN_ROTATION_PERIOD = Duration.days(30) # Token rotates every 30 days. + # Lambda configuration constants LAMBDA_MEMORY_SIZE = 1024 LAMBDA_TIMEOUT_MINUTES = 15 diff --git a/cdk.context.json b/cdk.context.json index 72183ad6..c13f5cfb 100644 --- a/cdk.context.json +++ b/cdk.context.json @@ -1,6 +1,7 @@ { "acknowledged-issue-numbers": [ 32775, - 32775 + 24892, + 34892 ] } diff --git a/deploy-integrations-solution.py b/deploy_integrations_solution.py similarity index 94% rename from deploy-integrations-solution.py rename to deploy_integrations_solution.py index 531911a0..20003756 100755 --- a/deploy-integrations-solution.py +++ b/deploy_integrations_solution.py @@ -6,8 +6,8 @@ with proper parameter passing for different integration types. Usage: - ./deploy-integrations-solution.py jira --email user@example.com --url https://example.atlassian.net --token TOKEN --project-key PROJ - ./deploy-integrations-solution.py service-now --instance-id example --username admin --password PASSWORD --integration-module itsm + ./deploy_integrations_solution.py jira --email user@example.com --url https://example.atlassian.net --token TOKEN --project-key PROJ + ./deploy_integrations_solution.py service-now --instance-id example --username admin --password PASSWORD --integration-module itsm """ import argparse @@ -161,6 +161,8 @@ def deploy_servicenow(args): f"AwsSecurityIncidentResponseServiceNowIntegrationStack:privateKeyBucket={bucket_name}", "--parameters", f"AwsSecurityIncidentResponseServiceNowIntegrationStack:integrationModule={args.integration_module}", + "--parameters", + f"AwsSecurityIncidentResponseServiceNowIntegrationStack:useOAuth={'true' if getattr(args, 'use_oauth', False) else 'false'}", ] print("\nšŸ”„ Deploying ServiceNow integration...\n") # Using subprocess with a list of arguments is safe from shell injection @@ -283,6 +285,13 @@ def main(): required=True, help="ServiceNow integration module: 'itsm' for IT Service Management or 'ir' for Incident Response", ) + # TODO: Enable when ready to publish OAuth 2 Authentication for API Gateway + # servicenow_parser.add_argument( + # "--use-oauth", + # action="store_true", + # default=False, + # help="Use OAuth for API Gateway authentication instead of token-based auth (not yet implemented)", + # ) servicenow_parser.add_argument( "--log-level", choices=["info", "debug", "error"], @@ -326,9 +335,9 @@ def main(): print( textwrap.dedent(""" Please specify 'jira', 'service-now', or 'slack' as the integration type. - Example: deploy-integrations-solution jira --email user@example.com --url https://example.atlassian.net --token YOUR_TOKEN --project-key PROJ - Example: deploy-integrations-solution service-now --instance-id example --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET --user-id YOUR_USER_ID --private-key-path ./private.key --integration-module itsm - Example: deploy-integrations-solution slack --bot-token xoxb-... --signing-secret YOUR_SECRET --workspace-id YOUR_WORKSPACE_ID + Example: deploy_integrations_solution jira --email user@example.com --url https://example.atlassian.net --token YOUR_TOKEN --project-key PROJ + Example: deploy_integrations_solution service-now --instance-id example --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET --user-id YOUR_USER_ID --private-key-path ./private.key --integration-module itsm + Example: deploy_integrations_solution slack --bot-token xoxb-... --signing-secret YOUR_SECRET --workspace-id YOUR_WORKSPACE_ID """) ) parser.print_help() diff --git a/documentation/JIRA/JIRA.md b/documentation/JIRA/JIRA.md index 76b4c4c5..831e9af0 100644 --- a/documentation/JIRA/JIRA.md +++ b/documentation/JIRA/JIRA.md @@ -5,11 +5,11 @@ This document provides an overview of the AWS Security Incident Response Jira in ## Deployment 1. Use the `jira` argument to deploy the JIRA integration: - `./deploy-integrations-solution.py jira -h` + `./deploy_integrations_solution.py jira -h` You should see the following output: ``` - usage: deploy-integrations-solution jira [-h] --email EMAIL --url URL --token TOKEN + usage: deploy_integrations_solution jira [-h] --email EMAIL --url URL --token TOKEN options: -h, --help show this help message and exit @@ -21,7 +21,7 @@ This document provides an overview of the AWS Security Incident Response Jira in 2. Deploy the integration with a single command ```bash - ./deploy-integrations-solution.py jira \ + ./deploy_integrations_solution.py jira \ --email \ --url \ --token \ @@ -132,7 +132,7 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which git clone https://github.com/sample-aws-security-incident-response-integrations.git cd sample-aws-security-incident-response-integrations/ pip install -r requirements.txt - chmod +x deploy-integrations-solution.py + chmod +x deploy_integrations_solution.py sudo systemctl start docker.service sudo chmod 666 /var/run/docker.sock ``` @@ -160,7 +160,7 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which git clone https://github.com/sample-aws-security-incident-response-integrations.git cd sample-aws-security-incident-response-integrations/ pip install -r requirements.txt - chmod +x deploy-integrations-solution.py + chmod +x deploy_integrations_solution.py sudo systemctl start docker.service sudo chmod 666 /var/run/docker.sock ``` diff --git a/documentation/JIRA/JIRA_TROUBLESHOOTING.md b/documentation/JIRA/JIRA_TROUBLESHOOTING.md index 8d2b5d14..06f45db5 100644 --- a/documentation/JIRA/JIRA_TROUBLESHOOTING.md +++ b/documentation/JIRA/JIRA_TROUBLESHOOTING.md @@ -23,8 +23,8 @@ This document provides detailed information on setup, configuration, validation, 2. **Deploy the Stack**: ```bash - # Using the deploy-integrations-solution script - deploy-integrations-solution jira \ + # Using the deploy_integrations_solution script + deploy_integrations_solution jira \ --email \ --url \ --token \ diff --git a/documentation/SERVICE_NOW/SERVICE_NOW.md b/documentation/SERVICE_NOW/SERVICE_NOW.md index 1e5bf413..9c838500 100644 --- a/documentation/SERVICE_NOW/SERVICE_NOW.md +++ b/documentation/SERVICE_NOW/SERVICE_NOW.md @@ -6,7 +6,7 @@ This document provides an overview of the AWS Security Incident Response Service ```bash # Deploy the integration with JWT OAuth authentication -./deploy-integrations-solution.py service-now \ +./deploy_integrations_solution.py service-now \ --instance-id \ --client-id \ --client-secret \ @@ -19,7 +19,7 @@ This document provides an overview of the AWS Security Incident Response Service Eg. ```bash # Deploy the integration with JWT OAuth authentication -./deploy-integrations-solution.py service-now \ +./deploy_integrations_solution.py service-now \ --instance-id dev1234 \ --client-id test-1234 \ --client-secret "XXXXXXXXXXXXXXXXXXXX" \ @@ -148,7 +148,7 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which git clone https://github.com/aws-samples/sample-aws-security-incident-response-integrations.git cd sample-aws-security-incident-response-integrations/ pip install -r requirements.txt - chmod +x deploy-integrations-solution.py + chmod +x deploy_integrations_solution.py sudo systemctl start docker.service sudo chmod 666 /var/run/docker.sock ``` @@ -189,7 +189,7 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which git clone https://github.com/aws-samples/sample-aws-security-incident-response-integrations.git cd sample-aws-security-incident-response-integrations/ pip install -r requirements.txt - chmod +x deploy-integrations-solution.py + chmod +x deploy_integrations_solution.py sudo systemctl start docker.service sudo chmod 666 /var/run/docker.sock ``` @@ -572,8 +572,8 @@ The stack provides the following outputs that can be used for integration: 3. **Deploy the Stack**: ```bash - # Using the deploy-integrations-solution script with JWT OAuth - deploy-integrations-solution service-now \ + # Using the deploy_integrations_solution script with JWT OAuth + deploy_integrations_solution service-now \ --instance-id \ --client-id \ --client-secret \ diff --git a/documentation/SERVICE_NOW/SERVICE_NOW_TROUBLESHOOTING.md b/documentation/SERVICE_NOW/SERVICE_NOW_TROUBLESHOOTING.md index cfe414ae..8163d479 100644 --- a/documentation/SERVICE_NOW/SERVICE_NOW_TROUBLESHOOTING.md +++ b/documentation/SERVICE_NOW/SERVICE_NOW_TROUBLESHOOTING.md @@ -21,8 +21,8 @@ This document provides detailed information on troubleshooting, validation, and 2. **Deploy the Stack**: ```bash - # Using the deploy-integrations-solution script with JWT OAuth - deploy-integrations-solution service-now \ + # Using the deploy_integrations_solution script with JWT OAuth + deploy_integrations_solution service-now \ --instance-id \ --client-id \ --client-secret \ diff --git a/documentation/SLACK/SLACK.md b/documentation/SLACK/SLACK.md index 55bf1978..b411fe5a 100644 --- a/documentation/SLACK/SLACK.md +++ b/documentation/SLACK/SLACK.md @@ -114,7 +114,7 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which git clone https://github.com/aws-samples/sample-aws-security-incident-response-integrations.git cd sample-aws-security-incident-response-integrations/ pip install -r requirements.txt - chmod +x deploy-integrations-solution.py + chmod +x deploy_integrations_solution.py sudo systemctl start docker.service sudo chmod 666 /var/run/docker.sock ``` @@ -143,7 +143,7 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which git clone https://github.com/aws-samples/sample-aws-security-incident-response-integrations.git cd sample-aws-security-incident-response-integrations/ pip install -r requirements.txt - chmod +x deploy-integrations-solution.py + chmod +x deploy_integrations_solution.py sudo systemctl start docker.service sudo chmod 666 /var/run/docker.sock ``` @@ -164,13 +164,13 @@ Bootstrap is a prerequisite to deployment. You cannot deploy the solution which Use the deployment script to deploy the Slack integration: ```bash -./deploy-integrations-solution.py slack --help +./deploy_integrations_solution.py slack --help ``` You should see the following output: ``` -usage: deploy-integrations-solution slack [-h] --bot-token BOT_TOKEN +usage: deploy_integrations_solution slack [-h] --bot-token BOT_TOKEN --signing-secret SIGNING_SECRET --workspace-id WORKSPACE_ID [--region REGION] @@ -194,7 +194,7 @@ options: Deploy the integration with a single command: ```bash -./deploy-integrations-solution.py slack \ +./deploy_integrations_solution.py slack \ --bot-token "xoxb-YOUR-BOT-TOKEN-HERE" \ --signing-secret "YOUR-SIGNING-SECRET-HERE" \ --workspace-id "T1234567890" \ diff --git a/documentation/SLACK/SLACK_DEPLOYMENT_GUIDE.md b/documentation/SLACK/SLACK_DEPLOYMENT_GUIDE.md index f2c553db..621029ca 100644 --- a/documentation/SLACK/SLACK_DEPLOYMENT_GUIDE.md +++ b/documentation/SLACK/SLACK_DEPLOYMENT_GUIDE.md @@ -156,12 +156,12 @@ echo "T1234567890" | grep -E '^[A-Z0-9]{9,11}$' ### Step 2: Review Deployment Command ```bash -./deploy-integrations-solution.py slack --help +./deploy_integrations_solution.py slack --help ``` Expected output: ``` -usage: deploy-integrations-solution slack [-h] --bot-token BOT_TOKEN +usage: deploy_integrations_solution slack [-h] --bot-token BOT_TOKEN --signing-secret SIGNING_SECRET --workspace-id WORKSPACE_ID [--region REGION] @@ -185,7 +185,7 @@ options: ### Step 3: Deploy the Integration ```bash -./deploy-integrations-solution.py slack \ +./deploy_integrations_solution.py slack \ --bot-token "xoxb-YOUR-BOT-TOKEN-HERE" \ --signing-secret "YOUR-SIGNING-SECRET-HERE" \ --workspace-id "T1234567890" \ diff --git a/documentation/SLACK/SLACK_PARAMETER_MANAGEMENT.md b/documentation/SLACK/SLACK_PARAMETER_MANAGEMENT.md index b6623d24..e1b3db5e 100644 --- a/documentation/SLACK/SLACK_PARAMETER_MANAGEMENT.md +++ b/documentation/SLACK/SLACK_PARAMETER_MANAGEMENT.md @@ -30,7 +30,7 @@ All Slack credentials are stored in SSM Parameter Store with the following chara The simplest way to set up parameters is during CDK deployment: ```bash -./deploy-integrations-solution.py slack \ +./deploy_integrations_solution.py slack \ --bot-token "xoxb-YOUR-BOT-TOKEN-HERE" \ --signing-secret "YOUR-SIGNING-SECRET-HERE" \ --workspace-id "T1234567890" \ @@ -304,7 +304,7 @@ Enable debug logging to troubleshoot parameter issues: ```bash # Deploy with debug logging -./deploy-integrations-solution.py slack \ +./deploy_integrations_solution.py slack \ --bot-token "xoxb-..." \ --signing-secret "..." \ --workspace-id "T1234567890" \ diff --git a/documentation/SLACK/SLACK_TROUBLESHOOTING.md b/documentation/SLACK/SLACK_TROUBLESHOOTING.md index fbb890c4..01d28c24 100644 --- a/documentation/SLACK/SLACK_TROUBLESHOOTING.md +++ b/documentation/SLACK/SLACK_TROUBLESHOOTING.md @@ -756,7 +756,7 @@ This output provides the CloudWatch Logs group name for the Slack Command Handle 1. **Redeploy with Debug Logging**: ```bash - ./deploy-integrations-solution.py slack \ + ./deploy_integrations_solution.py slack \ --bot-token "" \ --signing-secret "" \ --workspace-id "" \ diff --git a/requirements-dev.txt b/requirements-dev.txt index cbac6b7c..a61b08e4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,18 @@ -r requirements.txt +-r assets/jira_client/requirements.txt +-r assets/jira_notifications_handler/requirements.txt +-r assets/security_ir_client/requirements.txt +-r assets/security_ir_poller/requirements.txt +-r assets/service_now_api_gateway_authorizer/requirements.txt +-r assets/service_now_client/requirements.txt +-r assets/service_now_notifications_handler/requirements.txt +-r assets/service_now_resource_setup_handler/requirements.txt +-r assets/service_now_secret_rotation_handler/requirements.txt +-r assets/slack_api_gateway_authorizer/requirements.txt +-r assets/slack_bolt_layer/requirements.txt +-r assets/slack_client/requirements.txt +-r assets/slack_command_handler/requirements.txt +-r assets/slack_events_bolt_handler/requirements.txt pytest>=7.0.0 pytest-cov pytest-xdist @@ -8,6 +22,8 @@ isort mypy types-boto3 types-requests +boto3-stubs +boto3-stubs-full moto bandit semgrep diff --git a/requirements.txt b/requirements.txt index d7dc544d..1d467da9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ aws-cdk-lib>=2.180.0 aws_cdk.aws_lambda_python_alpha>=2.180.0a0 cdk-nag>=2.35.37 constructs>=10.0.0,<11.0.0 -boto3 +boto3>=1.37.7 botocore pysnc urllib3>=1.26.0,<3.0.0 diff --git a/tests/assets/slack_api_gateway_authorizer/test_slack_api_gateway_authorizer.py b/tests/assets/slack_api_gateway_authorizer/test_slack_api_gateway_authorizer.py index 2bd5c596..fdcc9ffe 100644 --- a/tests/assets/slack_api_gateway_authorizer/test_slack_api_gateway_authorizer.py +++ b/tests/assets/slack_api_gateway_authorizer/test_slack_api_gateway_authorizer.py @@ -12,8 +12,12 @@ import pytest +# Store original boto3 module reference if it exists +_original_boto3 = sys.modules.get("boto3") + # Mock boto3 before importing the module -sys.modules["boto3"] = Mock() +_mock_boto3 = Mock() +sys.modules["boto3"] = _mock_boto3 # Add the assets directory to the path assets_path = Path(__file__).parent.parent.parent.parent / "assets" @@ -21,6 +25,12 @@ import index +# Restore original boto3 module after import to prevent test pollution +if _original_boto3 is not None: + sys.modules["boto3"] = _original_boto3 +else: + del sys.modules["boto3"] + @pytest.fixture def mock_ssm_client(): diff --git a/tests/assets/slack_events_bolt_handler/test_file_upload_handler.py b/tests/assets/slack_events_bolt_handler/test_file_upload_handler.py index f2c8140b..5933321e 100644 --- a/tests/assets/slack_events_bolt_handler/test_file_upload_handler.py +++ b/tests/assets/slack_events_bolt_handler/test_file_upload_handler.py @@ -25,7 +25,6 @@ MAX_FILE_SIZE_BYTES, SLACK_MAX_RETRIES, SLACK_INITIAL_RETRY_DELAY, - SLACK_MAX_RETRY_DELAY ) diff --git a/tests/cdk/cdk_test_utils.py b/tests/cdk/cdk_test_utils.py new file mode 100644 index 00000000..026fd396 --- /dev/null +++ b/tests/cdk/cdk_test_utils.py @@ -0,0 +1,138 @@ +"""Shared utilities for CDK stack tests. + +This module provides common classes and functions used across CDK test files, +including CDK Nag finding aggregation and compliance checking utilities. +""" + +import sys + +import aws_cdk as core +import cdk_nag + + +class Finding: + """Represents a CDK Nag finding. + + Attributes: + rule_id: The CDK Nag rule identifier + rule_explanation: Description of the rule violation + resource: The CDK resource that triggered the finding + stack_name: Name of the stack containing the resource + resource_id: Path identifier of the resource + """ + + def __init__(self, rule_id: str, rule_explanation: str, resource: core.CfnResource): + self.rule_id = rule_id + self.rule_explanation = rule_explanation + self.resource = resource + self.stack_name = ( + core.Names.unique_id(self.resource.stack) + if self.resource.stack.nested_stack_parent + else self.resource.stack.stack_name + ) + self.resource_id = self.resource.node.path + + def __str__(self): + return f"{self.resource_id}: {self.rule_id} -- {self.rule_explanation}" + + +class FindingAggregatorLogger(cdk_nag.AnnotationLogger): + """Aggregates CDK Nag findings for test assertions. + + This logger collects non-compliant and suppressed findings during CDK Nag + analysis, allowing tests to assert on compliance results. + + Attributes: + non_compliant_findings: List of findings that violate CDK Nag rules + suppressed_findings: List of findings that were suppressed + """ + + def __init__(self): + super().__init__() + self.non_compliant_findings: list[Finding] = [] + self.suppressed_findings: list[Finding] = [] + + def on_non_compliance( + self, + *, + finding_id: str, + nag_pack_name: str, + resource: core.CfnResource, + rule_explanation: str, + rule_id: str, + rule_info: str, + rule_level: cdk_nag.NagMessageLevel, + rule_original_name: str, + ) -> None: + self.non_compliant_findings.append(Finding(rule_id, rule_explanation, resource)) + + def on_error( + self, + *, + error_message: str, + nag_pack_name: str, + resource: core.CfnResource, + rule_explanation: str, + rule_id: str, + rule_info: str, + rule_level: cdk_nag.NagMessageLevel, + rule_original_name: str, + ) -> None: + print(f"Error found: {rule_id} - {rule_explanation}") + sys.exit(1) + + def on_compliance( + self, + *, + nag_pack_name: str, + resource: core.CfnResource, + rule_explanation: str, + rule_id: str, + rule_info: str, + rule_level: cdk_nag.NagMessageLevel, + rule_original_name: str, + ) -> None: + pass + + def on_suppressed( + self, + *, + suppression_reason: str, + finding_id: str, + nag_pack_name: str, + resource: core.CfnResource, + rule_explanation: str, + rule_id: str, + rule_info: str, + rule_level: cdk_nag.NagMessageLevel, + rule_original_name: str, + ) -> None: + self.suppressed_findings.append(Finding(rule_id, rule_explanation, resource)) + + def on_not_applicable( + self, + *, + nag_pack_name: str, + resource: core.CfnResource, + rule_explanation: str, + rule_id: str, + rule_info: str, + rule_level: cdk_nag.NagMessageLevel, + rule_original_name: str, + ) -> None: + pass + + def on_suppressed_error( + self, + *, + error_suppression_reason: str, + error_message: str, + nag_pack_name: str, + resource: core.CfnResource, + rule_explanation: str, + rule_id: str, + rule_info: str, + rule_level: cdk_nag.NagMessageLevel, + rule_original_name: str, + ) -> None: + print(f"Suppressed error finding: {rule_id} - {rule_explanation}") diff --git a/tests/cdk/test_aws_security_incident_response_sample_integrations_stack.py b/tests/cdk/test_aws_security_incident_response_sample_integrations_stack.py index 9c260bf4..c24bad14 100644 --- a/tests/cdk/test_aws_security_incident_response_sample_integrations_stack.py +++ b/tests/cdk/test_aws_security_incident_response_sample_integrations_stack.py @@ -1,4 +1,4 @@ -import sys +"""Tests for the common integrations CDK stack.""" import aws_cdk as core import cdk_nag @@ -10,113 +10,7 @@ AwsSecurityIncidentResponseSampleIntegrationsCommonStack, ) - -class Finding: - def __init__(self, rule_id: str, rule_explanation: str, resource: core.CfnResource): - self.rule_id = rule_id - self.rule_explanation = rule_explanation - self.resource = resource - self.stack_name = ( - core.Names.unique_id(self.resource.stack) - if self.resource.stack.nested_stack_parent - else self.resource.stack.stack_name - ) - self.resource_id = self.resource.node.path - - def __str__(self): - return f"{self.resource_id}: {self.rule_id} -- {self.rule_explanation}" - - -class FindingAggregatorLogger(cdk_nag.AnnotationLogger): - def __init__(self): - super().__init__() - self.non_compliant_findings: list[Finding] = [] - self.suppressed_findings: list[Finding] = [] - - def on_non_compliance( - self, - *, - finding_id: str, - nag_pack_name: str, - resource: core.CfnResource, - rule_explanation: str, - rule_id: str, - rule_info: str, - rule_level: cdk_nag.NagMessageLevel, - rule_original_name: str, - ) -> None: - self.non_compliant_findings.append(Finding(rule_id, rule_explanation, resource)) - - def on_error( - self, - *, - error_message: str, - nag_pack_name: str, - resource: core.CfnResource, - rule_explanation: str, - rule_id: str, - rule_info: str, - rule_level: cdk_nag.NagMessageLevel, - rule_original_name: str, - ) -> None: - print(f"Error found: {rule_id} - {rule_explanation}") - sys.exit(1) - - def on_compliance( - self, - *, - nag_pack_name: str, - resource: core.CfnResource, - rule_explanation: str, - rule_id: str, - rule_info: str, - rule_level: cdk_nag.NagMessageLevel, - rule_original_name: str, - ) -> None: - pass - - def on_suppressed( - self, - *, - suppression_reason: str, - finding_id: str, - nag_pack_name: str, - resource: core.CfnResource, - rule_explanation: str, - rule_id: str, - rule_info: str, - rule_level: cdk_nag.NagMessageLevel, - rule_original_name: str, - ) -> None: - self.suppressed_findings.append(Finding(rule_id, rule_explanation, resource)) - - def on_not_applicable( - self, - *, - nag_pack_name: str, - resource: core.CfnResource, - rule_explanation: str, - rule_id: str, - rule_info: str, - rule_level: cdk_nag.NagMessageLevel, - rule_original_name: str, - ) -> None: - pass - - def on_suppressed_error( - self, - *, - error_suppression_reason: str, - error_message: str, - nag_pack_name: str, - resource: core.CfnResource, - rule_explanation: str, - rule_id: str, - rule_info: str, - rule_level: cdk_nag.NagMessageLevel, - rule_original_name: str, - ) -> None: - print(f"Suppressed error finding: {rule_id} - {rule_explanation}") +from .cdk_test_utils import FindingAggregatorLogger @pytest.fixture(autouse=True) diff --git a/tests/cdk/test_aws_security_incident_response_service_now_integration_stack.py b/tests/cdk/test_aws_security_incident_response_service_now_integration_stack.py new file mode 100644 index 00000000..ff19e85b --- /dev/null +++ b/tests/cdk/test_aws_security_incident_response_service_now_integration_stack.py @@ -0,0 +1,229 @@ +"""Tests for the ServiceNow integration CDK stack.""" + +import aws_cdk as core +import pytest +from aws_cdk.assertions import Template +from cdk_nag import AwsSolutionsChecks, NagSuppressions + +from aws_security_incident_response_sample_integrations.aws_security_incident_response_sample_integrations_common_stack import ( + AwsSecurityIncidentResponseSampleIntegrationsCommonStack, +) +from aws_security_incident_response_sample_integrations.aws_security_incident_response_service_now_integration_stack import ( + AwsSecurityIncidentResponseServiceNowIntegrationStack, +) + +from .cdk_test_utils import FindingAggregatorLogger + + +@pytest.fixture(scope="module") +def app(): + """Create a CDK app for testing (module-scoped for performance).""" + return core.App() + + +@pytest.fixture(scope="module") +def common_stack(app): + """Create the common stack required by ServiceNow integration (module-scoped).""" + return AwsSecurityIncidentResponseSampleIntegrationsCommonStack( + app, "service-now-test-common-stack" + ) + + +@pytest.fixture(scope="module") +def service_now_stack(app, common_stack): + """Create the ServiceNow integration stack for testing (module-scoped).""" + return AwsSecurityIncidentResponseServiceNowIntegrationStack( + app, "service-now-test-stack", common_stack=common_stack + ) + + +@pytest.fixture(scope="module") +def template(service_now_stack): + """Create template once for all tests (module-scoped).""" + return Template.from_stack(service_now_stack) + + +def test_service_now_stack_synthesizes(template): + """Test that the ServiceNow stack synthesizes without errors.""" + assert template is not None + +def test_lambda_authorizer_exists(template): + """Test that the API Gateway Lambda authorizer is created.""" + template.has_resource( + "AWS::ApiGateway::Authorizer", + {"Properties": {"Type": "TOKEN"}}, + ) + + +def test_security_compliance(app, common_stack, service_now_stack): + """Test CDK Nag security compliance for the ServiceNow stack.""" + spy = FindingAggregatorLogger() + checks = AwsSolutionsChecks(additional_loggers=[spy], verbose=True) + + # Add stack-level suppressions for common patterns + NagSuppressions.add_stack_suppressions( + common_stack, + [ + { + "id": "AwsSolutions-L1", + "reason": "Using the latest available runtime for Python (3.13)", + }, + { + "id": "AwsSolutions-SQS3", + "reason": "DLQs are used appropriately in the architecture", + }, + { + "id": "AwsSolutions-IAM4", + "reason": "AWS CDK custom resource provider requires managed policies", + } + ], + ) + + NagSuppressions.add_stack_suppressions( + service_now_stack, + [ + { + "id": "AwsSolutions-L1", + "reason": "Using the latest available runtime for Python (3.13)", + }, + { + "id": "AwsSolutions-SQS3", + "reason": "DLQs are used appropriately in the architecture", + }, + { + "id": "AwsSolutions-IAM4", + "reason": "AWS CDK custom resource provider requires managed policies", + }, + { + "id": "AwsSolutions-APIG2", + "reason": "Request validation is handled by Lambda authorizer and handler", # FIXME: In the next round of changes, we will add the option for OAuth 2.0 + }, + { + "id": "AwsSolutions-APIG4", + "reason": "Authorization is implemented via Lambda authorizer", # FIXME: In the next round of changes, we will add the option for OAuth 2.0 + }, + { + "id": "AwsSolutions-COG4", + "reason": "Using Lambda authorizer instead of Cognito", # FIXME: In the next round of changes, we will add the option for OAuth 2.0 + }, + { + "id": "AwsSolutions-APIG1", + "reason": "Access logging is enabled via deploy options", # + }, + { + "id": "AwsSolutions-APIG3", + "reason": "WAF not required for this internal integration", # TODO: Add WAF integration to ApiGateways + }, + { + "id": "AwsSolutions-APIG6", + "reason": "CloudWatch logging is configured at stage level", + }, + { + "id": "AwsSolutions-SMG4", + "reason": "Secret rotation is configured with 30-day schedule", + }, + ], + ) + + core.Aspects.of(app).add(checks) + app.synth() + + if spy.non_compliant_findings: + print("\n") + for finding in spy.non_compliant_findings: + print(f"Non-compliant finding: {finding}") + assert False, f"Found {len(spy.non_compliant_findings)} non-compliant findings" + +class TestConditionalResources: + """Tests for CfnCondition-based conditional resource creation.""" + + def test_use_oauth_condition_exists(self, template): + """Test that the UseOAuthCondition is defined in the template.""" + conditions = template.find_conditions("*") + + condition_keys = list(conditions.keys()) + assert any("UseOAuthCondition" in key for key in condition_keys) + + def test_use_token_auth_condition_exists(self, template): + """Test that the UseTokenAuthCondition is defined in the template.""" + conditions = template.find_conditions("*") + + condition_keys = list(conditions.keys()) + assert any("UseTokenAuthCondition" in key for key in condition_keys) + + def test_combined_condition_exists(self, template): + """Test that the TemporaryOrCondition combining OAuth and token auth exists.""" + conditions = template.find_conditions("*") + + condition_keys = list(conditions.keys()) + assert any("TemporaryOrCondition" in key for key in condition_keys) + + def test_api_gateway_has_condition_applied(self, template): + """Test that API Gateway resources have the combined condition applied.""" + api_gateways = template.find_resources("AWS::ApiGateway::Authorizer") + + # At least one API Gateway should have a condition + has_condition = False + for resource in api_gateways.values(): + if "Condition" in resource: + has_condition = True + break + assert has_condition, "Authorizers should have a condition applied" + + def test_use_oauth_parameter_allowed_values(self, template): + """Test that useOAuth parameter only allows 'true' or 'false'.""" + parameters = template.find_parameters("*") + + oauth_param = None + for key, value in parameters.items(): + if "useoauth" in key.lower(): + oauth_param = value + break + + assert oauth_param is not None, "useOAuth parameter should exist" + assert oauth_param.get("AllowedValues") == ["true", "false"] + assert oauth_param.get("Default") == "false" + + def test_integration_module_parameter_allowed_values(self, template): + """Test that integrationModule parameter only allows 'itsm' or 'ir'.""" + parameters = template.find_parameters("*") + + module_param = None + for key, value in parameters.items(): + if "integrationmodule" in key.lower(): + module_param = value + break + + assert module_param is not None, "integrationModule parameter should exist" + assert module_param.get("AllowedValues") == ["itsm", "ir"] + assert module_param.get("Default") == "itsm" + + def test_condition_evaluates_oauth_parameter(self, template): + """Test that UseOAuthCondition evaluates the useOAuth parameter.""" + conditions = template.find_conditions("*") + + # Find the UseOAuthCondition + oauth_condition = None + for key, value in conditions.items(): + if "UseOAuthCondition" in key: + oauth_condition = value + break + + assert oauth_condition is not None + # The condition should use Fn::Equals to compare parameter value to "true" + assert "Fn::Equals" in oauth_condition + + def test_condition_evaluates_token_auth_parameter(self, template): + """Test that UseTokenAuthCondition evaluates the useOAuth parameter for 'false'.""" + conditions = template.find_conditions("*") + + # Find the UseTokenAuthCondition + token_condition = None + for key, value in conditions.items(): + if "UseTokenAuthCondition" in key: + token_condition = value + break + + assert token_condition is not None + # The condition should use Fn::Equals to compare parameter value to "false" + assert "Fn::Equals" in token_condition diff --git a/tests/test_deploy_integrations_solution.py b/tests/test_deploy_integrations_solution.py new file mode 100644 index 00000000..0662aee0 --- /dev/null +++ b/tests/test_deploy_integrations_solution.py @@ -0,0 +1,486 @@ +"""Tests for deploy_integrations_solution.py deployment script.""" + +import argparse +import subprocess +import sys +from unittest.mock import MagicMock, patch + +import boto3 +import pytest +from moto import mock_aws + +from deploy_integrations_solution import deploy_jira, deploy_servicenow, deploy_slack, main + +# Test account ID used by moto +MOTO_ACCOUNT_ID = "123456789012" + + +class TestArgumentParsing: + """Tests for CLI argument parsing.""" + + def test_jira_required_arguments(self): + """Test that Jira integration requires all mandatory arguments.""" + with pytest.raises(SystemExit): + with patch.object(sys, "argv", ["prog", "jira"]): + main() + + def test_jira_all_arguments_parsed(self): + """Test that all Jira arguments are correctly parsed.""" + args = argparse.Namespace( + email="test@example.com", + url="https://example.atlassian.net", + token="test-token", + project_key="PROJ", + log_level="error", + ) + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + result = deploy_jira(args) + assert result == 0 + + def test_servicenow_required_arguments(self): + """Test that ServiceNow integration requires all mandatory arguments.""" + with pytest.raises(SystemExit): + with patch.object(sys, "argv", ["prog", "service-now"]): + main() + + def test_servicenow_all_arguments_parsed(self): + """Test that all ServiceNow arguments are correctly parsed.""" + args = argparse.Namespace( + instance_id="test-instance", + client_id="client123", + client_secret="secret456", + user_id="user789", + private_key_path="./test.key", + integration_module="itsm", + log_level="error", + use_oauth=False, + ) + mock_s3 = MagicMock() + mock_sts = MagicMock() + mock_sts.get_caller_identity.return_value = {"Account": "123456789012"} + + with patch("os.path.exists", return_value=True): + with patch("boto3.client") as mock_boto: + mock_boto.side_effect = lambda service: mock_s3 if service == "s3" else mock_sts + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + result = deploy_servicenow(args) + assert result == 0 + + def test_slack_required_arguments(self): + """Test that Slack integration requires all mandatory arguments.""" + with pytest.raises(SystemExit): + with patch.object(sys, "argv", ["prog", "slack"]): + main() + + def test_slack_all_arguments_parsed(self): + """Test that all Slack arguments are correctly parsed.""" + args = argparse.Namespace( + bot_token="xoxb-test-token", + signing_secret="test-secret", + workspace_id="T12345", + region="us-east-1", + skip_verification=True, + log_level="error", + ) + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + result = deploy_slack(args) + assert result == 0 + + def test_slack_optional_arguments(self): + """Test Slack optional arguments have correct defaults.""" + args = argparse.Namespace( + bot_token="xoxb-test", + signing_secret="secret", + workspace_id="T123", + region="us-east-1", + skip_verification=False, + log_level="error", + ) + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + result = deploy_slack(args) + assert result == 0 + + def test_no_integration_specified(self): + """Test that missing integration type shows error.""" + with patch.object(sys, "argv", ["prog"]): + with pytest.raises(SystemExit) as exc_info: + main() + assert exc_info.value.code == 1 + + def test_log_level_override(self): + """Test that log level can be overridden.""" + args = argparse.Namespace( + email="test@example.com", + url="https://example.atlassian.net", + token="token", + project_key="PROJ", + log_level="debug", + ) + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + result = deploy_jira(args) + assert result == 0 + cmd = mock_run.call_args[0][0] + assert "AwsSecurityIncidentResponseSampleIntegrationsCommonStack:logLevel=debug" in " ".join(cmd) + + +class TestDeployJira: + """Tests for Jira deployment function.""" + + def test_deploy_jira_success(self): + """Test successful Jira deployment.""" + args = argparse.Namespace( + email="test@example.com", + url="https://example.atlassian.net", + token="test-token", + project_key="PROJ", + log_level="error", + ) + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_jira(args) + assert result == 0 + mock_run.assert_called_once() + cmd = mock_run.call_args[0][0] + assert "npx" in cmd + assert "cdk" in cmd + assert "deploy" in cmd + assert "AwsSecurityIncidentResponseJiraIntegrationStack" in cmd + + def test_deploy_jira_builds_correct_command(self): + """Test that Jira deployment builds correct CDK command.""" + args = argparse.Namespace( + email="user@test.com", + url="https://test.atlassian.net", + token="my-token", + project_key="TEST", + log_level="debug", + ) + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + deploy_jira(args) + cmd = mock_run.call_args[0][0] + assert "AwsSecurityIncidentResponseJiraIntegrationStack:jiraEmail=user@test.com" in " ".join(cmd) + assert "AwsSecurityIncidentResponseJiraIntegrationStack:jiraUrl=https://test.atlassian.net" in " ".join(cmd) + assert "AwsSecurityIncidentResponseJiraIntegrationStack:jiraToken=my-token" in " ".join(cmd) + assert "AwsSecurityIncidentResponseJiraIntegrationStack:jiraProjectKey=TEST" in " ".join(cmd) + + def test_deploy_jira_subprocess_error(self): + """Test Jira deployment handles subprocess errors.""" + args = argparse.Namespace( + email="test@example.com", + url="https://example.atlassian.net", + token="test-token", + project_key="PROJ", + log_level="error", + ) + with patch( + "subprocess.run", + side_effect=subprocess.CalledProcessError(1, "cmd"), + ): + result = deploy_jira(args) + assert result == 1 + + def test_deploy_jira_unexpected_error(self): + """Test Jira deployment handles unexpected errors.""" + args = argparse.Namespace( + email="test@example.com", + url="https://example.atlassian.net", + token="test-token", + project_key="PROJ", + log_level="error", + ) + with patch("subprocess.run", side_effect=Exception("Unexpected")): + result = deploy_jira(args) + assert result == 1 + + +class TestDeployServiceNow: + """Tests for ServiceNow deployment function.""" + + @pytest.fixture + def servicenow_args(self, tmp_path): + """Common ServiceNow deployment arguments with a real temp key file.""" + key_file = tmp_path / "test.key" + key_file.write_text("fake-private-key-content") + return argparse.Namespace( + instance_id="test-instance", + client_id="client123", + client_secret="secret456", + user_id="user789", + private_key_path=str(key_file), + integration_module="itsm", + log_level="error", + use_oauth=False, + ) + + def test_deploy_servicenow_private_key_not_found(self): + """Test ServiceNow deployment fails when private key file doesn't exist.""" + args = argparse.Namespace( + instance_id="test-instance", + client_id="client123", + client_secret="secret456", + user_id="user789", + private_key_path="./nonexistent.key", + integration_module="itsm", + log_level="error", + use_oauth=False, + ) + result = deploy_servicenow(args) + assert result == 1 + + @mock_aws + def test_deploy_servicenow_success(self, servicenow_args): + """Test successful ServiceNow deployment.""" + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_servicenow(servicenow_args) + assert result == 0 + mock_run.assert_called_once() + + @mock_aws + def test_deploy_servicenow_s3_bucket_already_exists(self, servicenow_args): + """Test ServiceNow deployment handles existing S3 bucket.""" + # Pre-create the bucket to simulate "already exists" + s3 = boto3.client("s3", region_name="us-east-1") + bucket_name = f"snow-key-{MOTO_ACCOUNT_ID}" + s3.create_bucket(Bucket=bucket_name) + + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_servicenow(servicenow_args) + assert result == 0 + mock_run.assert_called_once() + + def test_deploy_servicenow_s3_bucket_creation_error(self, tmp_path): + """Test ServiceNow deployment handles S3 bucket creation errors.""" + key_file = tmp_path / "test.key" + key_file.write_text("fake-private-key-content") + args = argparse.Namespace( + instance_id="test-instance", + client_id="client123", + client_secret="secret456", + user_id="user789", + private_key_path=str(key_file), + integration_module="itsm", + log_level="error", + use_oauth=False, + ) + mock_s3 = MagicMock() + mock_s3.exceptions.BucketAlreadyOwnedByYou = type("BucketAlreadyOwnedByYou", (Exception,), {}) + mock_s3.exceptions.BucketAlreadyExists = type("BucketAlreadyExists", (Exception,), {}) + mock_s3.create_bucket.side_effect = Exception("Bucket creation failed") + mock_sts = MagicMock() + mock_sts.get_caller_identity.return_value = {"Account": MOTO_ACCOUNT_ID} + + with patch("boto3.client") as mock_boto: + mock_boto.side_effect = lambda service, **kwargs: mock_s3 if service == "s3" else mock_sts + result = deploy_servicenow(args) + assert result == 1 + + def test_deploy_servicenow_s3_upload_error(self, tmp_path): + """Test ServiceNow deployment handles S3 upload errors.""" + key_file = tmp_path / "test.key" + key_file.write_text("fake-private-key-content") + args = argparse.Namespace( + instance_id="test-instance", + client_id="client123", + client_secret="secret456", + user_id="user789", + private_key_path=str(key_file), + integration_module="itsm", + log_level="error", + use_oauth=False, + ) + mock_s3 = MagicMock() + mock_s3.exceptions.BucketAlreadyOwnedByYou = type("BucketAlreadyOwnedByYou", (Exception,), {}) + mock_s3.exceptions.BucketAlreadyExists = type("BucketAlreadyExists", (Exception,), {}) + mock_s3.upload_file.side_effect = Exception("Upload failed") + mock_sts = MagicMock() + mock_sts.get_caller_identity.return_value = {"Account": MOTO_ACCOUNT_ID} + + with patch("boto3.client") as mock_boto: + mock_boto.side_effect = lambda service, **kwargs: mock_s3 if service == "s3" else mock_sts + result = deploy_servicenow(args) + assert result == 1 + + @mock_aws + def test_deploy_servicenow_subprocess_error(self, servicenow_args): + """Test ServiceNow deployment handles subprocess errors.""" + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")): + result = deploy_servicenow(servicenow_args) + assert result == 1 + + @mock_aws + def test_deploy_servicenow_use_oauth_false(self, servicenow_args): + """Test ServiceNow deployment with use_oauth explicitly set to False.""" + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_servicenow(servicenow_args) + assert result == 0 + cmd = " ".join(mock_run.call_args[0][0]) + assert "useOAuth=false" in cmd + + @mock_aws + def test_deploy_servicenow_use_oauth_true(self, servicenow_args): + """Test ServiceNow deployment with use_oauth explicitly set to True.""" + servicenow_args.use_oauth = True + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_servicenow(servicenow_args) + assert result == 0 + cmd = " ".join(mock_run.call_args[0][0]) + assert "useOAuth=true" in cmd + + @mock_aws + def test_deploy_servicenow_use_oauth_missing_attribute(self, tmp_path): + """Test ServiceNow deployment defaults use_oauth to False when attribute is missing.""" + key_file = tmp_path / "test.key" + key_file.write_text("fake-private-key-content") + args = argparse.Namespace( + instance_id="test-instance", + client_id="client123", + client_secret="secret456", + user_id="user789", + private_key_path=str(key_file), + integration_module="itsm", + log_level="error", + # use_oauth intentionally omitted - should default to False + ) + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_servicenow(args) + assert result == 0 + cmd = " ".join(mock_run.call_args[0][0]) + assert "useOAuth=false" in cmd + + +class TestDeploySlack: + """Tests for Slack deployment function.""" + + def test_deploy_slack_success_skip_verification(self): + """Test successful Slack deployment with verification skipped.""" + args = argparse.Namespace( + bot_token="xoxb-test-token", + signing_secret="test-secret", + workspace_id="T12345", + region="us-east-1", + skip_verification=True, + log_level="error", + ) + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_slack(args) + assert result == 0 + # Should only be called once (CDK deploy, no verification) + assert mock_run.call_count == 1 + + def test_deploy_slack_success_with_verification(self): + """Test successful Slack deployment with verification.""" + args = argparse.Namespace( + bot_token="xoxb-test-token", + signing_secret="test-secret", + workspace_id="T12345", + region="us-west-2", + skip_verification=False, + log_level="error", + ) + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + result = deploy_slack(args) + assert result == 0 + # Should be called twice (CDK deploy + verification) + assert mock_run.call_count == 2 + + def test_deploy_slack_builds_correct_command(self): + """Test that Slack deployment builds correct CDK command.""" + args = argparse.Namespace( + bot_token="xoxb-my-token", + signing_secret="my-secret", + workspace_id="T99999", + region="us-east-1", + skip_verification=True, + log_level="info", + ) + mock_result = MagicMock() + mock_result.returncode = 0 + + with patch("subprocess.run", return_value=mock_result) as mock_run: + deploy_slack(args) + cmd = mock_run.call_args[0][0] + assert "AwsSecurityIncidentResponseSlackIntegrationStack:slackBotToken=xoxb-my-token" in " ".join(cmd) + assert "AwsSecurityIncidentResponseSlackIntegrationStack:slackSigningSecret=my-secret" in " ".join(cmd) + assert "AwsSecurityIncidentResponseSlackIntegrationStack:slackWorkspaceId=T99999" in " ".join(cmd) + + def test_deploy_slack_subprocess_error(self): + """Test Slack deployment handles subprocess errors.""" + args = argparse.Namespace( + bot_token="xoxb-test-token", + signing_secret="test-secret", + workspace_id="T12345", + region="us-east-1", + skip_verification=True, + log_level="error", + ) + with patch( + "subprocess.run", + side_effect=subprocess.CalledProcessError(1, "cmd"), + ): + result = deploy_slack(args) + assert result == 1 + + def test_deploy_slack_unexpected_error(self): + """Test Slack deployment handles unexpected errors.""" + args = argparse.Namespace( + bot_token="xoxb-test-token", + signing_secret="test-secret", + workspace_id="T12345", + region="us-east-1", + skip_verification=True, + log_level="error", + ) + with patch("subprocess.run", side_effect=Exception("Unexpected")): + result = deploy_slack(args) + assert result == 1 + + def test_deploy_slack_verification_failure_still_returns_success(self): + """Test that verification failure doesn't change deployment return code.""" + args = argparse.Namespace( + bot_token="xoxb-test-token", + signing_secret="test-secret", + workspace_id="T12345", + region="us-east-1", + skip_verification=False, + log_level="error", + ) + deploy_result = MagicMock() + deploy_result.returncode = 0 + verify_result = MagicMock() + verify_result.returncode = 1 + + with patch("subprocess.run", side_effect=[deploy_result, verify_result]): + result = deploy_slack(args) + # Deployment succeeded even though verification failed + assert result == 0