1+ ######################################################################################################################
2+ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. #
3+ # #
4+ # Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance #
5+ # with the License. A copy of the License is located at #
6+ # #
7+ # http://www.apache.org/licenses/LICENSE-2.0 #
8+ # #
9+ # or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES #
10+ # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions #
11+ # and limitations under the License. #
12+ ######################################################################################################################
13+ #!/bin/python
14+
15+ import datetime
16+ from os import environ
17+ from lib .boto3_util import create_client
18+
19+ class WAFCloudWatchMetrics (object ):
20+ """
21+ This class creates a wrapper function for cloudwatch get_metric_statistics API
22+ and another function to add the waf cw metric statistics to the anonymous usage
23+ data that the solution collects
24+ """
25+ def __init__ (self , log ):
26+ self .log = log
27+ self .cw_client = create_client ('cloudwatch' )
28+
29+ def get_cw_metric_statistics (self , metric_name , period_seconds , waf_rule ,
30+ namespace = 'AWS/WAFV2' ,
31+ statistics = ['Sum' ],
32+ start_time = datetime .datetime .utcnow (),
33+ end_time = datetime .datetime .utcnow (),
34+ web_acl = 'STACK_NAME' ):
35+ """
36+ Get a WAF CloudWatch metric given a WAF rule and metric name.
37+ Parameters:
38+ metric_name: string. The name of the metric. Optional.
39+ period_seconds: integer. The granularity, in seconds, of the returned data points.
40+ waf_rule: string. The name of the WAF rule.
41+ namespace: string. The namespace of the metric. Optional.
42+ statistics: list. The metric statistics, other than percentile. Optional.
43+ start_time: datetime. The time stamp that determines the first data point to return. Optional.
44+ end_time: datetime. The time stamp that determines the last data point to return. Optional.
45+ web_acl: string. The name of the WebACL. Optional
46+
47+ Returns: Metric data points if any, or None
48+ """
49+ try :
50+ response = self .cw_client .get_metric_statistics (
51+ MetricName = metric_name ,
52+ Namespace = namespace ,
53+ Statistics = statistics ,
54+ Period = period_seconds ,
55+ StartTime = start_time - datetime .timedelta (seconds = period_seconds ),
56+ EndTime = end_time ,
57+ Dimensions = [
58+ {
59+ "Name" : "Rule" ,
60+ "Value" : waf_rule
61+ },
62+ {
63+ "Name" : "WebACL" ,
64+ "Value" : environ .get (web_acl )
65+ },
66+ {
67+ "Name" : "Region" ,
68+ "Value" : environ .get ('AWS_REGION' )
69+ }
70+ ]
71+ )
72+ self .log .debug ("[cw_metrics_util: get_cw_metric_statistics] response:\n {}" .format (response ))
73+ return response if len (response ['Datapoints' ]) > 0 else None
74+ except Exception as e :
75+ self .log .error ("[cw_metrics_util: get_cw_metric_statistics] Failed to get metric %s." , metric_name )
76+ self .log .error (e )
77+ return None
78+
79+ def add_waf_cw_metric_to_usage_data (self , metric_name , period_seconds , waf_rule ,
80+ usage_data , usage_data_field_name , default_value ):
81+ """
82+ Get the CloudWatch metric statistics given a WAF rule and metric name, and
83+ add it to the anonymous usage data collected by the solution.
84+ Parameters:
85+ metric_name: string. The name of the metric. Optional.
86+ period_seconds: integer. The granularity, in seconds, of the returned data points.
87+ waf_rule: string. The name of the WAF rule.
88+ usage_data: JSON. Anonymous customer usage data of the solution
89+ usage_data_field_name: string. The field name in the usage data whose value will be
90+ replaced by the waf cloudwatch metric (if any)
91+ default_value: number. The default value of the field in the usage data
92+
93+ Returns: JSON. usage data.
94+ """
95+ self .log .info ("[cw_metrics_util: add_waf_cw_metric_to_usage_data] "
96+ + "Get metric %s for waf rule %s." % (metric_name , waf_rule ))
97+
98+ response = self .get_cw_metric_statistics (
99+ metric_name = metric_name ,
100+ period_seconds = period_seconds ,
101+ waf_rule = waf_rule
102+ )
103+ usage_data [usage_data_field_name ] = \
104+ response ['Datapoints' ][0 ]['Sum' ] if response is not None else default_value
105+
106+ self .log .info ("[cw_metrics_util: add_waf_cw_metric_to_usage_data] "
107+ + "%s - rule %s: %s" % (metric_name , waf_rule , str (usage_data [usage_data_field_name ])))
108+
109+ return usage_data
0 commit comments