Skip to content

Commit 02b2349

Browse files
authored
To support Step Functions customized log groups (DataDog#866)
* To support Step Functions customized log groups * Refine tests * Improve doc
1 parent 4fda68f commit 02b2349

8 files changed

+115
-15
lines changed

aws/logs_monitoring/customized_log_group.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ def is_lambda_customized_log_group(logstream_name):
2626
)
2727

2828

29+
# For both default and customzied Step Functions log groups, the log_stream starts with "states/"
30+
def is_step_functions_log_group(logstream_name):
31+
return logstream_name.startswith("states/")
32+
33+
2934
def get_lambda_function_name_from_logstream_name(logstream_name):
3035
try:
3136
# Not match the pattern for customized Lambda log group

aws/logs_monitoring/steps/enums.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ def __init__(self, string, event_source):
136136
RDS = ("/aws/rds", AwsEventSource.RDS)
137137
# e.g. sns/us-east-1/123456779121/SnsTopicX
138138
SNS = ("sns/", AwsEventSource.SNS)
139-
STEPFUNCTION = ("/aws/vendedlogs/states", AwsEventSource.STEPFUNCTION)
140139
TRANSITGATEWAY = ("tgw-attach", AwsEventSource.TRANSITGATEWAY)
141140

142141
def __str__(self):

aws/logs_monitoring/steps/handlers/awslogs_handler.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
)
1414
from customized_log_group import (
1515
is_lambda_customized_log_group,
16+
is_step_functions_log_group,
1617
get_lambda_function_name_from_logstream_name,
1718
)
1819
from steps.handlers.aws_attributes import AwsAttributes
@@ -102,11 +103,14 @@ def set_source(self, event):
102103
source = str(AwsEventSource.BEDROCK)
103104
self.metadata[DD_SOURCE] = parse_event_source(event, source)
104105

105-
# Special handling for customized log group of Lambda functions
106-
# Multiple Lambda functions can share one single customized log group
107-
# Need to parse logStream name to determine whether it is a Lambda function
106+
# Special handling for customized log group of Lambda Functions and Step Functions
107+
# Multiple functions can share one single customized log group. Need to parse logStream name to determine
108+
# Need to place the handling of customized log group at the bottom so that it can correct the source for some edge cases
108109
if is_lambda_customized_log_group(log_stream):
109110
self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA)
111+
# Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'."
112+
if is_step_functions_log_group(log_stream):
113+
self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION)
110114

111115
def add_cloudwatch_tags_from_cache(self):
112116
log_group_arn = self.aws_attributes.get_log_group_arn()
@@ -159,9 +163,6 @@ def handle_rds_source(self):
159163
)
160164

161165
def handle_step_function_source(self):
162-
if not self.aws_attributes.get_log_stream().startswith("states/"):
163-
return
164-
165166
state_machine_arn = self.get_state_machine_arn()
166167
if not state_machine_arn:
167168
return
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[
2+
{
3+
"aws": {
4+
"awslogs": {
5+
"logGroup": "test/logs",
6+
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
7+
"owner": "425362996713"
8+
}
9+
},
10+
"id": "37199773595581154154810589279545129148442535997644275712",
11+
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
12+
"timestamp": 1668095539607
13+
}
14+
]
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"ddsource": "stepfunction",
3+
"ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
4+
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
5+
"service": "stepfunction"
6+
}

aws/logs_monitoring/tests/test_awslogs_handler.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from approvaltests.approvals import verify_as_json
99
from approvaltests.namer import NamerFactory
1010

