Skip to content

Commit b577491

Browse files
authored
Refactor live metrics filtering modules (Azure#38837)
1 parent 972ebbb commit b577491

File tree

14 files changed

+1058
-847
lines changed

14 files changed

+1058
-847
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
### Other Changes
1717

18+
- Refactored live metrics filtering modules
19+
([#38837](https://github.com/Azure/azure-sdk-for-python/pull/38837))
20+
1821
## 1.0.0b32 (2024-11-04)
1922

2023
### Breaking Changes
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
# cSpell:disable
4+
5+
from datetime import datetime
6+
from typing import Iterable
7+
8+
import psutil
9+
10+
from opentelemetry.metrics import CallbackOptions, Observation
11+
12+
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
13+
_get_quickpulse_last_process_cpu,
14+
_get_quickpulse_last_process_time,
15+
_get_quickpulse_process_elapsed_time,
16+
_set_quickpulse_last_process_cpu,
17+
_set_quickpulse_last_process_time,
18+
_set_quickpulse_process_elapsed_time,
19+
)
20+
21+
PROCESS = psutil.Process()
22+
NUM_CPUS = psutil.cpu_count()
23+
24+
25+
# pylint: disable=unused-argument
def _get_process_memory(options: CallbackOptions) -> Iterable[Observation]:
    """Observable callback that reports this process's resident memory in bytes.

    Yields 0 on a best-effort basis if the process can no longer be inspected.
    """
    rss_bytes = 0
    try:
        # rss is the non-swapped physical memory the process has used.
        rss_bytes = PROCESS.memory_info().rss
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Process went away or we lack permission; report 0 instead of failing.
        pass
    yield Observation(rss_bytes, {})
34+
35+
36+
# pylint: disable=unused-argument
def _get_process_time_normalized_old(options: CallbackOptions) -> Iterable[Observation]:
    """Observable callback that reports normalized process CPU usage.

    Usage is the process CPU time (user + system) accumulated since the last
    collection, divided by the wall-clock time elapsed since that collection,
    normalized by the number of logical CPUs. Yields 0.0 when the process
    cannot be inspected or no wall-clock time has elapsed.
    """
    normalized_cpu_percentage = 0.0
    try:
        cpu_times = PROCESS.cpu_times()
        # Total process time is user + system, in seconds.
        total_time_s = cpu_times.user + cpu_times.system
        process_time_s = total_time_s - _get_quickpulse_last_process_time()
        # Store the cumulative total (NOT the interval delta) so the next
        # collection's subtraction yields only that interval's CPU time.
        # The previous code stored `process_time_s` here, which made every
        # subsequent interval computation wrong.
        _set_quickpulse_last_process_time(total_time_s)
        # Find elapsed wall-clock time in seconds since the last collection.
        current_time = datetime.now()
        elapsed_time_s = (current_time - _get_quickpulse_process_elapsed_time()).total_seconds()
        _set_quickpulse_process_elapsed_time(current_time)
        # Obtain CPU fraction by dividing by elapsed time.
        cpu_percentage = process_time_s / elapsed_time_s
        # Normalize by the number of logical CPUs.
        # NOTE(review): psutil.cpu_count() can return None on some platforms,
        # which would raise TypeError here (not ZeroDivisionError) — confirm
        # NUM_CPUS is always an int on supported platforms.
        normalized_cpu_percentage = cpu_percentage / NUM_CPUS
        _set_quickpulse_last_process_cpu(normalized_cpu_percentage)
    except (psutil.NoSuchProcess, psutil.AccessDenied, ZeroDivisionError):
        pass
    yield Observation(normalized_cpu_percentage, {})
57+
58+
59+
# pylint: disable=unused-argument
def _get_process_time_normalized(options: CallbackOptions) -> Iterable[Observation]:
    """Observable callback yielding the most recently computed normalized process CPU usage."""
    last_cpu = _get_quickpulse_last_process_cpu()
    yield Observation(last_cpu, {})
62+
63+
# cSpell:enable

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from azure.monitor.opentelemetry.exporter._quickpulse._generated._configuration import QuickpulseClientConfiguration
3535
from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient
3636
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
37+
from azure.monitor.opentelemetry.exporter._quickpulse._filter import _update_filter_configuration
3738
from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy
3839
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
3940
_get_and_clear_quickpulse_documents,
@@ -46,7 +47,6 @@
4647
)
4748
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
4849
_metric_to_quick_pulse_data_points,
49-
_update_filter_configuration,
5050
)
5151
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
5252
from azure.monitor.opentelemetry.exporter._utils import (
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import json
4+
5+
from dataclasses import fields
6+
from typing import Any, Dict, List
7+
8+
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import (
9+
DerivedMetricInfo,
10+
FilterConjunctionGroupInfo,
11+
FilterInfo,
12+
PredicateType,
13+
TelemetryType,
14+
)
15+
from azure.monitor.opentelemetry.exporter._quickpulse._projection import (
16+
_init_derived_metric_projection,
17+
)
18+
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
19+
_clear_quickpulse_projection_map,
20+
_set_quickpulse_derived_metric_infos,
21+
_set_quickpulse_etag,
22+
)
23+
from azure.monitor.opentelemetry.exporter._quickpulse._types import (
24+
_DATA_FIELD_NAMES,
25+
_TelemetryData,
26+
)
27+
from azure.monitor.opentelemetry.exporter._quickpulse._utils import _filter_time_stamp_to_ms
28+
from azure.monitor.opentelemetry.exporter._quickpulse._validate import _validate_derived_metric_info
29+
30+
31+
# Called on post response from exporter.
def _update_filter_configuration(etag: str, config_bytes: bytes):
    """Apply a new live-metrics filter configuration from an exporter response.

    :param etag: Configuration etag returned by the service.
    :param config_bytes: UTF-8 encoded JSON configuration payload.
    """
    # Drop projections derived from the previous configuration.
    _clear_quickpulse_projection_map()
    # config is a byte string that, when decoded, is JSON.
    config_dict = json.loads(config_bytes.decode("utf-8"))
    # Rebuild metric filter configuration from the new payload.
    _parse_metric_filter_configuration(config_dict)
    # # Process document filter configuration
    # _parse_document_filter_configuration(config_dict)
    # Remember the etag associated with this configuration.
    _set_quickpulse_etag(etag)
