
Commit e3014e5

Create shared schemas collector for DBM integrations (#21720)
* Create shared schemas collector for DBM integrations * WIP * WIP * Changelog * Warning * Remove unused * Lint * AI Fixes * Fix * Feedback * Feedback * Lint * Refactor health * Fix * WIP * Revert "WIP" This reverts commit aa5b86b. * Comment
1 parent b9e94e3 commit e3014e5

5 files changed, +330 -0 lines changed

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
Create shared schemas collector for the Postgres, MySQL, and SQL Server integrations

datadog_checks_base/datadog_checks/base/checks/db.py

Lines changed: 31 additions & 0 deletions
@@ -2,6 +2,8 @@
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)
 
+from abc import abstractmethod
+
 from . import AgentCheck
 
 
@@ -20,3 +22,32 @@ def database_monitoring_metadata(self, raw_event: str):
 
     def database_monitoring_health(self, raw_event: str):
         self.event_platform_event(raw_event, "dbm-health")
+
+    @property
+    @abstractmethod
+    def reported_hostname(self) -> str | None:
+        pass
+
+    @property
+    @abstractmethod
+    def database_identifier(self) -> str:
+        pass
+
+    @property
+    def dbms(self) -> str:
+        return self.__class__.__name__.lower()
+
+    @property
+    @abstractmethod
+    def dbms_version(self) -> str:
+        pass
+
+    @property
+    @abstractmethod
+    def tags(self) -> list[str]:
+        pass
+
+    @property
+    @abstractmethod
+    def cloud_metadata(self) -> dict:
+        pass
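
The abstract properties above define the contract a DBM check must expose before it can host the shared schema collector. As a rough sketch only (the AcmeCheck class and its config keys are hypothetical, not part of this commit), a concrete check might satisfy the contract like this:

from datadog_checks.base.checks.db import DatabaseCheck


class AcmeCheck(DatabaseCheck):
    # Hypothetical integration: in practice these values would come from the
    # instance config and the integration's own version/host detection.
    @property
    def reported_hostname(self):
        return self.instance.get('reported_hostname')

    @property
    def database_identifier(self):
        return self.instance.get('database_identifier', 'acme-primary')

    @property
    def dbms_version(self):
        return '1.0'

    @property
    def tags(self):
        return self.instance.get('tags', [])

    @property
    def cloud_metadata(self):
        return {}

The inherited dbms property would then default to the lowercased class name ("acmecheck"), which a real integration may prefer to override with its product name.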
datadog_checks_base/datadog_checks/base/utils/db/schemas.py

Lines changed: 189 additions & 0 deletions
@@ -0,0 +1,189 @@
# (C) Datadog, Inc. 2025-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, TypedDict

from datadog_checks.base.utils.serialization import json

from .utils import now_ms

if TYPE_CHECKING:
    from datadog_checks.base.checks.db import DatabaseCheck

try:
    import datadog_agent  # type: ignore
except ImportError:
    from datadog_checks.base.stubs import datadog_agent


class DatabaseInfo(TypedDict):
    name: str


# The schema collector sends lists of DatabaseObjects to the agent.
# DBMS subclasses may add additional fields to the dictionary.
class DatabaseObject(TypedDict):
    name: str


# Common configuration for the schema collector.
# Individual DBMS implementations should map their specific
# configuration to this type.
class SchemaCollectorConfig:
    def __init__(self):
        self.collection_interval = 3600
        self.payload_chunk_size = 10_000


class SchemaCollector(ABC):
    """
    Abstract base class for DBM schema collectors.
    """

    def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
        self._check = check
        self._log = check.log
        self._config = config
        self._reset()

    def _reset(self):
        # Timestamp in whole milliseconds when the current collection started.
        self._collection_started_at = None
        self._collection_payloads_count = 0
        self._queued_rows = []
        self._total_rows_count = 0

    def collect_schemas(self) -> bool:
        """
        Collects and submits all applicable schema metadata to the agent.
        This class relies on the owning check to handle scheduling this method.
        """
        status = "success"
        try:
            self._collection_started_at = now_ms()
            databases = self._get_databases()
            self._log.debug("Collecting schemas for %d databases", len(databases))
            for database in databases:
                self._log.debug("Starting collection of schemas for database %s", database['name'])
                database_name = database['name']
                if not database_name:
                    self._log.warning("database has no name %s", database)
                    continue
                with self._get_cursor(database_name) as cursor:
                    # Get the next row from the cursor.
                    # We need to know when we've reached the last row so we can efficiently flush
                    # the last payload without an empty final payload.
                    next_row = self._get_next(cursor)
                    while next_row:
                        self._queued_rows.append(self._map_row(database, next_row))
                        self._total_rows_count += 1
                        # Because we're iterating over a cursor we need to try to get
                        # the next row to see if we've reached the last row.
                        next_row = self._get_next(cursor)
                        is_last_payload = database == databases[-1] and next_row is None
                        self.maybe_flush(is_last_payload)
                self._log.debug("Completed collection of schemas for database %s", database_name)
        except Exception as e:
            status = "error"
            self._log.error("Error collecting schema: %s", e)
            raise e
        finally:
            self._check.histogram(
                f"dd.{self._check.dbms}.schema.time",
                now_ms() - self._collection_started_at,
                tags=self._check.tags + ["status:" + status],
                hostname=self._check.reported_hostname,
                raw=True,
            )
            self._check.gauge(
                f"dd.{self._check.dbms}.schema.tables_count",
                self._total_rows_count,
                tags=self._check.tags + ["status:" + status],
                hostname=self._check.reported_hostname,
                raw=True,
            )
            self._check.gauge(
                f"dd.{self._check.dbms}.schema.payloads_count",
                self._collection_payloads_count,
                tags=self._check.tags + ["status:" + status],
                hostname=self._check.reported_hostname,
                raw=True,
            )

            self._reset()
        return True

    @property
    def base_event(self):
        return {
            "host": self._check.reported_hostname,
            "database_instance": self._check.database_identifier,
            "kind": self.kind,
            "agent_version": datadog_agent.get_version(),
            "collection_interval": self._config.collection_interval,
            "dbms": self._check.dbms,
            "dbms_version": str(self._check.dbms_version),
            "tags": self._check.tags,
            "cloud_metadata": self._check.cloud_metadata,
            "collection_started_at": self._collection_started_at,
        }

    def maybe_flush(self, is_last_payload):
        if is_last_payload or len(self._queued_rows) >= self._config.payload_chunk_size:
            event = self.base_event
            event["timestamp"] = now_ms()
            # DBM backend expects metadata to be an array of database objects
            event["metadata"] = self._queued_rows
            self._collection_payloads_count += 1
            if is_last_payload:
                # For the last payload, we need to include the total number of payloads collected.
                # This is used for snapshotting to ensure that all payloads have been received.
                event["collection_payloads_count"] = self._collection_payloads_count
            self._check.database_monitoring_metadata(json.dumps(event))

            self._queued_rows = []

    @property
    @abstractmethod
    def kind(self) -> str:
        """
        Returns the kind property of the schema metadata event.
        Subclasses should override this property to return the kind of schema being collected.
        """
        raise NotImplementedError("Subclasses must implement kind")

    @abstractmethod
    def _get_databases(self) -> list[DatabaseInfo]:
        """
        Returns a list of database dictionaries.
        Subclasses should override this method to return the list of databases to collect schema metadata for.
        """
        raise NotImplementedError("Subclasses must implement _get_databases")

    @abstractmethod
    def _get_cursor(self, database):
        """
        Returns a cursor for the given database.
        Subclasses should override this method to return the cursor for the given database.
        """
        raise NotImplementedError("Subclasses must implement _get_cursor")

    @abstractmethod
    def _get_next(self, cursor):
        """
        Returns the next row from the cursor.
        Subclasses should override this method to return the next row from the cursor.
        """
        raise NotImplementedError("Subclasses must implement _get_next")

    def _map_row(self, database: DatabaseInfo, _cursor_row) -> DatabaseObject:
        """
        Maps a cursor row to a dict that matches the schema expected by DBM.
        The base implementation of this method returns just the database dictionary.
        Subclasses should override this method to add schema and table data based on the cursor row.
        """
        return {**database}

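To make the division of labor concrete, here is a rough sketch of what a DBMS-specific subclass could look like. Everything in it is illustrative rather than part of this commit: the AcmeSchemaCollector class, the connection_for and autodiscovered_databases helpers on the check, and the information_schema query are assumptions standing in for whatever the Postgres, MySQL, or SQL Server integrations actually use.

from contextlib import contextmanager

from datadog_checks.base.utils.db.schemas import SchemaCollector


class AcmeSchemaCollector(SchemaCollector):
    # Hypothetical collector: emits one DatabaseObject per table row.
    @property
    def kind(self):
        return "acme_databases"

    def _get_databases(self):
        # The owning check decides which databases are in scope.
        return [{'name': name} for name in self._check.autodiscovered_databases]

    @contextmanager
    def _get_cursor(self, database):
        # Assumes the check can hand out a DB-API connection per database.
        with self._check.connection_for(database) as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT table_schema, table_name FROM information_schema.tables")
                yield cursor

    def _get_next(self, cursor):
        # DB-API cursors return None once the result set is exhausted,
        # which is exactly the sentinel collect_schemas() looks for.
        return cursor.fetchone()

    def _map_row(self, database, row):
        schema_name, table_name = row
        return {**database, "schemas": [{"name": schema_name, "tables": [{"name": table_name}]}]}

The owning check would construct the collector once with a SchemaCollectorConfig mapped from its own settings and call collect_schemas() on its own schedule, since the base class deliberately leaves scheduling to the check.
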
datadog_checks_base/datadog_checks/base/utils/db/utils.py

Lines changed: 7 additions & 0 deletions
@@ -647,3 +647,10 @@ def get_tags(self) -> List[str]:
         # Generate and cache regular tags
         self._cached_tag_list = self._generate_tag_strings(self._tags)
         return list(self._cached_tag_list)
+
+
+def now_ms() -> int:
+    """
+    Get the current time in whole milliseconds.
+    """
+    return int(time.time() * 1000)
Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from contextlib import contextmanager

import pytest

from datadog_checks.base.checks.db import DatabaseCheck
from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig

try:
    import datadog_agent  # type: ignore
except ImportError:
    from datadog_checks.base.stubs import datadog_agent


class TestDatabaseCheck(DatabaseCheck):
    __test__ = False

    def __init__(self):
        super().__init__()
        self._reported_hostname = "test_hostname"
        self._database_identifier = "test_database_identifier"
        self._dbms_version = "test_dbms_version"
        self._agent_version = "test_agent_version"
        self._tags = ["test_tag"]
        self._cloud_metadata = {"test_cloud_metadata": "test_cloud_metadata"}

    @property
    def reported_hostname(self):
        return self._reported_hostname

    @property
    def database_identifier(self):
        return self._database_identifier

    @property
    def dbms_version(self):
        return self._dbms_version

    @property
    def agent_version(self):
        return self._agent_version

    @property
    def tags(self):
        return self._tags

    @property
    def cloud_metadata(self):
        return self._cloud_metadata


class TestSchemaCollector(SchemaCollector):
    __test__ = False

    def __init__(self, check: DatabaseCheck, config: SchemaCollectorConfig):
        super().__init__(check, config)
        self._row_index = 0
        self._rows = [{'table_name': 'test_table'}]

    def _get_databases(self):
        return [{'name': 'test_database'}]

    @contextmanager
    def _get_cursor(self, database: str):
        yield {}

    def _get_next(self, _cursor):
        if self._row_index < len(self._rows):
            row = self._rows[self._row_index]
            self._row_index += 1
            return row
        return None

    def _map_row(self, database: str, cursor_row: dict):
        return {**database, "tables": [cursor_row]}

    @property
    def kind(self):
        return "test_databases"


@pytest.mark.unit
def test_schema_collector(aggregator):
    check = TestDatabaseCheck()
    collector = TestSchemaCollector(check, SchemaCollectorConfig())
    collector.collect_schemas()

    events = aggregator.get_event_platform_events("dbm-metadata")
    assert len(events) == 1
    event = events[0]
    assert event['kind'] == collector.kind
    assert event['host'] == check.reported_hostname
    assert event['database_instance'] == check.database_identifier
    assert event['agent_version'] == datadog_agent.get_version()
    assert event['collection_interval'] == collector._config.collection_interval
    assert event['dbms_version'] == check.dbms_version
    assert event['tags'] == check.tags
    assert event['cloud_metadata'] == check.cloud_metadata
    assert event['metadata'][0]['name'] == 'test_database'
    assert event['metadata'][0]['tables'][0]['table_name'] == 'test_table'
