Skip to content

Commit cb0221a

Browse files
authored
Implement live metrics filtering for docs (Azure#38925)
1 parent 42ff95a commit cb0221a

File tree

10 files changed

+486
-220
lines changed

10 files changed

+486
-220
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44

55
### Features Added
66

7+
- Implement live metrics filtering for metrics
8+
([#37998](https://github.com/Azure/azure-sdk-for-python/pull/37998))
9+
- Add applying filter/validating filter logic to live metrics filtering
10+
([#38451](https://github.com/Azure/azure-sdk-for-python/pull/38451))
11+
- Implement live metrics filtering for docs
12+
([#38925](https://github.com/Azure/azure-sdk-for-python/pull/38925))
13+
714
### Breaking Changes
815

916
### Bugs Fixed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_filter.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import (
99
DerivedMetricInfo,
10+
DocumentStreamInfo,
1011
FilterConjunctionGroupInfo,
1112
FilterInfo,
1213
PredicateType,
@@ -18,14 +19,18 @@
1819
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
1920
_clear_quickpulse_projection_map,
2021
_set_quickpulse_derived_metric_infos,
22+
_set_quickpulse_doc_stream_infos,
2123
_set_quickpulse_etag,
2224
)
2325
from azure.monitor.opentelemetry.exporter._quickpulse._types import (
2426
_DATA_FIELD_NAMES,
2527
_TelemetryData,
2628
)
2729
from azure.monitor.opentelemetry.exporter._quickpulse._utils import _filter_time_stamp_to_ms
28-
from azure.monitor.opentelemetry.exporter._quickpulse._validate import _validate_derived_metric_info
30+
from azure.monitor.opentelemetry.exporter._quickpulse._validate import (
31+
_validate_derived_metric_info,
32+
_validate_document_filter_group_info,
33+
)
2934

3035

3136
# Apply filter configuration based off response
@@ -38,7 +43,7 @@ def _update_filter_configuration(etag: str, config_bytes: bytes):
3843
# Process metric filter configuration
3944
_parse_metric_filter_configuration(config)
4045
# # Process document filter configuration
41-
# _parse_document_filter_configuration(config)
46+
_parse_document_filter_configuration(config)
4247
# Update new etag
4348
_set_quickpulse_etag(etag)
4449

@@ -67,21 +72,23 @@ def _parse_metric_filter_configuration(config: Dict[str, Any]) -> None:
6772
_set_quickpulse_derived_metric_infos(metric_infos)
6873

6974

70-
# def _parse_document_filter_configuration(config: Dict[str, Any]) -> None:
71-
# # Process document filter configuration
72-
# doc_infos: Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]] = {}
73-
# for doc_stream_dict in config.get("document_streams", []):
74-
# doc_stream = DocumentStreamInfo.from_dict(doc_stream_dict)
75-
# for doc_filter_group in doc_stream.document_filter_groups:
76-
# if not _validate_document_filter_group_info(doc_filter_group):
77-
# continue
78-
# # TODO: Rename exception fields
79-
# telemetry_type: TelemetryType = TelemetryType(doc_filter_group.telemetry_type)
80-
# if telemetry_type not in doc_infos:
81-
# doc_infos[telemetry_type] = {}
82-
# if doc_stream.id not in doc_infos[telemetry_type]:
83-
# doc_infos[telemetry_type][doc_stream.id] = []
84-
# doc_infos[telemetry_type][doc_stream.id].append(doc_filter_group.filters)
75+
def _parse_document_filter_configuration(config: Dict[str, Any]) -> None:
    """Build the document filter mapping from ``config`` and store it.

    Reads every entry under the ``"DocumentStreams"`` key, keeps only the
    filter groups that pass ``_validate_document_filter_group_info``, and
    groups their filters as telemetry type -> stream id -> list of filter
    groups. The resulting mapping is handed to
    ``_set_quickpulse_doc_stream_infos``; it is empty when no stream or no
    valid filter group is present.

    :param config: Parsed filter configuration dictionary.
    """
    stream_filters: Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]] = {}
    for raw_stream in config.get("DocumentStreams", []):
        stream = DocumentStreamInfo.from_dict(raw_stream)
        for group in stream.document_filter_groups:
            if _validate_document_filter_group_info(group):
                # Rename exception fields by parsing out "Exception." portion
                _rename_exception_fields_for_filtering(group.filters)
                kind: TelemetryType = TelemetryType(group.telemetry_type)
                per_stream = stream_filters.setdefault(kind, {})
                per_stream.setdefault(stream.id, []).append(group.filters)
    _set_quickpulse_doc_stream_infos(stream_filters)
8592

8693

8794
def _rename_exception_fields_for_filtering(filter_groups: FilterConjunctionGroupInfo):

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33
# cSpell:disable
4-
from typing import Any
4+
from typing import Any, Dict, List, Optional
55

66
import logging
77
import platform
@@ -39,9 +39,12 @@
3939
_QuickpulseMetricReader,
4040
)
4141
from azure.monitor.opentelemetry.exporter._quickpulse._filter import (
42+
_check_filters,
4243
_check_metric_filters,
4344
)
4445
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import (
46+
DerivedMetricInfo,
47+
FilterConjunctionGroupInfo,
4548
MonitoringDataPoint,
4649
TelemetryType,
4750
)
@@ -53,6 +56,7 @@
5356
_is_post_state,
5457
_append_quickpulse_document,
5558
_get_quickpulse_derived_metric_infos,
59+
_get_quickpulse_doc_stream_infos,
5660
_set_global_quickpulse_state,
5761
)
5862
from azure.monitor.opentelemetry.exporter._quickpulse._types import (
@@ -174,8 +178,6 @@ def _record_span(self, span: ReadableSpan) -> None:
174178
# Only record if in post state
175179
if _is_post_state():
176180
try:
177-
document = _get_span_document(span)
178-
_append_quickpulse_document(document)
179181
duration_ms = 0
180182
if span.end_time and span.start_time:
181183
duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore
@@ -195,13 +197,14 @@ def _record_span(self, span: ReadableSpan) -> None:
195197
self._dependency_failure_rate_counter.add(1)
196198
self._dependency_duration.record(duration_ms)
197199

198-
metric_infos_dict = _get_quickpulse_derived_metric_infos()
199-
# check if filtering is enabled
200-
if metric_infos_dict:
201-
# Derive metrics for quickpulse filtering
202-
data = _TelemetryData._from_span(span)
203-
_derive_metrics_from_telemetry_data(data)
204-
# TODO: derive exception metrics from span events
200+
# Derive metrics for quickpulse filtering
201+
data = _TelemetryData._from_span(span)
202+
_derive_metrics_from_telemetry_data(data)
203+
204+
# Process docs for quickpulse filtering
205+
_apply_document_filters_from_telemetry_data(data)
206+
207+
# TODO: derive exception metrics from span events
205208
except Exception: # pylint: disable=broad-except
206209
_logger.exception("Exception occurred while recording span.")
207210

@@ -210,31 +213,33 @@ def _record_log_record(self, log_data: LogData) -> None:
210213
if _is_post_state():
211214
try:
212215
if log_data.log_record:
216+
exc_type = None
213217
log_record = log_data.log_record
214218
if log_record.attributes:
215-
document = _get_log_record_document(log_data)
216-
_append_quickpulse_document(document)
217219
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
218220
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
219221
if exc_type is not None or exc_message is not None:
220222
self._exception_rate_counter.add(1)
221223

222-
metric_infos_dict = _get_quickpulse_derived_metric_infos()
223-
# check if filtering is enabled
224-
if metric_infos_dict:
225-
# Derive metrics for quickpulse filtering
226-
data = _TelemetryData._from_log_record(log_record)
227-
_derive_metrics_from_telemetry_data(data)
224+
# Derive metrics for quickpulse filtering
225+
data = _TelemetryData._from_log_record(log_record)
226+
_derive_metrics_from_telemetry_data(data)
227+
228+
# Process docs for quickpulse filtering
229+
_apply_document_filters_from_telemetry_data(data, exc_type) # type: ignore
228230
except Exception: # pylint: disable=broad-except
229231
_logger.exception("Exception occurred while recording log record.")
230232

231233

232234
# Filtering
233235

234-
# Called by record_span/record_log when processing a span/log_record
236+
# Called by record_span/record_log when processing a span/log_record for metrics filtering
235237
# Derives metrics from projections if applicable to current filters in config
236238
def _derive_metrics_from_telemetry_data(data: _TelemetryData):
237-
metric_infos_dict = _get_quickpulse_derived_metric_infos()
239+
metric_infos_dict: Dict[TelemetryType, List[DerivedMetricInfo]] = _get_quickpulse_derived_metric_infos()
240+
# if empty, filtering was not configured
241+
if not metric_infos_dict:
242+
return
238243
metric_infos = [] # type: ignore
239244
if isinstance(data, _RequestData):
240245
metric_infos = metric_infos_dict.get(TelemetryType.REQUEST) # type: ignore
@@ -249,4 +254,44 @@ def _derive_metrics_from_telemetry_data(data: _TelemetryData):
249254
# generate filtered metrics
250255
_create_projections(metric_infos, data)
251256

257+
258+
# Called by record_span/record_log when processing a span/log_record for docs filtering
259+
# Finds doc stream Ids and their doc filter configurations
260+
def _apply_document_filters_from_telemetry_data(data: _TelemetryData, exc_type: Optional[str] = None):
    """Append a quickpulse document for ``data`` if the document filters allow it.

    Looks up the document filter configuration for the telemetry type of
    ``data`` and collects the ids of every stream that has at least one filter
    group matching ``data``. A document is generated and appended when either
    a stream matched or no filtering is configured for this telemetry type.

    :param data: Telemetry data derived from a span or a log record.
    :param exc_type: Exception type forwarded to the log record document
        builder; unused for request/dependency data.
    """
    doc_config_dict: Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]] = _get_quickpulse_doc_stream_infos()  # pylint: disable=C0301
    stream_ids = set()
    doc_config = {}  # type: ignore
    if isinstance(data, _RequestData):
        doc_config = doc_config_dict.get(TelemetryType.REQUEST, {})  # type: ignore
    elif isinstance(data, _DependencyData):
        doc_config = doc_config_dict.get(TelemetryType.DEPENDENCY, {})  # type: ignore
    elif isinstance(data, _ExceptionData):
        doc_config = doc_config_dict.get(TelemetryType.EXCEPTION, {})  # type: ignore
    elif isinstance(data, _TraceData):
        doc_config = doc_config_dict.get(TelemetryType.TRACE, {})  # type: ignore

    # A stream is selected as soon as one of its filter groups matches
    for stream_id, filter_groups in doc_config.items():
        if any(_check_filters(filter_group.filters, data) for filter_group in filter_groups):
            stream_ids.add(stream_id)

    # We only append and send the document if either:
    # 1. The document matched the filtering for a specific streamId
    # 2. Filtering was not enabled for this telemetry type (empty doc_config)
    if stream_ids or not doc_config:
        # Use isinstance here as well so the document builder is chosen with
        # the same type test that selected the filter configuration above
        if isinstance(data, (_DependencyData, _RequestData)):
            document = _get_span_document(data)  # type: ignore
        else:
            document = _get_log_record_document(data, exc_type)  # type: ignore
        # A stream (with a unique streamId) is relevant if there are multiple sources sending to the same
        # ApplicationInsights instance with live metrics enabled
        # Modify the document's streamIds to determine which stream to send to in post
        # Note that the default case is that the list of document_stream_ids is empty, in which
        # case no filtering is done for the telemetry type and it is sent to all streams
        if stream_ids:
            document.document_stream_ids = list(stream_ids)

        # Add the generated document to be sent to quickpulse
        _append_quickpulse_document(document)
296+
252297
# cSpell:enable

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
AggregationType,
1616
DerivedMetricInfo,
1717
DocumentIngress,
18+
FilterConjunctionGroupInfo,
1819
TelemetryType,
1920
)
2021