44+
45+
46+
def _parse_metric_filter_configuration(config: Dict[str, Any]) -> None:
    """Parse the "Metrics" section of a live-metrics configuration payload.

    Builds a telemetry-type -> derived-metric-info mapping, skipping duplicate
    ids and entries that fail validation, initializes a projection for each
    accepted metric, and stores the mapping in quickpulse state.
    """
    processed_ids = set()
    metric_infos: Dict[TelemetryType, List[DerivedMetricInfo]] = {}
    for raw_info in config.get("Metrics", []):
        metric_info = DerivedMetricInfo.from_dict(raw_info)
        # Only the first occurrence of a given id is honored.
        if metric_info.id in processed_ids:
            continue
        if not _validate_derived_metric_info(metric_info):
            continue
        # Rename exception fields by parsing out the "Exception." portion.
        for filter_group in metric_info.filter_groups:
            _rename_exception_fields_for_filtering(filter_group)
        telemetry_type: TelemetryType = TelemetryType(metric_info.telemetry_type)
        metric_infos.setdefault(telemetry_type, []).append(metric_info)
        processed_ids.add(metric_info.id)
        # Initialize the projection that accumulates this derived metric.
        _init_derived_metric_projection(metric_info)
    _set_quickpulse_derived_metric_infos(metric_infos)