11+
from aws.logs_monitoring.steps.enums import AwsEventSource
12+
1113
sys.modules["trace_forwarder.connection"] = MagicMock()
1214
sys.modules["datadog_lambda.wrapper"] = MagicMock()
1315
sys.modules["datadog_lambda.metric"] = MagicMock()
@@ -22,6 +24,7 @@
2224
},
2325
)
2426
env_patch.start()
27+
from aws.logs_monitoring.settings import DD_HOST, DD_SOURCE
2528
from steps.handlers.awslogs_handler import AwsLogsHandler
2629
from steps.handlers.aws_attributes import AwsAttributes
2730
from caching.cache_layer import CacheLayer
@@ -117,6 +120,72 @@ def test_awslogs_handler_step_functions_tags_added_properly(
117120
awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
118121
verify_as_json(list(awslogs_handler.handle(event)))
119122
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
123+
# verify that the handling can properly handle SF logs with the default log group naming
124+
self.assertEqual(
125+
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
126+
)
127+
self.assertEqual(
128+
awslogs_handler.metadata[DD_HOST],
129+
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1",
130+
)
131+
132+
@patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
133+
@patch("caching.cloudwatch_log_group_cache.send_forwarder_internal_metrics")
134+
@patch.dict("os.environ", {"DD_STEP_FUNCTIONS_TRACE_ENABLED": "true"})
135+
def test_awslogs_handler_step_functions_customized_log_group(
136+
self,
137+
mock_forward_metrics,
138+
mock_cache_init,
139+
):
140+
# SF customized log group
141+
eventFromCustomizedLogGroup = {
142+
"awslogs": {
143+
"data": base64.b64encode(
144+
gzip.compress(
145+
bytes(
146+
json.dumps(
147+
{
148+
"messageType": "DATA_MESSAGE",
149+
"owner": "425362996713",
150+
"logGroup": "test/logs",
151+
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
152+
"subscriptionFilters": ["testFilter"],
153+
"logEvents": [
154+
{
155+
"id": "37199773595581154154810589279545129148442535997644275712",
156+
"timestamp": 1668095539607,
157+
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
158+
}
159+
],
160+
}
161+
),
162+
"utf-8",
163+
)
164+
)
165+
)
166+
}
167+
}
168+
context = None
169+
metadata = {"ddtags": "env:dev"}
170+
mock_forward_metrics.side_effect = MagicMock()
171+
mock_cache_init.return_value = None
172+
cache_layer = CacheLayer("")
173+
cache_layer._step_functions_cache.get = MagicMock(
174+
return_value=["test_tag_key:test_tag_value"]
175+
)
176+
cache_layer._cloudwatch_log_group_cache.get = MagicMock()
177+
178+
awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
179+
# for some reasons, the below two are needed to update the context of the handler
180+
verify_as_json(list(awslogs_handler.handle(eventFromCustomizedLogGroup)))
181+
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
182+
self.assertEqual(
183+
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
184+
)
185+
self.assertEqual(
186+
awslogs_handler.metadata[DD_HOST],
187+
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
188+
)
120189

121190
def test_process_lambda_logs(self):
122191
# Non Lambda log

aws/logs_monitoring/tests/test_customized_log_group.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from customized_log_group import (
33
is_lambda_customized_log_group,
44
get_lambda_function_name_from_logstream_name,
5+
is_step_functions_log_group,
56
)
67

78

@@ -58,3 +59,16 @@ def get_lambda_function_name_from_logstream_name(self):
5859
get_lambda_function_name_from_logstream_name(stepfunction_log_stream_name),
5960
None,
6061
)
62+
63+
def test_is_step_functions_log_group(self):
64+
# Lambda logstream is false
65+
lambda_log_stream_name = "2023/11/04/[$LATEST]4426346c2cdf4c54a74d3bd2b929fc44"
66+
self.assertEqual(is_step_functions_log_group(lambda_log_stream_name), False)
67+
68+
# SF logstream is true
69+
step_functions_log_stream_name = (
70+
"states/selfmonit-statemachine/2024-11-04-15-30/00000000"
71+
)
72+
self.assertEqual(
73+
is_step_functions_log_group(step_functions_log_stream_name), True
74+
)

aws/logs_monitoring/tests/test_parsing.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -295,14 +295,6 @@ def test_carbon_black_event(self):
295295
str(AwsEventSource.CARBONBLACK),
296296
)
297297

298-
def test_step_function_event(self):
299-
self.assertEqual(
300-
parse_event_source(
301-
{"awslogs": "logs"}, "/aws/vendedlogs/states/MyStateMachine-Logs"
302-
),
303-
str(AwsEventSource.STEPFUNCTION),
304-
)
305-
306298
def test_cloudwatch_source_if_none_found(self):
307299
self.assertEqual(
308300
parse_event_source({"awslogs": "logs"}, ""), str(AwsEventSource.CLOUDWATCH)

0 commit comments

Comments
 (0)