Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion clickhouse/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,22 @@ files:
type: string
display_default: "$server:$port:$db"
example: "$env-$server:$port:$db"
- name: cluster_name
description: |
The name of the ClickHouse cluster as shown in system.clusters.
When set, the agent connects to one node but collects metrics and samples
from all nodes in the cluster using clusterAllReplicas().
Use this for self-hosted ClickHouse clusters.
        To find your cluster name, run: SELECT DISTINCT cluster FROM system.clusters
        For the ClickHouse Cloud managed service, use single_endpoint_mode instead.
value:
type: string
display_default: null
example: my_cluster
- name: single_endpoint_mode
description: Set to `true` when connecting through a single endpoint for ClickHouse Cloud.
description: |
Set to `true` when connecting through a single endpoint for ClickHouse Cloud.
For self-hosted clusters, use cluster_name instead.
value:
type: boolean
example: false
Expand Down
1 change: 1 addition & 0 deletions clickhouse/changelog.d/22970.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `cluster_name` configuration for self-hosted cluster monitoring and per-node hostname attribution for ClickHouse single-endpoint cluster monitoring
30 changes: 9 additions & 21 deletions clickhouse/datadog_checks/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ def _add_core_tags(self):
self.tag_manager.set_tag("port", str(self._config.port), replace=True)
self.tag_manager.set_tag("db", self._config.db, replace=True)
self.tag_manager.set_tag("database_hostname", self.reported_hostname, replace=True)
if self._config.cluster_name:
self.tag_manager.set_tag("clickhouse_cluster", self._config.cluster_name, replace=True)
self.tag_manager.set_tag("database_instance", self.database_identifier, replace=True)

def validate_config(self):
Expand Down Expand Up @@ -326,32 +328,18 @@ def is_single_endpoint_mode(self):
"""
return self._config.single_endpoint_mode

def get_system_table(self, table_name):
def get_system_table(self, table_name: str) -> str:
"""
Get the appropriate system table reference based on deployment type.

For single endpoint mode: Returns clusterAllReplicas('default', system.<table>)
For cluster mode (cluster_name set): Returns clusterAllReplicas('<cluster>', system.<table>)
For single endpoint mode (ClickHouse Cloud): Returns clusterAllReplicas('default', system.<table>)
For direct connection: Returns system.<table>

Args:
table_name: The system table name (e.g., 'query_log', 'processes')

Returns:
str: The table reference to use in SQL queries

