Skip to content

Commit 75c24da

Browse files
Trace Based Sampling Log Filter Implementation (#43811)
* Trace Based Sampling and Minimum Severity Log Filters Implementation * Added CHANGELOG * Fix lint and added tests * Modified severity logic * Added type safety * Fix cspell errors * Remove extra line * Remove extraneous new line * Fix filter labels * Fix description * Distro changes for Trace based sampling * Remove previous logic and add code for processor and tests * Remove extra line * Add back space * Remove unused import * Fix cspell errors * Fix lint errors * Fix argument * Update sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/logs/_processor.py Update docstring Co-authored-by: Hector Hernandez <[email protected]> * Fix mypy and naming consistency * Fix lint --------- Co-authored-by: Hector Hernandez <[email protected]>
1 parent b9deb55 commit 75c24da

File tree

9 files changed

+365
-10
lines changed

9 files changed

+365
-10
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## 1.0.0b45 (Unreleased)
44

55
### Features Added
6+
- Added new log record processor to support Trace Based Sampling
7+
([#43811](https://github.com/Azure/azure-sdk-for-python/pull/43811))
68
- Added Operation Name Propagation for Dependencies and Logs
79
([#43588](https://github.com/Azure/azure-sdk-for-python/pull/43588))
810
- Added local storage support for multiple users on the same Linux system
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
from typing import Optional, Dict, Any
5+
6+
from opentelemetry.sdk._logs import LogData
7+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
8+
from opentelemetry.trace import get_current_span
9+
10+
11+
class _AzureBatchLogRecordProcessor(BatchLogRecordProcessor):
12+
"""Azure Monitor Log Record Processor with support for trace-based sampling."""
13+
14+
def __init__(
15+
self,
16+
log_exporter: LogExporter,
17+
options: Optional[Dict[str, Any]] = None,
18+
):
19+
"""Initialize the Azure Monitor Log Record Processor.
20+
21+
:param exporter: The LogRecordExporter to use for exporting logs.
22+
:param options: Optional configuration dictionary. Supported options:
23+
- enable_trace_based_sampling_for_logs(bool): Enable trace-based sampling for logs.
24+
"""
25+
super().__init__(log_exporter)
26+
self._options = options or {}
27+
self._enable_trace_based_sampling_for_logs = self._options.get("enable_trace_based_sampling_for_logs")
28+
29+
def on_emit(self, log_data: LogData) -> None:
30+
# cspell: disable
31+
""" Determines whether the logger should drop log records associated with unsampled traces.
32+
If `trace_based_sampling` is `true`, log records associated with unsampled traces are dropped by the `Logger`.
33+
A log record is considered associated with an unsampled trace if it has a valid `SpanId` and its
34+
`TraceFlags` indicate that the trace is unsampled. A log record that isn't associated with a trace
35+
context is not affected by this parameter and therefore bypasses trace based sampling filtering.
36+
37+
:param log_data: Contains the log record to be exported
38+
:type log_data: LogData
39+
"""
40+
41+
# cspell: enable
42+
if self._enable_trace_based_sampling_for_logs:
43+
if hasattr(log_data, "log_record") and log_data.log_record is not None:
44+
if hasattr(log_data.log_record, "context") and log_data.log_record.context is not None:
45+
span = get_current_span(log_data.log_record.context)
46+
span_context = span.get_span_context()
47+
if span_context.is_valid and not span_context.trace_flags.sampled:
48+
return
49+
super().on_emit(log_data)
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
import os
2+
import unittest
3+
from unittest import mock
4+
5+
from opentelemetry.sdk import _logs
6+
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
7+
from opentelemetry._logs.severity import SeverityNumber
8+
from opentelemetry.trace import TraceFlags
9+
10+
from azure.monitor.opentelemetry.exporter.export.logs._exporter import (
11+
AzureMonitorLogExporter,
12+
)
13+
from azure.monitor.opentelemetry.exporter.export.logs._processor import (
14+
_AzureBatchLogRecordProcessor,
15+
)
16+
17+
18+
# pylint: disable=protected-access
19+
class TestAzureBatchLogRecordProcessor(unittest.TestCase):
20+
"""Test cases for the Azure Monitor Batch Log Record Processor with trace-based sampling."""
21+
22+
@classmethod
23+
def setUpClass(cls):
24+
os.environ.pop("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL", None)
25+
os.environ.pop("APPINSIGHTS_INSTRUMENTATIONKEY", None)
26+
os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = "1234abcd-5678-4efa-8abc-1234567890ab"
27+
os.environ["APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"] = "true"
28+
cls._exporter = AzureMonitorLogExporter()
29+
30+
def test_processor_initialization_without_trace_based_sampling(self):
31+
"""Test processor initialization without trace-based sampling enabled."""
32+
processor = _AzureBatchLogRecordProcessor(
33+
self._exporter,
34+
options={}
35+
)
36+
self.assertFalse(processor._enable_trace_based_sampling_for_logs)
37+
38+
def test_processor_initialization_with_trace_based_sampling(self):
39+
"""Test processor initialization with trace-based sampling enabled."""
40+
processor = _AzureBatchLogRecordProcessor(
41+
self._exporter,
42+
options={"enable_trace_based_sampling_for_logs": True}
43+
)
44+
self.assertTrue(processor._enable_trace_based_sampling_for_logs)
45+
46+
def test_processor_initialization_without_options(self):
47+
"""Test processor initialization without options."""
48+
processor = _AzureBatchLogRecordProcessor(self._exporter)
49+
self.assertIsNone(processor._enable_trace_based_sampling_for_logs)
50+
51+
def test_on_emit_with_trace_based_sampling_disabled(self):
52+
"""Test on_emit does not filter logs when trace-based sampling is disabled."""
53+
processor = _AzureBatchLogRecordProcessor(
54+
self._exporter,
55+
options={}
56+
)
57+
58+
mock_context = mock.Mock()
59+
mock_span_context = mock.Mock()
60+
mock_span_context.is_valid = True
61+
mock_span_context.trace_flags.sampled = False
62+
63+
mock_span = mock.Mock()
64+
mock_span.get_span_context.return_value = mock_span_context
65+
66+
log_record = _logs.LogData(
67+
_logs.LogRecord(
68+
timestamp=1646865018558419456,
69+
trace_id=125960616039069540489478540494783893221,
70+
span_id=2909973987304607650,
71+
severity_text="INFO",
72+
trace_flags=TraceFlags.DEFAULT,
73+
severity_number=SeverityNumber.INFO,
74+
body="Test log",
75+
context=mock_context
76+
),
77+
InstrumentationScope("test_name"),
78+
)
79+
80+
# Mock the parent class's on_emit method through super
81+
with mock.patch('opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit') as parent_on_emit_mock:
82+
processor.on_emit(log_record)
83+
# Parent on_emit should be called because trace-based sampling is disabled
84+
parent_on_emit_mock.assert_called_once()
85+
86+
def test_on_emit_with_trace_based_sampling_enabled_and_unsampled_trace(self): # cspell:disable-line
87+
"""Test on_emit filters logs when trace-based sampling is enabled and trace is unsampled.""" # cspell:disable-line
88+
processor = _AzureBatchLogRecordProcessor(
89+
self._exporter,
90+
options={"enable_trace_based_sampling_for_logs": True}
91+
)
92+
93+
mock_context = mock.Mock()
94+
mock_span_context = mock.Mock()
95+
mock_span_context.is_valid = True
96+
mock_span_context.trace_flags.sampled = False
97+
98+
mock_span = mock.Mock()
99+
mock_span.get_span_context.return_value = mock_span_context
100+
101+
log_record = _logs.LogData(
102+
_logs.LogRecord(
103+
timestamp=1646865018558419456,
104+
trace_id=125960616039069540489478540494783893221,
105+
span_id=2909973987304607650,
106+
severity_text="INFO",
107+
trace_flags=TraceFlags.DEFAULT,
108+
severity_number=SeverityNumber.INFO,
109+
body="Test log",
110+
context=mock_context
111+
),
112+
InstrumentationScope("test_name"),
113+
)
114+
# Mock get_current_span to return our mock span with proper get_span_context method
115+
with mock.patch("azure.monitor.opentelemetry.exporter.export.logs._processor.get_current_span", return_value=mock_span):
116+
# Mock only the parent class's on_emit method
117+
with mock.patch('opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit') as parent_on_emit_mock:
118+
processor.on_emit(log_record)
119+
# Parent on_emit should NOT be called because trace is unsampled and filtering is enabled # cspell:disable-line
120+
parent_on_emit_mock.assert_not_called()
121+
122+
def test_on_emit_with_trace_based_sampling_enabled_and_sampled_trace(self):
123+
"""Test on_emit does not filter logs when trace-based sampling is enabled and trace is sampled."""
124+
processor = _AzureBatchLogRecordProcessor(
125+
self._exporter,
126+
options={"enable_trace_based_sampling_for_logs": True}
127+
)
128+
129+
mock_context = mock.Mock()
130+
mock_span_context = mock.Mock()
131+
mock_span_context.is_valid = True
132+
mock_span_context.trace_flags.sampled = True
133+
134+
mock_span = mock.Mock()
135+
mock_span.get_span_context.return_value = mock_span_context
136+
137+
log_record = _logs.LogData(
138+
_logs.LogRecord(
139+
timestamp=1646865018558419456,
140+
trace_id=125960616039069540489478540494783893221,
141+
span_id=2909973987304607650,
142+
severity_text="INFO",
143+
trace_flags=TraceFlags.SAMPLED,
144+
severity_number=SeverityNumber.INFO,
145+
body="Test log",
146+
context=mock_context
147+
),
148+
InstrumentationScope("test_name"),
149+
)
150+
151+
with mock.patch("azure.monitor.opentelemetry.exporter.export.logs._processor.get_current_span", return_value=mock_span):
152+
with mock.patch('opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit') as parent_on_emit_mock:
153+
processor.on_emit(log_record)
154+
# Parent on_emit should be called because trace is sampled
155+
parent_on_emit_mock.assert_called_once()
156+
157+
def test_on_emit_with_trace_based_sampling_enabled_and_invalid_span_context(self):
158+
"""Test on_emit does not filter logs with invalid span context."""
159+
processor = _AzureBatchLogRecordProcessor(
160+
self._exporter,
161+
options={"enable_trace_based_sampling_for_logs": True}
162+
)
163+
164+
mock_context = mock.Mock()
165+
mock_span_context = mock.Mock()
166+
mock_span_context.is_valid = False
167+
168+
mock_span = mock.Mock()
169+
mock_span.get_span_context.return_value = mock_span_context
170+
171+
log_record = _logs.LogData(
172+
_logs.LogRecord(
173+
timestamp=1646865018558419456,
174+
trace_id=125960616039069540489478540494783893221,
175+
span_id=2909973987304607650,
176+
severity_text="INFO",
177+
trace_flags=TraceFlags.DEFAULT,
178+
severity_number=SeverityNumber.INFO,
179+
body="Test log",
180+
context=mock_context
181+
),
182+
InstrumentationScope("test_name"),
183+
)
184+
185+
with mock.patch("azure.monitor.opentelemetry.exporter.export.logs._processor.get_current_span", return_value=mock_span):
186+
with mock.patch('opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit') as parent_on_emit_mock:
187+
processor.on_emit(log_record)
188+
# Parent on_emit should be called because span context is invalid
189+
parent_on_emit_mock.assert_called_once()
190+
191+
def test_on_emit_with_trace_based_sampling_enabled_and_no_context(self):
192+
"""Test on_emit does not filter logs when there is no log record context."""
193+
processor = _AzureBatchLogRecordProcessor(
194+
self._exporter,
195+
options={"enable_trace_based_sampling_for_logs": True}
196+
)
197+
198+
log_record = _logs.LogData(
199+
_logs.LogRecord(
200+
timestamp=1646865018558419456,
201+
trace_id=125960616039069540489478540494783893221,
202+
span_id=2909973987304607650,
203+
severity_text="INFO",
204+
trace_flags=TraceFlags.DEFAULT,
205+
severity_number=SeverityNumber.INFO,
206+
body="Test log",
207+
context=None
208+
),
209+
InstrumentationScope("test_name"),
210+
)
211+
212+
with mock.patch('opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit') as parent_on_emit_mock:
213+
processor.on_emit(log_record)
214+
# Parent on_emit should be called because there's no context
215+
parent_on_emit_mock.assert_called_once()
216+
217+
def test_on_emit_integration_with_multiple_log_records(self):
218+
"""Integration test: verify processor handles multiple log records correctly with trace-based sampling."""
219+
processor = _AzureBatchLogRecordProcessor(
220+
self._exporter,
221+
options={"enable_trace_based_sampling_for_logs": True}
222+
)
223+
224+
mock_context = mock.Mock()
225+
226+
# Create unsampled span context # cspell:disable-line
227+
mock_span_context_unsampled = mock.Mock() # cspell:disable-line
228+
mock_span_context_unsampled.is_valid = True # cspell:disable-line
229+
mock_span_context_unsampled.trace_flags.sampled = False # cspell:disable-line
230+
231+
mock_span_unsampled = mock.Mock() # cspell:disable-line
232+
mock_span_unsampled.get_span_context.return_value = mock_span_context_unsampled # cspell:disable-line
233+
234+
# Create sampled span context
235+
mock_span_context_sampled = mock.Mock()
236+
mock_span_context_sampled.is_valid = True
237+
mock_span_context_sampled.trace_flags.sampled = True
238+
239+
mock_span_sampled = mock.Mock()
240+
mock_span_sampled.get_span_context.return_value = mock_span_context_sampled
241+
242+
log_record_unsampled = _logs.LogData( # cspell:disable-line
243+
_logs.LogRecord(
244+
timestamp=1646865018558419456,
245+
trace_id=125960616039069540489478540494783893221,
246+
span_id=2909973987304607650,
247+
severity_text="INFO",
248+
trace_flags=TraceFlags.DEFAULT,
249+
severity_number=SeverityNumber.INFO,
250+
body="Unsampled log", # cspell:disable-line
251+
context=mock_context
252+
),
253+
InstrumentationScope("test_name"),
254+
)
255+
256+
log_record_sampled = _logs.LogData(
257+
_logs.LogRecord(
258+
timestamp=1646865018558419457,
259+
trace_id=125960616039069540489478540494783893222,
260+
span_id=2909973987304607651,
261+
severity_text="INFO",
262+
trace_flags=TraceFlags.SAMPLED,
263+
severity_number=SeverityNumber.INFO,
264+
body="Sampled log",
265+
context=mock_context
266+
),
267+
InstrumentationScope("test_name"),
268+
)
269+
270+
with mock.patch("azure.monitor.opentelemetry.exporter.export.logs._processor.get_current_span") as get_span_mock:
271+
with mock.patch('opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit') as parent_on_emit_mock:
272+
# Test unsampled log is filtered # cspell:disable-line
273+
get_span_mock.return_value = mock_span_unsampled # cspell:disable-line
274+
processor.on_emit(log_record_unsampled) # cspell:disable-line
275+
parent_on_emit_mock.assert_not_called()
276+
277+
# Reset mock
278+
parent_on_emit_mock.reset_mock()
279+
get_span_mock.reset_mock()
280+
281+
# Test sampled log is not filtered
282+
get_span_mock.return_value = mock_span_sampled
283+
processor.on_emit(log_record_sampled)
284+
parent_on_emit_mock.assert_called_once()

sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Add `trace_based_sampling` logger parameters to filter logs
8+
([#43811](https://github.com/Azure/azure-sdk-for-python/pull/43811))
79
- Performance Counters
810
([#43262](https://github.com/Azure/azure-sdk-for-python/pull/43262))
911
- Adding more diagnostic log message IDs

sdk/monitor/azure-monitor-opentelemetry/azure/monitor/opentelemetry/_configure.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
SAMPLING_TRACES_PER_SECOND_ARG,
4040
SPAN_PROCESSORS_ARG,
4141
VIEWS_ARG,
42+
ENABLE_TRACE_BASED_SAMPLING_ARG,
4243
)
4344
from azure.monitor.opentelemetry._types import ConfigurationValue
4445
from azure.monitor.opentelemetry.exporter._quickpulse import ( # pylint: disable=import-error,no-name-in-module
@@ -109,6 +110,8 @@ def configure_azure_monitor(**kwargs) -> None: # pylint: disable=C4758
109110
`<tempfile.gettempdir()>/Microsoft/AzureMonitor/opentelemetry-python-<your-instrumentation-key>`.
110111
:keyword list[~opentelemetry.sdk.metrics.view.View] views: List of `View` objects to configure and filter
111112
metric output.
113+
:keyword bool enable_trace_based_sampling_for_logs: Boolean value to determine whether to enable trace based
114+
sampling for logs. Defaults to `False`
112115
:rtype: None
113116
"""
114117

@@ -203,7 +206,7 @@ def _setup_logging(configurations: Dict[str, ConfigurationValue]):
203206
try:
204207
from opentelemetry._logs import set_logger_provider
205208
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
206-
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
209+
from azure.monitor.opentelemetry.exporter.export.logs._processor import _AzureBatchLogRecordProcessor
207210

208211
from azure.monitor.opentelemetry.exporter import ( # pylint: disable=import-error,no-name-in-module
209212
AzureMonitorLogExporter
@@ -212,15 +215,17 @@ def _setup_logging(configurations: Dict[str, ConfigurationValue]):
212215
resource: Resource = configurations[RESOURCE_ARG] # type: ignore
213216
enable_performance_counters_config = configurations[ENABLE_PERFORMANCE_COUNTERS_ARG]
214217
logger_provider = LoggerProvider(resource=resource)
218+
enable_trace_based_sampling_for_logs = configurations[ENABLE_TRACE_BASED_SAMPLING_ARG]
215219
if configurations.get(ENABLE_LIVE_METRICS_ARG):
216220
qlp = _QuickpulseLogRecordProcessor()
217221
logger_provider.add_log_record_processor(qlp)
218222
if enable_performance_counters_config:
219223
pclp = _PerformanceCountersLogRecordProcessor()
220224
logger_provider.add_log_record_processor(pclp)
221225
log_exporter = AzureMonitorLogExporter(**configurations)
222-
log_record_processor = BatchLogRecordProcessor(
226+
log_record_processor = _AzureBatchLogRecordProcessor(
223227
log_exporter,
228+
{"enable_trace_based_sampling_for_logs": enable_trace_based_sampling_for_logs},
224229
)
225230
logger_provider.add_log_record_processor(log_record_processor)
226231
set_logger_provider(logger_provider)

0 commit comments

Comments
 (0)