Skip to content

Commit f195602

Browse files
jaymccon authored and tobywf committed
Implement CloudWatch metrics (#69)
1 parent 0448d3b commit f195602

File tree

4 files changed

+241
-94
lines changed

4 files changed

+241
-94
lines changed

src/cloudformation_cli_python_lib/metrics.py

Lines changed: 71 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,21 @@
1111

1212
LOG = logging.getLogger(__name__)
1313

14+
METRIC_NAMESPACE_ROOT = "AWS/CloudFormation"
15+
1416

1517
def format_dimensions(dimensions: Mapping[str, str]) -> List[Mapping[str, str]]:
    """Turn a name-to-value mapping into a list of Name/Value entries.

    Each item of ``dimensions`` becomes one ``{"Name": key, "Value": value}``
    dictionary. Entries appear in the mapping's iteration order; an empty
    mapping yields an empty list.
    """
    formatted: List[Mapping[str, str]] = []
    for dimension_name, dimension_value in dimensions.items():
        formatted.append({"Name": dimension_name, "Value": dimension_value})
    return formatted
1719

1820

1921
class MetricPublisher:
20-
def __init__(self, namespace: str, session: Session) -> None:
21-
self.namespace = namespace
22+
def __init__(self, account_id: str, resource_type: str, session: Session) -> None:
23+
suffix = resource_type.replace("::", "/")
24+
self.namespace = f"{METRIC_NAMESPACE_ROOT}/{account_id}/{suffix}"
25+
self.resource_type = resource_type
2226
self.client = session.client("cloudwatch")
2327

24-
def _publish_metric( # pylint: disable-msg=too-many-arguments
28+
def publish_metric( # pylint: disable-msg=too-many-arguments
2529
self,
2630
metric_name: MetricTypes,
2731
dimensions: Mapping[str, str],
@@ -34,9 +38,9 @@ def _publish_metric( # pylint: disable-msg=too-many-arguments
3438
Namespace=self.namespace,
3539
MetricData=[
3640
{
37-
"MetricName": metric_name,
41+
"MetricName": metric_name.name,
3842
"Dimensions": format_dimensions(dimensions),
39-
"Unit": unit,
43+
"Unit": unit.name,
4044
"Timestamp": str(timestamp),
4145
"Value": value,
4246
}
@@ -46,51 +50,76 @@ def _publish_metric( # pylint: disable-msg=too-many-arguments
4650
except ClientError as e:
4751
LOG.error("An error occurred while publishing metrics: %s", str(e))
4852

53+
54+
class MetricsPublisherProxy:
    """Fan a single metric out to every registered ``MetricPublisher``.

    Publishers are invoked in the order they were added. Every metric carries
    a ``DimensionKeyResourceType`` dimension whose value is read from the
    individual publisher's ``resource_type`` attribute, so it is filled in
    per publisher rather than once per call.
    """

    def __init__(self) -> None:
        self._publishers: List[MetricPublisher] = []

    def add_metrics_publisher(self, publisher: MetricPublisher) -> None:
        """Register ``publisher`` so it receives all later metrics."""
        self._publishers.append(publisher)

    def _publish_to_all(  # pylint: disable=too-many-arguments
        self,
        metric_name: MetricTypes,
        dimensions: Mapping[str, str],
        unit: StandardUnit,
        value: float,
        timestamp: datetime.datetime,
    ) -> None:
        """Send one metric to each registered publisher.

        ``dimensions`` holds the publisher-independent dimensions. The
        resource-type dimension is appended last so the resulting key order
        is the same as when each public method built the mapping inline.
        """
        for publisher in self._publishers:
            publisher.publish_metric(
                metric_name=metric_name,
                dimensions={
                    **dimensions,
                    "DimensionKeyResourceType": publisher.resource_type,
                },
                unit=unit,
                value=value,
                timestamp=timestamp,
            )

    def publish_exception_metric(
        self, timestamp: datetime.datetime, action: Action, error: Any
    ) -> None:
        """Publish a ``HandlerException`` count of 1.0 for ``action``.

        The exception-type dimension is ``str(type(error))``, i.e. the
        ``<class '...'>`` representation of the error's class.
        """
        self._publish_to_all(
            metric_name=MetricTypes.HandlerException,
            dimensions={
                "DimensionKeyActionType": action.name,
                "DimensionKeyExceptionType": str(type(error)),
            },
            unit=StandardUnit.Count,
            value=1.0,
            timestamp=timestamp,
        )

    def publish_invocation_metric(
        self, timestamp: datetime.datetime, action: Action
    ) -> None:
        """Publish a ``HandlerInvocationCount`` count of 1.0 for ``action``."""
        self._publish_to_all(
            metric_name=MetricTypes.HandlerInvocationCount,
            dimensions={"DimensionKeyActionType": action.name},
            unit=StandardUnit.Count,
            value=1.0,
            timestamp=timestamp,
        )

    def publish_duration_metric(
        self, timestamp: datetime.datetime, action: Action, milliseconds: float
    ) -> None:
        """Publish ``milliseconds`` as ``HandlerInvocationDuration`` for ``action``."""
        self._publish_to_all(
            metric_name=MetricTypes.HandlerInvocationDuration,
            dimensions={"DimensionKeyActionType": action.name},
            unit=StandardUnit.Milliseconds,
            value=milliseconds,
            timestamp=timestamp,
        )

    def publish_log_delivery_exception_metric(
        self, timestamp: datetime.datetime, error: Any
    ) -> None:
        """Publish a ``HandlerException`` count of 1.0 for a log-delivery failure.

        The action-type dimension is the fixed string ``"ProviderLogDelivery"``
        instead of an ``Action`` member name.
        """
        self._publish_to_all(
            metric_name=MetricTypes.HandlerException,
            dimensions={
                "DimensionKeyActionType": "ProviderLogDelivery",
                "DimensionKeyExceptionType": str(type(error)),
            },
            unit=StandardUnit.Count,
            value=1.0,
            timestamp=timestamp,
        )

src/cloudformation_cli_python_lib/resource.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
from datetime import datetime
34
from functools import wraps
45
from time import sleep
56
from typing import Any, Callable, MutableMapping, Optional, Tuple, Type, Union
@@ -18,6 +19,7 @@
1819
ProgressEvent,
1920
)
2021
from .log_delivery import ProviderLogHandler
22+
from .metrics import MetricPublisher, MetricsPublisherProxy
2123
from .scheduler import CloudWatchScheduler
2224
from .utils import (
2325
BaseResourceModel,
@@ -95,14 +97,8 @@ def schedule_reinvocation(
9597
# locally otherwise we re-invoke through CloudWatchEvents
9698
needed_ms_remaining = callback_delay_s * 1200 + INVOCATION_TIMEOUT_MS
9799
if callback_delay_s < 60 and remaining_ms > needed_ms_remaining:
98-
LOG.info(
99-
"Scheduling re-invoke locally after %s seconds, with Context {%s}",
100-
callback_delay_s,
101-
reinvoke_context,
102-
)
103100
sleep(callback_delay_s)
104101
return True
105-
LOG.info("Scheduling re-invoke with Context {%s}", reinvoke_context)
106102
callback_delay_min = int(callback_delay_s / 60)
107103
CloudWatchScheduler(boto3_session=session).reschedule_after_minutes(
108104
function_arn=context.invoked_function_arn,
@@ -175,8 +171,7 @@ def test_entrypoint(
175171
def _parse_request(
176172
self, event_data: MutableMapping[str, Any]
177173
) -> Tuple[
178-
Optional[SessionProxy],
179-
boto3.Session,
174+
Tuple[Optional[SessionProxy], Optional[boto3.Session], boto3.Session],
180175
BaseResourceHandlerRequest,
181176
Action,
182177
MutableMapping[str, Any],
@@ -185,6 +180,7 @@ def _parse_request(
185180
try:
186181
event = HandlerRequest.deserialize(event_data)
187182
caller_creds = event.requestData.callerCredentials
183+
provider_creds = event.requestData.providerCredentials
188184
platform_creds = event.requestData.platformCredentials
189185
request: BaseResourceHandlerRequest = UnmodelledRequest(
190186
clientRequestToken=event.bearerToken,
@@ -199,22 +195,45 @@ def _parse_request(
199195
aws_secret_access_key=platform_creds.secretAccessKey,
200196
aws_session_token=platform_creds.sessionToken,
201197
)
198+
provider_sess = None
199+
if provider_creds:
200+
provider_sess = boto3.Session(
201+
aws_access_key_id=provider_creds.accessKeyId,
202+
aws_secret_access_key=provider_creds.secretAccessKey,
203+
aws_session_token=provider_creds.sessionToken,
204+
)
202205
action = Action[event.action]
203206
callback_context = event.requestContext.get("callbackContext", {})
204-
Credentials(**event_data["requestData"]["platformCredentials"])
205207
except Exception as e: # pylint: disable=broad-except
206208
LOG.exception("Invalid request")
207209
raise InvalidRequest(f"{e} ({type(e).__name__})") from e
208-
return caller_sess, platform_sess, request, action, callback_context, event
210+
return (
211+
(caller_sess, provider_sess, platform_sess),
212+
request,
213+
action,
214+
callback_context,
215+
event,
216+
)
209217

210-
@_ensure_serialize
211-
def __call__(
218+
# TODO: refactor to reduce branching and locals
219+
@_ensure_serialize # noqa: C901
220+
def __call__( # pylint: disable=too-many-locals # noqa: C901
212221
self, event_data: MutableMapping[str, Any], context: LambdaContext
213222
) -> MutableMapping[str, Any]:
214223
try:
215224
ProviderLogHandler.setup(event_data)
216-
parsed = self._parse_request(event_data)
217-
caller_sess, platform_sess, request, action, callback, event = parsed
225+
sessions, request, action, callback, event = self._parse_request(event_data)
226+
caller_sess, provider_sess, platform_sess = sessions
227+
metrics = MetricsPublisherProxy()
228+
metrics.add_metrics_publisher(
229+
MetricPublisher(event.awsAccountId, event.resourceType, platform_sess)
230+
)
231+
if provider_sess:
232+
metrics.add_metrics_publisher(
233+
MetricPublisher(
234+
event.awsAccountId, event.resourceType, provider_sess
235+
)
236+
)
218237
# Acknowledge the task for first time invocation
219238
if not event.requestContext:
220239
report_progress(
@@ -235,7 +254,20 @@ def __call__(
235254
)
236255
invoke = True
237256
while invoke:
238-
progress = self._invoke_handler(caller_sess, request, action, callback)
257+
metrics.publish_invocation_metric(datetime.utcnow(), action)
258+
start_time = datetime.utcnow()
259+
error = None
260+
try:
261+
progress = self._invoke_handler(
262+
caller_sess, request, action, callback
263+
)
264+
except Exception as e: # pylint: disable=broad-except
265+
error = e
266+
m_secs = (datetime.utcnow() - start_time).total_seconds() * 1000.0
267+
metrics.publish_duration_metric(datetime.utcnow(), action, m_secs)
268+
if error:
269+
metrics.publish_exception_metric(datetime.utcnow(), action, error)
270+
raise error
239271
if progress.callbackContext:
240272
callback = progress.callbackContext
241273
event.requestContext["callbackContext"] = callback

0 commit comments

Comments
 (0)