Example:
>>> self.get_system_table('query_log')
"clusterAllReplicas('default', system.query_log)" # Single endpoint mode
>>> self.get_system_table('query_log')
"system.query_log" # Direct connection
"""
if self._config.single_endpoint_mode:
# Single endpoint mode: Use clusterAllReplicas to query all nodes
# The cluster name is 'default' for ClickHouse Cloud and most setups
return f"clusterAllReplicas('default', system.{table_name})"
else:
# Direct connection: Query the local system table directly
return f"system.{table_name}"
cluster = self._config.cluster_name or ('default' if self._config.single_endpoint_mode else None)
if cluster:
return f"clusterAllReplicas('{cluster}', system.{table_name})"
return f"system.{table_name}"

def ping_clickhouse(self):
return self._client.ping()
Expand Down
1 change: 1 addition & 0 deletions clickhouse/datadog_checks/clickhouse/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def _apply_features(config: InstanceConfig, validation_result: ValidationResult)
None if config.dbm else "Requires `dbm: true`",
)
validation_result.add_feature(FeatureKey.SINGLE_ENDPOINT_MODE, config.single_endpoint_mode)
validation_result.add_feature(FeatureKey.CLUSTER_MODE, bool(config.cluster_name))


def _safefloat(value) -> float:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class InstanceConfig(BaseModel):
arbitrary_types_allowed=True,
frozen=True,
)
cluster_name: Optional[str] = None
compression: Optional[str] = None
connect_timeout: Optional[int] = None
custom_queries: Optional[tuple[CustomQuery, ...]] = None
Expand Down
11 changes: 11 additions & 0 deletions clickhouse/datadog_checks/clickhouse/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,19 @@ instances:
#
# template: $env-$server:$port:$db

## @param cluster_name - string - optional
## The name of the ClickHouse cluster as shown in system.clusters.
## When set, the agent connects to one node but collects metrics and samples
## from all nodes in the cluster using clusterAllReplicas().
## Use this for self-hosted ClickHouse clusters.
    ## To find your cluster name, run: SELECT DISTINCT cluster FROM system.clusters
    ## For the ClickHouse Cloud managed service, use single_endpoint_mode instead.
#
# cluster_name: my_cluster

## @param single_endpoint_mode - boolean - optional - default: false
## Set to `true` when connecting through a single endpoint for ClickHouse Cloud.
## For self-hosted clusters, use cluster_name instead.
#
# single_endpoint_mode: false

Expand Down
2 changes: 2 additions & 0 deletions clickhouse/datadog_checks/clickhouse/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class FeatureKey(Enum):
QUERY_SAMPLES = "query_samples"
QUERY_COMPLETIONS = "query_completions"
SINGLE_ENDPOINT_MODE = "single_endpoint_mode"
CLUSTER_MODE = "cluster_mode"


FeatureNames = {
Expand All @@ -30,6 +31,7 @@ class FeatureKey(Enum):
FeatureKey.QUERY_SAMPLES: 'Query Samples',
FeatureKey.QUERY_COMPLETIONS: 'Query Completions',
FeatureKey.SINGLE_ENDPOINT_MODE: 'Single Endpoint Mode',
FeatureKey.CLUSTER_MODE: 'Cluster Mode',
}


Expand Down
2 changes: 2 additions & 0 deletions clickhouse/datadog_checks/clickhouse/query_completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def _collect_completed_queries(self):

row_dict = {
'normalized_query_hash': str(normalized_query_hash),
'hostname': str(server_node) if server_node else None,
'query': str(query_text) if query_text else '',
'user': str(user) if user else '',
'query_type': str(query_type) if query_type else '',
Expand Down Expand Up @@ -308,6 +309,7 @@ def _create_batched_payload(self, rows):
'event_time_microseconds': row.get('event_time_microseconds', 0),
'initial_query_id': row.get('initial_query_id', ''),
'is_initial_query': row.get('is_initial_query', True),
'hostname': row.get('hostname'),
'metadata': {
'tables': row.get('dd_tables'),
'commands': row.get('dd_commands'),
Expand Down
5 changes: 4 additions & 1 deletion clickhouse/datadog_checks/clickhouse/statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
port,
client_hostname,
is_cancelled,
http_user_agent
http_user_agent,
hostName() as server_node
FROM {processes_table}
WHERE query NOT LIKE '%system.processes%'
AND query NOT LIKE '%system.query_log%'
Expand Down Expand Up @@ -239,6 +240,7 @@ def _normalize_row(self, row):
client_hostname,
is_cancelled,
http_user_agent,
server_node,
) = row

normalized_row = {
Expand Down Expand Up @@ -268,6 +270,7 @@ def _normalize_row(self, row):
'client_hostname': str(client_hostname) if client_hostname else None,
'is_cancelled': bool(is_cancelled) if is_cancelled is not None else False,
'http_user_agent': str(http_user_agent) if http_user_agent else None,
'server_node': str(server_node) if server_node else None,
}

return self._obfuscate_query(normalized_row)
Expand Down
7 changes: 6 additions & 1 deletion clickhouse/datadog_checks/clickhouse/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def _load_query_log_statements(self):

result_row = {
'normalized_query_hash': str(normalized_query_hash),
'server_node': str(server_node) if server_node else '',

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Remove stale server_node from merged statement metrics

This row now carries server_node, but ClickhouseStatementMetrics._merge_rows_across_nodes still collapses multiple node rows into one row per normalized_query_hash by summing metrics across all nodes. The merged row therefore keeps a single node label (from the max-count row) while count, total_time, read/write bytes, etc. represent cluster-wide totals, which misattributes data whenever the same query runs on more than one node.

Useful? React with 👍 / 👎.

'query': str(query_text) if query_text else '',
'user': str(query_user) if query_user else '',
'query_type': str(query_type) if query_type else '',
Expand Down Expand Up @@ -313,13 +314,17 @@ def _merge_rows_across_nodes(node_rows):
"""
Merge per-(query, node) rows into per-query rows.

Rows are grouped by (normalized_query_hash, server_node) so that the same
query running on different nodes produces separate merged rows, preserving
per-node attribution in cluster mode.

Summable metrics (count, total_time, …) are summed.
peak_memory_usage takes the max.
Non-metric fields are taken from the node with the highest execution count.
"""
groups = {}
for row in node_rows:
key = row['normalized_query_hash']
key = (row['normalized_query_hash'], row.get('server_node', ''))
if key not in groups:
groups[key] = []
groups[key].append(row)
Expand Down
6 changes: 5 additions & 1 deletion clickhouse/tests/test_statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_normalize_row_with_all_fields(check_with_dbm):
"""Test that all fields are properly normalized from system.processes"""
samples = check_with_dbm.statement_samples

# Create a mock row with all 26 fields from ACTIVE_QUERIES_QUERY
# Create a mock row with all 27 fields from ACTIVE_QUERIES_QUERY
mock_row = (
1.234, # elapsed
'abc-123-def', # query_id
Expand Down Expand Up @@ -223,6 +223,7 @@ def test_normalize_row_with_all_fields(check_with_dbm):
'app-server-01', # client_hostname
0, # is_cancelled
'python-requests/2.28.0', # http_user_agent
'clickhouse-01', # server_node
)

normalized_row = samples._normalize_row(mock_row)
Expand Down Expand Up @@ -262,6 +263,9 @@ def test_normalize_row_with_all_fields(check_with_dbm):
# Verify HTTP field
assert normalized_row['http_user_agent'] == 'python-requests/2.28.0'

# Verify server node field
assert normalized_row['server_node'] == 'clickhouse-01'

# Verify obfuscation happened
assert 'statement' in normalized_row
assert 'query_signature' in normalized_row
Expand Down
Loading