Commit 4672754

Use Data Streams events to send Kafka messages to Datadog (#22208)
* Use Data Streams events to send Kafka messages to Datadog
* simplify e2e test
1 parent b8d7eb1 commit 4672754

4 files changed: +57 −122 lines changed
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+kafka_actions: Send Kafka messages to backend as data streams events instead of regular Datadog events

kafka_actions/datadog_checks/kafka_actions/check.py

Lines changed: 21 additions & 34 deletions

@@ -144,6 +144,10 @@ def _emit_action_event(
     ):
         """Emit an event for action success or failure.

+        Events are sent in duplicate:
+        1. As a standard Datadog event
+        2. As a data-streams-message event
+
         Args:
             success: Whether the action succeeded
             action: Action name
@@ -155,8 +159,10 @@ def _emit_action_event(
         alert_type = 'success' if success else 'error'

         payload = {
+            'message_timestamp': int(time.time() * 1000),
             'action': action,
             'remote_config_id': self.remote_config_id,
+            'kafka_cluster_id': cluster,
             'status': 'success' if success else 'failure',
             'message': message,
         }
@@ -171,6 +177,7 @@ def _emit_action_event(

         tags = self._get_tags() + [f'kafka_cluster_id:{cluster}']

+        # Send as standard Datadog event (backwards compatibility; moving to Data Streams intake-only)
         event_payload = {
             'timestamp': int(time.time()),
             'event_type': f'kafka_action_{event_type}',
@@ -180,10 +187,15 @@ def _emit_action_event(
             'source_type_name': 'kafka',
             'aggregation_key': f'kafka_action_{action}_{self.remote_config_id}',
             'tags': tags,
+            'remote_config_id': self.remote_config_id,
+            'kafka_cluster_id': cluster,
         }

         self.event(event_payload)

+        # Send same payload to Data Streams track
+        self.event_platform_event(event_text, "data-streams-message")
+
     # =========================================================================
     # Action Handlers (RFC-Compliant)
     # =========================================================================
@@ -486,57 +498,32 @@ def _parse_literal(self, value_str: str):
         return value_str

     def _emit_message_event_deserialized(self, deserialized_msg: DeserializedMessage, cluster: str):
-        """Emit a deserialized Kafka message as a Datadog event.
+        """Emit a deserialized Kafka message to data-streams-message track.

         Args:
             deserialized_msg: DeserializedMessage object
             cluster: Kafka cluster identifier
         """
-        msg_dict = deserialized_msg.to_dict()
-
         event_data = {
+            'message_timestamp': int(time.time() * 1000),
+            'remote_config_id': self.remote_config_id,
+            'kafka_cluster_id': cluster,
             'topic': deserialized_msg.topic,
             'partition': deserialized_msg.partition,
             'offset': deserialized_msg.offset,
-            'key': msg_dict.get('key'),
-            'value': msg_dict.get('value'),
+            'key': deserialized_msg.key,
+            'value': deserialized_msg.value,
         }

-        if msg_dict.get('headers'):
-            event_data['headers'] = msg_dict['headers']
+        if deserialized_msg.headers:
+            event_data['headers'] = deserialized_msg.headers

         if deserialized_msg.value_schema_id:
            event_data['value_schema_id'] = deserialized_msg.value_schema_id
         if deserialized_msg.key_schema_id:
             event_data['key_schema_id'] = deserialized_msg.key_schema_id

-        event_text = json.dumps(event_data, indent=2)
-
-        event_tags = self._get_tags() + [
-            f'kafka_cluster_id:{cluster}',
-            f'topic:{deserialized_msg.topic}',
-            f'partition:{deserialized_msg.partition}',
-            f'offset:{deserialized_msg.offset}',
-        ]
-
-        event_title = f'Kafka Message: {deserialized_msg.topic}'
-
-        agg_key = (
-            f'kafka_{deserialized_msg.topic}_{deserialized_msg.partition}_'
-            f'{deserialized_msg.offset}_{self.remote_config_id}'
-        )
-
-        event_payload = {
-            'timestamp': int(time.time()),
-            'event_type': 'kafka_message',
-            'msg_title': event_title,
-            'msg_text': event_text,
-            'tags': event_tags,
-            'source_type_name': 'kafka',
-            'aggregation_key': agg_key,
-        }
-
-        self.event(event_payload)
+        self.event_platform_event(json.dumps(event_data), "data-streams-message")

     def _format_for_display(self, data) -> str:
         """Format data for display in event.

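Taken together, the check.py hunks boil down to a dual-emission pattern: the same JSON payload goes out once through the classic event stream and once through the event platform. A minimal sketch of that pattern, assuming the AgentCheck base class from datadog_checks_base (which provides event() and event_platform_event()); the class name, method name, and payload values here are illustrative, not the integration's exact code:

import json
import time

from datadog_checks.base import AgentCheck


class DualEmitCheck(AgentCheck):
    """Illustrative check showing the dual-emission pattern in this commit."""

    def emit_action_result(self, action: str, success: bool, cluster: str) -> None:
        payload = {
            'message_timestamp': int(time.time() * 1000),  # ms precision for the Data Streams intake
            'action': action,
            'kafka_cluster_id': cluster,
            'status': 'success' if success else 'failure',
        }
        event_text = json.dumps(payload)

        # 1. Classic Datadog event, kept for backwards compatibility
        self.event({
            'timestamp': int(time.time()),  # classic events use second precision
            'event_type': 'kafka_action_success' if success else 'kafka_action_error',
            'msg_text': event_text,
            'source_type_name': 'kafka',
        })

        # 2. The same serialized payload, routed to the event platform track
        self.event_platform_event(event_text, "data-streams-message")

Per-message events (see _emit_message_event_deserialized above) take only the second path, which is what drops most of the deleted lines: no tags, title, or aggregation key need to be assembled for the event platform track.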
kafka_actions/tests/test_e2e.py

Lines changed: 14 additions & 73 deletions

@@ -13,97 +13,38 @@


 def test_e2e_read_messages(dd_environment, kafka_instance, check, dd_run_check, aggregator):
-    """Test reading messages from Kafka."""
+    """Test end-to-end integration: check can connect to real Kafka and emit events correctly."""
     if not e2e_testing():
         pytest.skip("E2E tests require dd_environment fixture")

     # Verify cluster is available
     cluster_id = common.get_cluster_id()
     assert cluster_id is not None, "Kafka cluster is not available"

-    # Ensure the instance has the cluster ID set
     kafka_instance['read_messages']['cluster'] = cluster_id

     # Run the check
     check_instance = check(kafka_instance)
     dd_run_check(check_instance)

-    # Verify events were emitted
-    events = aggregator.events
-    assert len(events) > 0, f"Expected at least one event. Got {len(events)} events."
-
-    # Find the action success event
-    action_events = [e for e in events if 'kafka_action_' in e.get('event_type', '')]
+    # Verify standard Datadog event was emitted
+    action_events = [e for e in aggregator.events if 'kafka_action_' in e.get('event_type', '')]
     assert len(action_events) == 1, f"Expected 1 action event, got {len(action_events)}"
+    assert action_events[0]['event_type'] == 'kafka_action_success'

-    action_event = action_events[0]
-    assert action_event['event_type'] == 'kafka_action_success'
-    assert 'kafka_cluster_id' in [tag.split(':')[0] for tag in action_event['tags']]
-    assert 'remote_config_id:test-rc-id' in action_event['tags']
-
-    # Parse the msg_text as JSON
-    event_data = json.loads(action_event['msg_text'])
-    assert event_data['action'] == 'read_messages'
-    assert event_data['status'] == 'success'
-    assert 'stats' in event_data
-    stats = event_data['stats']
-    assert stats['messages_scanned'] > 0
-    assert stats['messages_sent'] > 0
+    # Verify events sent to data-streams-message track
+    data_streams_events = aggregator.get_event_platform_events("data-streams-message")
+    action_ds_events = [e for e in data_streams_events if 'action' in e]
+    message_events = [e for e in data_streams_events if 'topic' in e]

-    # Check that message events were emitted
-    message_events = [e for e in events if e.get('event_type') == 'kafka_message']
+    # Verify both action and message events were sent
+    assert len(action_ds_events) == 1, f"Expected 1 action event in data streams, got {len(action_ds_events)}"
     assert len(message_events) > 0, "Expected at least one Kafka message event"
-    assert len(message_events) == stats['messages_sent'], "Message event count should match stats"
-
-    # Verify message event structure
-    msg_event = message_events[0]
-    assert msg_event['source_type_name'] == 'kafka'
-    assert 'kafka_cluster_id' in [tag.split(':')[0] for tag in msg_event['tags']]
-    assert 'remote_config_id:test-rc-id' in msg_event['tags']
-
-    # Parse message event data
-    msg_data = json.loads(msg_event['msg_text'])
-    assert 'topic' in msg_data
-    assert 'partition' in msg_data
-    assert 'offset' in msg_data
-    assert 'key' in msg_data
-    assert 'value' in msg_data
-    assert msg_data['topic'] == 'test-topic'

-    # Find the action success event
-    action_events = [e for e in events if 'kafka_action_' in e.get('event_type', '')]
-    assert len(action_events) == 1, f"Expected 1 action event, got {len(action_events)}"
-
-    action_event = action_events[0]
-    assert action_event['event_type'] == 'kafka_action_success'
-    assert 'kafka_cluster_id' in [tag.split(':')[0] for tag in action_event['tags']]
-    assert 'remote_config_id:test-rc-id' in action_event['tags']
+    # Verify action event payload is consistent between both tracks
+    event_data = json.loads(action_events[0]['msg_text'])
+    assert action_ds_events[0] == event_data, "Action event should have same payload in both tracks"

-    # Parse the msg_text as JSON
-    event_data = json.loads(action_event['msg_text'])
-    assert event_data['action'] == 'read_messages'
-    assert event_data['status'] == 'success'
-    assert 'stats' in event_data
+    # Verify message count matches stats
     stats = event_data['stats']
-    assert stats['messages_scanned'] > 0
-    assert stats['messages_sent'] > 0
-
-    # Check that message events were emitted
-    message_events = [e for e in events if e.get('event_type') == 'kafka_message']
-    assert len(message_events) > 0, "Expected at least one Kafka message event"
     assert len(message_events) == stats['messages_sent'], "Message event count should match stats"
-
-    # Verify message event structure
-    msg_event = message_events[0]
-    assert msg_event['source_type_name'] == 'kafka'
-    assert 'kafka_cluster_id' in [tag.split(':')[0] for tag in msg_event['tags']]
-    assert 'remote_config_id:test-rc-id' in msg_event['tags']
-
-    # Parse message event data
-    msg_data = json.loads(msg_event['msg_text'])
-    assert 'topic' in msg_data
-    assert 'partition' in msg_data
-    assert 'offset' in msg_data
-    assert 'key' in msg_data
-    assert 'value' in msg_data
-    assert msg_data['topic'] == 'test-topic'

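The simplified e2e test tells the two kinds of data-streams events apart purely by shape: action events carry an 'action' key, per-message events a 'topic' key. Roughly, based on the payload construction in check.py above (all field values here are made-up examples, not test fixtures):

# Illustrative shapes of the two payload kinds on the data-streams-message track.
action_event = {
    'message_timestamp': 1700000000000,  # int(time.time() * 1000)
    'action': 'read_messages',
    'remote_config_id': 'test-rc-id',
    'kafka_cluster_id': 'cluster-1',
    'status': 'success',
    'message': 'read_messages completed',
    # A 'stats' dict with messages_scanned / messages_sent is populated
    # elsewhere in the check; the test uses it to cross-check counts.
}

message_event = {
    'message_timestamp': 1700000000000,
    'remote_config_id': 'test-rc-id',
    'kafka_cluster_id': 'cluster-1',
    'topic': 'test-topic',
    'partition': 0,
    'offset': 100,
    'key': 'user-1',
    'value': {'id': 1, 'status': 'active'},
    # Optional fields: 'headers', 'value_schema_id', 'key_schema_id'
}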
kafka_actions/tests/test_unit.py

Lines changed: 21 additions & 15 deletions

@@ -103,18 +103,21 @@ def test_read_messages_with_filter(self, aggregator, dd_run_check):
         assert call_kwargs['start_offset'] == -1
         assert call_kwargs['max_messages'] == 100

-        events = [e for e in aggregator.events if e.get('event_type') == 'kafka_message']
-        assert len(events) == 2, f"Expected 2 message events (filtered), got {len(events)}"
+        all_events = aggregator.get_event_platform_events("data-streams-message")
+        # Filter for message events only (those with 'topic' field)
+        message_events = [e for e in all_events if 'topic' in e]
+        assert len(message_events) == 2, f"Expected 2 message events (filtered), got {len(message_events)}"

-        event1 = events[0]
-        assert 'test-topic' in event1['msg_title']
-        assert '"offset": 100' in event1['msg_text']
-        assert '"id": 1' in event1['msg_text']
-        assert '"status": "active"' in event1['msg_text']
+        event1 = message_events[0]
+        assert event1['topic'] == 'test-topic'
+        assert event1['offset'] == 100
+        assert event1['value']['id'] == 1
+        assert event1['value']['status'] == 'active'
+        assert event1['remote_config_id'] == 'test-read-messages-001'

-        event2 = events[1]
-        assert '"offset": 102' in event2['msg_text']
-        assert '"id": 3' in event2['msg_text']
+        event2 = message_events[1]
+        assert event2['offset'] == 102
+        assert event2['value']['id'] == 3


 class TestCreateTopicAction:
@@ -370,11 +373,14 @@ def test_read_messages_nested_field_filter(self, aggregator, dd_run_check):

         mock_consume.assert_called_once()

-        events = [e for e in aggregator.events if e.get('event_type') == 'kafka_message']
-        assert len(events) == 1, f"Expected 1 message event (filtered), got {len(events)}"
-        assert '"order_id": 1' in events[0]['msg_text']
-        assert '"country": "US"' in events[0]['msg_text']
-        assert '"tier": "gold"' in events[0]['msg_text']
+        all_events = aggregator.get_event_platform_events("data-streams-message")
+        # Filter for message events only (those with 'topic' field)
+        message_events = [e for e in all_events if 'topic' in e]
+        assert len(message_events) == 1, f"Expected 1 message event (filtered), got {len(message_events)}"
+        assert message_events[0]['value']['order_id'] == 1
+        assert message_events[0]['value']['user']['country'] == 'US'
+        assert message_events[0]['value']['user']['tier'] == 'gold'
+        assert message_events[0]['remote_config_id'] == 'test-nested-filter-001'


 if __name__ == '__main__':

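Note that the updated unit tests assert on parsed fields (event1['value']['id'] == 1) rather than substring-matching serialized JSON in msg_text, which makes them robust to serialization details like indentation and key order. A small helper in the same style, assuming the datadog_checks test stubs where aggregator.get_event_platform_events(...) returns the submitted payloads deserialized into dicts, as the assertions above imply:

def get_message_events(aggregator, track="data-streams-message"):
    """Return only per-message events from an event platform track.

    Assumes the stub aggregator hands back submitted payloads already
    deserialized into dicts.
    """
    all_events = aggregator.get_event_platform_events(track)
    # Per-message events carry a 'topic' field; action events carry 'action'.
    return [e for e in all_events if 'topic' in e]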