68+
69+
70+
# def _parse_document_filter_configuration(config: Dict[str, Any]) -> None:
71+
# # Process document filter configuration
72+
# doc_infos: Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]] = {}
73+
# for doc_stream_dict in config.get("document_streams", []):
74+
# doc_stream = DocumentStreamInfo.from_dict(doc_stream_dict)
75+
# for doc_filter_group in doc_stream.document_filter_groups:
76+
# if not _validate_document_filter_group_info(doc_filter_group):
77+
# continue
78+
# # TODO: Rename exception fields
79+
# telemetry_type: TelemetryType = TelemetryType(doc_filter_group.telemetry_type)
80+
# if telemetry_type not in doc_infos:
81+
# doc_infos[telemetry_type] = {}
82+
# if doc_stream.id not in doc_infos[telemetry_type]:
83+
# doc_infos[telemetry_type][doc_stream.id] = []
84+
# doc_infos[telemetry_type][doc_stream.id].append(doc_filter_group.filters)
85+
86+
87+
def _rename_exception_fields_for_filtering(filter_groups: FilterConjunctionGroupInfo):
88+
for filter in filter_groups.filters:
89+
if filter.field_name.startswith("Exception."):
90+
filter.field_name = filter.field_name.replace("Exception.", "")
91+
92+
93+
def _check_metric_filters(metric_infos: List[DerivedMetricInfo], data: _TelemetryData) -> bool:
94+
match = False
95+
for metric_info in metric_infos:
96+
# Should only be a single `FilterConjunctionGroupInfo` in `filter_groups`
97+
# but we use a logical OR to match if there is more than one
98+
for group in metric_info.filter_groups:
99+
match = match or _check_filters(group.filters, data)
100+
return match
101+
102+
103+
# pylint: disable=R0911
104+
def _check_filters(filters: List[FilterInfo], data: _TelemetryData) -> bool:
105+
if not filters:
106+
return True
107+
# # All of the filters need to match for this to return true (and operation).
108+
for filter in filters:
109+
name = filter.field_name
110+
predicate = filter.predicate
111+
comparand = filter.comparand
112+
if name == "*":
113+
return _check_any_field_filter(filter, data)
114+
if name.startswith("CustomDimensions."):
115+
return _check_custom_dim_field_filter(filter, data.custom_dimensions)
116+
field_names = _DATA_FIELD_NAMES.get(type(data))
117+
if field_names is None:
118+
field_names = {}
119+
field_name = field_names.get(name.lower(), "")
120+
val = getattr(data, field_name, "")
121+
if name == "Success":
122+
if predicate == PredicateType.EQUAL:
123+
return str(val).lower() == comparand.lower()
124+
if predicate == PredicateType.NOT_EQUAL:
125+
return str(val).lower() != comparand.lower()
126+
elif name in ("ResultCode", "ResponseCode", "Duration"):
127+
try:
128+
val = int(val)
129+
except Exception: # pylint: disable=broad-exception-caught,invalid-name
130+
return False
131+
numerical_val = _filter_time_stamp_to_ms(comparand) if name == "Duration" else int(comparand)
132+
if numerical_val is None:
133+
return False
134+
if predicate == PredicateType.EQUAL:
135+
return val == numerical_val
136+
if predicate == PredicateType.NOT_EQUAL:
137+
return val != numerical_val
138+
if predicate == PredicateType.GREATER_THAN:
139+
return val > numerical_val
140+
if predicate == PredicateType.GREATER_THAN_OR_EQUAL:
141+
return val >= numerical_val
142+
if predicate == PredicateType.LESS_THAN:
143+
return val < numerical_val
144+
if predicate == PredicateType.LESS_THAN_OR_EQUAL:
145+
return val <= numerical_val
146+
return False
147+
else:
148+
# string fields
149+
return _field_string_compare(str(val), comparand, predicate)
150+
151+
return False
152+
153+
154+
def _check_any_field_filter(filter: FilterInfo, data: _TelemetryData) -> bool:
155+
# At this point, the only predicates possible to pass in are Contains and DoesNotContain
156+
# At config validation time the predicate is checked to be one of these two.
157+
for field in fields(data):
158+
if field.name == "custom_dimensions":
159+
for val in data.custom_dimensions.values():
160+
if _field_string_compare(str(val), filter.comparand, filter.predicate):
161+
return True
162+
else:
163+
val = getattr(data, field.name, None) # type: ignore
164+
if val is not None:
165+
if _field_string_compare(str(val), filter.comparand, filter.predicate):
166+
return True
167+
return False
168+
169+
170+
def _check_custom_dim_field_filter(filter: FilterInfo, custom_dimensions: Dict[str, str]) -> bool:
171+
field = filter.field_name.replace("CustomDimensions.", "")
172+
value = custom_dimensions.get(field)
173+
if value is not None:
174+
return _field_string_compare(str(value), filter.comparand, filter.predicate)
175+
return False
176+
177+
178+
def _field_string_compare(value: str, comparand: str, predicate: str) -> bool:
    """Compare two strings under the given predicate.

    Equality checks are case-sensitive; containment checks are
    case-insensitive. Any unrecognized predicate never matches.
    """
    if predicate == PredicateType.EQUAL:
        return value == comparand
    if predicate == PredicateType.NOT_EQUAL:
        return value != comparand
    lowered_value = value.lower()
    lowered_comparand = comparand.lower()
    if predicate == PredicateType.CONTAINS:
        return lowered_comparand in lowered_value
    if predicate == PredicateType.DOES_NOT_CONTAIN:
        return lowered_comparand not in lowered_value
    return False

0 commit comments

Comments
 (0)