@@ -39,6 +40,7 @@ class _QuickpulseState(Enum):
3940
_QUICKPULSE_ETAG = ""
4041
_QUICKPULSE_DERIVED_METRIC_INFOS: Dict[TelemetryType, List[DerivedMetricInfo]] = {}
4142
_QUICKPULSE_PROJECTION_MAP: Dict[str, Tuple[AggregationType, float, int]] = {}
43+
_QUICKPULSE_DOC_STREAM_INFOS: Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]] = {}
4244

4345

4446
def _set_global_quickpulse_state(state: _QuickpulseState) -> None:
@@ -127,8 +129,8 @@ def _get_quickpulse_etag() -> str:
127129
return _QUICKPULSE_ETAG
128130

129131

130-
# Used for updating filter configuration when etag has changed
131-
# Contains filter and projection to apply for each telemetry type if exists
132+
# Used for updating metric filter configuration when etag has changed
133+
# Contains filter and projection of metrics to apply for each telemetry type if exists
132134
def _set_quickpulse_derived_metric_infos(filters: Dict[TelemetryType, List[DerivedMetricInfo]]) -> None:
133135
# pylint: disable=global-statement
134136
global _QUICKPULSE_DERIVED_METRIC_INFOS
@@ -175,3 +177,16 @@ def _clear_quickpulse_projection_map():
175177
# pylint: disable=global-variable-not-assigned
176178
global _QUICKPULSE_PROJECTION_MAP
177179
_QUICKPULSE_PROJECTION_MAP.clear()
180+
181+
182+
# Used for updating doc filter configuration when etag has changed
183+
# Contains the document filters to apply for each telemetry type and stream id, if any exist
184+
# Format is Dict[TelemetryType, Dict[stream.id, List[FilterConjunctionGroupInfo]]]
185+
def _set_quickpulse_doc_stream_infos(filters: Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]]) -> None:
    """Replace the module-level document stream filter configuration with ``filters``.

    :param filters: Mapping of telemetry type -> stream id -> filter groups.
    """
    global _QUICKPULSE_DOC_STREAM_INFOS  # pylint: disable=global-statement
    _QUICKPULSE_DOC_STREAM_INFOS = filters
189+
190+
191+
def _get_quickpulse_doc_stream_infos() -> Dict[TelemetryType, Dict[str, List[FilterConjunctionGroupInfo]]]:
    """Return the document stream filter configuration currently stored at module level."""
    doc_stream_infos = _QUICKPULSE_DOC_STREAM_INFOS
    return doc_stream_infos

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_types.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ def _from_span(span: ReadableSpan):
9494
attributes = {}
9595
dependency_type = "InProc"
9696
data = ""
97+
if span.end_time and span.start_time:
98+
duration_ms = (span.end_time - span.start_time) / 1e9
9799
if span.attributes:
98100
attributes = span.attributes
99101
target = trace_utils._get_target_for_dependency_from_peer(attributes)

0 commit comments

Comments
 (0)