Implement PoC of MeasurementProcessor proposal #4642
Changes from 3 commits
1afaad4
664b741
e446a30
463d343
67629e8
b3c95d3
732238b
6e02f9d
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
# OpenTelemetry Python - MeasurementProcessor Implementation | ||
|
||
This implementation adds support for **MeasurementProcessor** to the OpenTelemetry Python SDK, following the [OpenTelemetry Specification PR #4318](https://github.com/open-telemetry/opentelemetry-specification/pull/4318). | ||
|
||
## Overview | ||
|
||
The MeasurementProcessor allows you to process measurements before they are aggregated and exported. This enables powerful use cases such as: | ||
|
||
- **Dynamic injection of additional attributes** to measurements based on Context (e.g., from Baggage) | ||
- **Dropping attributes** (e.g., removing sensitive information) | ||
- **Dropping individual measurements** (e.g., filtering invalid values) | ||
- **Modifying measurements** (e.g., unit conversion, value transformation) | ||
|
||
## Key Features | ||
|
||
### Chain-of-Responsibility Pattern | ||
|
||
Unlike existing processors in OpenTelemetry (SpanProcessor, LogRecordProcessor), MeasurementProcessor uses a **chain-of-responsibility pattern** where each processor is responsible for calling the next processor in the chain. This gives processors fine-grained control over the processing flow. | ||
|
||
### High Performance | ||
|
||
The implementation is designed for high-performance scenarios: | ||
- Minimal overhead when no processors are configured | ||
- Efficient processor chaining using closures | ||
- No unnecessary object creation in the hot path | ||
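The closure-based chaining mentioned above can be illustrated with a minimal, self-contained sketch. It is not the PoC's code: `Measurement` is a stand-in dataclass, and `build_chain`, `add_env`, and `drop_negative` are hypothetical names chosen for illustration. The chain is folded into a single callable once, and with no processors configured the final consumer is returned unchanged, so the hot path has no extra call.

```python
from dataclasses import dataclass, field
from typing import Callable, Mapping, Sequence


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    attributes: Mapping[str, str] = field(default_factory=dict)


Consumer = Callable[[Measurement], None]
# A processor receives the measurement and the next step in the chain.
Processor = Callable[[Measurement, Consumer], None]


def build_chain(processors: Sequence[Processor], final_consumer: Consumer) -> Consumer:
    """Fold the processors into one callable, built once at configuration time."""
    if not processors:
        # Fast path: with no processors, the chain is the consumer itself.
        return final_consumer

    chain = final_consumer
    # Wrap from the last processor to the first so the first one runs first.
    for processor in reversed(processors):
        def step(measurement, _processor=processor, _next=chain):
            _processor(measurement, _next)
        chain = step
    return chain


def add_env(measurement: Measurement, next_processor: Consumer) -> None:
    # Modify and pass: forward a copy with one extra attribute.
    next_processor(Measurement(measurement.value, {**measurement.attributes, "env": "prod"}))


def drop_negative(measurement: Measurement, next_processor: Consumer) -> None:
    # Drop: negative values are never forwarded.
    if measurement.value >= 0:
        next_processor(measurement)


recorded = []
chain = build_chain([add_env, drop_negative], recorded.append)
chain(Measurement(1.0))   # forwarded with env=prod
chain(Measurement(-5.0))  # dropped
```

Binding `processor` and `chain` as default arguments avoids the late-binding pitfall of closures created in a loop.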
|
||
## Architecture | ||
|
||
``` | ||
Measurement → Processor 1 → Processor 2 → ... → Processor N → Aggregation | ||
``` | ||
|
||
Each processor can: | ||
1. **Pass through unchanged**: `next_processor(measurement)` | ||
2. **Modify and pass**: `next_processor(modified_measurement)` | ||
3. **Drop measurement**: Simply don't call `next_processor` | ||
4. **Split into multiple**: Call `next_processor` multiple times | ||
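The four behaviours listed above can be sketched as follows. This is an illustration under stated assumptions, not the PoC's implementation: `Measurement` is a stand-in dataclass, and the processor classes and the `run_chain` helper are hypothetical.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, Mapping, Sequence


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    attributes: Mapping[str, str] = field(default_factory=dict)


Next = Callable[[Measurement], None]


class PassThrough:
    """1. Pass through unchanged."""
    def process(self, measurement: Measurement, next_processor: Next) -> None:
        next_processor(measurement)


class Double:
    """2. Modify and pass."""
    def process(self, measurement: Measurement, next_processor: Next) -> None:
        next_processor(replace(measurement, value=measurement.value * 2))


class DropNegative:
    """3. Drop: do not call next_processor for negative values."""
    def process(self, measurement: Measurement, next_processor: Next) -> None:
        if measurement.value >= 0:
            next_processor(measurement)


class SplitPerRegion:
    """4. Split: call next_processor once per region."""
    def process(self, measurement: Measurement, next_processor: Next) -> None:
        for region in ("eu", "us"):
            attrs = {**measurement.attributes, "region": region}
            next_processor(replace(measurement, attributes=attrs))


def run_chain(processors: Sequence, measurement: Measurement, sink: Next) -> None:
    """Run the measurement through the processors in order, then into sink."""
    if not processors:
        sink(measurement)
        return
    head, rest = processors[0], processors[1:]
    head.process(measurement, lambda m: run_chain(rest, m, sink))


out = []
chain = [PassThrough(), Double(), DropNegative(), SplitPerRegion()]
run_chain(chain, Measurement(2.0), out.append)   # doubled, then split in two
run_chain(chain, Measurement(-1.0), out.append)  # doubled to -2.0, then dropped
```

Because each processor decides whether and how often to call `next_processor`, dropping and splitting need no special return values.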
|
||
## Usage | ||
|
||
### Basic Setup | ||
|
||
```python | ||
from opentelemetry.sdk.metrics import MeterProvider | ||
from opentelemetry.sdk.metrics._internal.measurement_processor import ( | ||
    BaggageMeasurementProcessor, | ||
    StaticAttributeMeasurementProcessor, | ||
    ValueRangeMeasurementProcessor, | ||
) | ||
|
||
# Create measurement processors | ||
processors = [ | ||
    BaggageMeasurementProcessor(),  # Add baggage as attributes | ||
    StaticAttributeMeasurementProcessor({"env": "prod"}),  # Add static attributes | ||
    ValueRangeMeasurementProcessor(min_value=0),  # Drop negative values | ||
] | ||
|
||
# Configure MeterProvider with processors | ||
meter_provider = MeterProvider( | ||
    measurement_processors=processors, | ||
    # ... other configuration | ||
) | ||
``` | ||
|
||
### Built-in Processors | ||
|
||
#### 1. BaggageMeasurementProcessor | ||
|
||
Extracts values from OpenTelemetry Baggage and adds them as measurement attributes, enabling end-to-end telemetry correlation. | ||
|
||
```python | ||
# Add all baggage as attributes with "baggage." prefix | ||
processor = BaggageMeasurementProcessor() | ||
|
||
# Add only specific baggage keys | ||
processor = BaggageMeasurementProcessor(baggage_keys=["user.id", "trace.id"]) | ||
``` | ||
|
||
#### 2. StaticAttributeMeasurementProcessor | ||
|
||
Adds static attributes to all measurements. | ||
|
||
```python | ||
processor = StaticAttributeMeasurementProcessor({ | ||
    "environment": "production", | ||
Review comment: I am not sure if this is a good idea. For StaticAttributes for all measurements, couldn't we model it as Resource Attributes? If applicable to a subset, then maybe use Meter attributes?
Reply: We could - this is just an example. IMO, none of these implementations I'm showing should be present in When we merge open-telemetry/opentelemetry-specification#4318 and start working on the actual implementation, the concrete processors should probably be contributed here: https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/processor |
||
    "service": "api-server", | ||
    "version": "1.0.0" | ||
}) | ||
``` | ||
|
||
#### 3. AttributeFilterMeasurementProcessor | ||
|
||
Removes specific attributes from measurements (useful for removing sensitive data). | ||
|
||
```python | ||
processor = AttributeFilterMeasurementProcessor([ | ||
    "password", "secret", "auth_token" | ||
]) | ||
``` | ||
|
||
#### 4. ValueRangeMeasurementProcessor | ||
|
||
Drops measurements outside a specified value range. | ||
Review comment: While this looks valid, I am not sure if it's common to drop measurements based on value. May I suggest another one which seems to me like a very valid use case: collapsing certain attributes to lower the cardinality. Described here: open-telemetry/opentelemetry-dotnet#6332
Reply: Removed |
||
|
||
```python | ||
# Drop negative values and values over 1000 | ||
processor = ValueRangeMeasurementProcessor(min_value=0, max_value=1000) | ||
|
||
# Only minimum limit | ||
processor = ValueRangeMeasurementProcessor(min_value=0) | ||
|
||
# Only maximum limit | ||
processor = ValueRangeMeasurementProcessor(max_value=100) | ||
``` | ||
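The review thread suggests collapsing attributes to lower cardinality as a more common use case. One possible reading of that suggestion is sketched below. `CollapseAttributeProcessor` and its parameters are hypothetical, `Measurement` is a stand-in dataclass, and none of this is part of the PoC: values of one attribute that are not on an allow-list are replaced by a single fallback value.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, Iterable, Mapping


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    attributes: Mapping[str, str] = field(default_factory=dict)


class CollapseAttributeProcessor:
    """Hypothetical processor: keep allowed values of one attribute and
    replace every other value with a fallback to bound cardinality."""

    def __init__(self, key: str, allowed_values: Iterable[str], fallback: str = "other"):
        self._key = key
        self._allowed = frozenset(allowed_values)
        self._fallback = fallback

    def process(
        self,
        measurement: Measurement,
        next_processor: Callable[[Measurement], None],
    ) -> None:
        value = measurement.attributes.get(self._key)
        if value is None or value in self._allowed:
            # Nothing to collapse: forward the measurement unchanged.
            next_processor(measurement)
            return
        collapsed = {**measurement.attributes, self._key: self._fallback}
        next_processor(replace(measurement, attributes=collapsed))


seen = []
processor = CollapseAttributeProcessor("http.route", {"/api/users", "/api/products"})
processor.process(Measurement(1, {"http.route": "/api/users"}), seen.append)
processor.process(Measurement(1, {"http.route": "/api/users/12345"}), seen.append)
```

With this sketch the number of distinct values for the attribute is at most the size of the allow-list plus one.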
|
||
### Custom Processors | ||
|
||
Create custom processors by implementing the `MeasurementProcessor` interface: | ||
|
||
```python | ||
import random | ||
import time | ||
from dataclasses import replace | ||
from typing import Callable | ||
|
||
from opentelemetry.sdk.metrics._internal.measurement import Measurement | ||
from opentelemetry.sdk.metrics._internal.measurement_processor import MeasurementProcessor | ||
|
||
class CustomMeasurementProcessor(MeasurementProcessor): | ||
    def process( | ||
        self, | ||
        measurement: Measurement, | ||
        next_processor: Callable[[Measurement], None] | ||
    ) -> None: | ||
        # Example: Add timestamp attribute | ||
        new_attributes = dict(measurement.attributes or {}) | ||
        new_attributes["processed_at"] = str(int(time.time())) | ||
Review comment: I don't think we should show an example of adding a timestamp as a Metric attribute - is there any valid use-case for it? |
||
|
||
        modified_measurement = replace(measurement, attributes=new_attributes) | ||
        next_processor(modified_measurement) | ||
|
||
# Unit conversion processor | ||
class MetersToFeetProcessor(MeasurementProcessor): | ||
Review comment: I like this! A simpler one would be to do sec->msec.
Reply: Umm, I'm not sure. Perhaps someone with more experience with Metrics spec could answer this question. |
||
    def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: | ||
        if measurement.instrument.name.endswith("_meters"): | ||
            # Convert meters to feet | ||
            feet_value = measurement.value * 3.28084 | ||
            new_measurement = replace(measurement, value=feet_value) | ||
            next_processor(new_measurement) | ||
        else: | ||
            next_processor(measurement) | ||
|
||
# Sampling processor | ||
Review comment: Not sure of the utility of this for Metrics!
Reply: Removed |
||
class SamplingProcessor(MeasurementProcessor): | ||
    def __init__(self, sample_rate: float): | ||
        self.sample_rate = sample_rate | ||
|
||
    def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: | ||
        if random.random() < self.sample_rate: | ||
            next_processor(measurement) | ||
        # else: drop the measurement | ||
``` | ||
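The seconds-to-milliseconds variant raised in the review can be sketched in a self-contained form. `Instrument` and `Measurement` here are stand-in dataclasses and `SecondsToMillisecondsProcessor` is a hypothetical name. Keying on a `unit` field rather than the instrument name is an assumption made for this sketch.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, Mapping


@dataclass(frozen=True)
class Instrument:
    """Stand-in for an SDK instrument; only name and unit are modelled."""
    name: str
    unit: str


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    instrument: Instrument
    attributes: Mapping[str, str] = field(default_factory=dict)


class SecondsToMillisecondsProcessor:
    """Hypothetical processor: scale values recorded in seconds to milliseconds."""

    def process(
        self,
        measurement: Measurement,
        next_processor: Callable[[Measurement], None],
    ) -> None:
        if measurement.instrument.unit == "s":
            # Only the value changes; the instrument (and its declared unit) is untouched.
            next_processor(replace(measurement, value=measurement.value * 1000))
        else:
            next_processor(measurement)


out = []
processor = SecondsToMillisecondsProcessor()
latency = Instrument("response_time", "s")
requests = Instrument("requests_total", "1")
processor.process(Measurement(0.25, latency), out.append)
processor.process(Measurement(3, requests), out.append)
```

Note that in this sketch the instrument still declares `s` as its unit after conversion, so the exported unit and the values would disagree unless the unit is adjusted elsewhere, for example through a view.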
|
||
## Integration with Existing Metrics SDK | ||
|
||
The MeasurementProcessor integrates with the existing metrics SDK at three points: | ||
|
||
1. **SdkConfiguration** - Extended to include `measurement_processor_chain` | ||
2. **MeasurementConsumer** - Modified to process measurements through the processor chain | ||
3. **MeterProvider** - Extended constructor to accept `measurement_processors` parameter | ||
|
||
### Configuration Flow | ||
|
||
``` | ||
MeterProvider(measurement_processors=[...]) | ||
↓ | ||
SdkConfiguration(measurement_processor_chain=...) | ||
↓ | ||
SynchronousMeasurementConsumer(sdk_config) | ||
↓ | ||
MeasurementProcessorChain.process(measurement, final_consumer) | ||
``` | ||
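The flow above can be sketched with stand-in classes. The class names mirror the diagram, but every body below is a hypothetical simplification rather than the PoC's code: the consumer hands each measurement to the chain together with a final consumer, and only what reaches that final consumer is aggregated.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, List, Mapping, Sequence


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    attributes: Mapping[str, str] = field(default_factory=dict)


class MeasurementProcessorChain:
    """Simplified chain: runs processors in order, then the final consumer."""

    def __init__(self, processors: Sequence = ()):
        self._processors = tuple(processors)

    def process(self, measurement: Measurement, final_consumer: Callable[[Measurement], None]) -> None:
        def run(index: int, current: Measurement) -> None:
            if index == len(self._processors):
                final_consumer(current)
                return
            self._processors[index].process(current, lambda m: run(index + 1, m))

        run(0, measurement)


@dataclass
class SdkConfiguration:
    """Stand-in carrying only the field added by this proposal."""
    measurement_processor_chain: MeasurementProcessorChain


class SynchronousMeasurementConsumer:
    """Stand-in consumer; `aggregated` replaces the real reader storage."""

    def __init__(self, sdk_config: SdkConfiguration):
        self._sdk_config = sdk_config
        self.aggregated: List[Measurement] = []

    def consume_measurement(self, measurement: Measurement) -> None:
        chain = self._sdk_config.measurement_processor_chain
        chain.process(measurement, self.aggregated.append)


class StaticAttributes:
    """Minimal processor used to exercise the wiring."""

    def __init__(self, attributes: Mapping[str, str]):
        self._attributes = dict(attributes)

    def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None:
        merged = {**measurement.attributes, **self._attributes}
        next_processor(replace(measurement, attributes=merged))


config = SdkConfiguration(MeasurementProcessorChain([StaticAttributes({"env": "prod"})]))
consumer = SynchronousMeasurementConsumer(config)
consumer.consume_measurement(Measurement(3, {"operation": "login"}))
```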
|
||
## Advanced Examples | ||
|
||
### Baggage-based Attribute Injection | ||
|
||
```python | ||
from opentelemetry import baggage | ||
from opentelemetry.context import attach, detach | ||
|
||
# Set baggage in context | ||
ctx = baggage.set_baggage("user.id", "12345") | ||
ctx = baggage.set_baggage("tenant.id", "acme-corp", context=ctx) | ||
token = attach(ctx) | ||
|
||
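try: | ||
    # `counter` is an instrument created from a MeterProvider configured with BaggageMeasurementProcessor | ||
    # This measurement will automatically get baggage.user.id and baggage.tenant.id attributes | ||
    counter.add(1, {"operation": "login"}) | ||
finally: | ||
    detach(token) | ||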
``` | ||
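The mechanism behind this example is that the processor reads the ambient context at the moment a measurement is recorded. The sketch below shows that idea using only the standard library. A `contextvars.ContextVar` stands in for OpenTelemetry Baggage, and `BaggageLikeProcessor` is a hypothetical name, so this is not how the PoC's `BaggageMeasurementProcessor` is implemented.

```python
import contextvars
from dataclasses import dataclass, field, replace
from typing import Callable, Iterable, Mapping, Optional


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    attributes: Mapping[str, str] = field(default_factory=dict)


# Stand-in for Baggage: a context-local mapping of key/value pairs.
_baggage: contextvars.ContextVar = contextvars.ContextVar("baggage", default={})


class BaggageLikeProcessor:
    """Hypothetical processor: copy context entries into measurement attributes."""

    def __init__(self, keys: Optional[Iterable[str]] = None, prefix: str = "baggage."):
        self._keys = None if keys is None else frozenset(keys)
        self._prefix = prefix

    def process(
        self,
        measurement: Measurement,
        next_processor: Callable[[Measurement], None],
    ) -> None:
        entries = _baggage.get()  # read at record time, in the caller's context
        extra = {
            self._prefix + key: str(value)
            for key, value in entries.items()
            if self._keys is None or key in self._keys
        }
        if not extra:
            next_processor(measurement)
            return
        next_processor(replace(measurement, attributes={**measurement.attributes, **extra}))


out = []
processor = BaggageLikeProcessor(keys=["user.id"])

token = _baggage.set({"user.id": "12345", "tenant.id": "acme-corp"})
try:
    processor.process(Measurement(1, {"operation": "login"}), out.append)
finally:
    _baggage.reset(token)

# Outside the context, nothing is injected.
processor.process(Measurement(1, {"operation": "logout"}), out.append)
```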
|
||
### Complex Processing Chain | ||
|
||
```python | ||
processors = [ | ||
    # 1. Add baggage for correlation | ||
    BaggageMeasurementProcessor(baggage_keys=["user.id", "trace.id"]), | ||
|
||
    # 2. Add environment info | ||
    StaticAttributeMeasurementProcessor({ | ||
        "environment": "production", | ||
        "datacenter": "us-west-2" | ||
    }), | ||
|
||
    # 3. Remove sensitive attributes | ||
    AttributeFilterMeasurementProcessor(["password", "secret", "token"]), | ||
|
||
    # 4. Drop invalid measurements | ||
    ValueRangeMeasurementProcessor(min_value=0), | ||
|
||
    # 5. Custom processing (CustomMeasurementProcessor from the Custom Processors section) | ||
    CustomMeasurementProcessor(), | ||
] | ||
``` | ||
|
||
### Error Handling | ||
|
||
Processors should handle errors gracefully to avoid breaking the metrics pipeline: | ||
|
||
```python | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
class SafeProcessor(MeasurementProcessor): | ||
    def process(self, measurement: Measurement, next_processor: Callable[[Measurement], None]) -> None: | ||
        try: | ||
            # Custom processing logic (transform() is a placeholder for your own code) | ||
            processed_measurement = self.transform(measurement) | ||
        except Exception as e: | ||
            # Log error but don't break the pipeline | ||
            logger.warning("Processor error: %s", e) | ||
            # Pass through original measurement | ||
            processed_measurement = measurement | ||
        # Call next_processor outside the try block so a downstream failure cannot forward the measurement twice | ||
        next_processor(processed_measurement) | ||
``` | ||
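A self-contained variation of the guarded pattern is sketched below, with a stand-in `Measurement` and a hypothetical `GuardedProcessor` that wraps an arbitrary transform function. Only the transform runs inside the `try` block. The call to `next_processor` happens afterwards, so an exception raised further down the chain is not mistaken for a processor error and the measurement is forwarded exactly once.

```python
import logging
from dataclasses import dataclass, field, replace
from typing import Callable, Mapping

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class Measurement:
    """Stand-in for the SDK's Measurement; only the fields used here."""
    value: float
    attributes: Mapping[str, str] = field(default_factory=dict)


class GuardedProcessor:
    """Hypothetical wrapper: apply `transform`, fall back to the original on error."""

    def __init__(self, transform: Callable[[Measurement], Measurement]):
        self._transform = transform

    def process(
        self,
        measurement: Measurement,
        next_processor: Callable[[Measurement], None],
    ) -> None:
        try:
            result = self._transform(measurement)
        except Exception:
            # Log with traceback, then pass the original measurement through.
            logger.warning("Processor error; passing measurement through unchanged", exc_info=True)
            result = measurement
        # Forward exactly once, outside the try block.
        next_processor(result)


def broken_transform(measurement: Measurement) -> Measurement:
    raise ValueError("boom")


out = []
GuardedProcessor(broken_transform).process(Measurement(1.0), out.append)
GuardedProcessor(lambda m: replace(m, value=m.value + 1)).process(Measurement(1.0), out.append)
```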
|
||
--- | ||
|
||
**Note**: This implementation is experimental and the API may change based on community feedback and the final OpenTelemetry specification. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
#!/usr/bin/env python3 | ||
|
||
""" | ||
Example demonstrating the use of MeasurementProcessor with OpenTelemetry Python SDK. | ||
|
||
This example shows how to: | ||
1. Create custom measurement processors | ||
2. Chain multiple processors together | ||
3. Integrate with MeterProvider | ||
4. Use the provided utility processors | ||
""" | ||
|
||
import time | ||
from typing import Callable | ||
|
||
from opentelemetry import baggage, metrics | ||
from opentelemetry.context import attach, detach | ||
from opentelemetry.sdk.metrics import MeterProvider | ||
from opentelemetry.sdk.metrics._internal.measurement import Measurement | ||
from opentelemetry.sdk.metrics._internal.measurement_processor import ( | ||
AttributeFilterMeasurementProcessor, | ||
BaggageMeasurementProcessor, | ||
MeasurementProcessor, | ||
StaticAttributeMeasurementProcessor, | ||
ValueRangeMeasurementProcessor, | ||
) | ||
from opentelemetry.sdk.metrics.export import ( | ||
ConsoleMetricExporter, | ||
PeriodicExportingMetricReader, | ||
) | ||
|
||
|
||
class CustomMeasurementProcessor(MeasurementProcessor): | ||
    """Example of a custom measurement processor that adds a timestamp attribute.""" | ||
|
||
    def process( | ||
        self, | ||
        measurement: Measurement, | ||
        next_processor: Callable[[Measurement], None], | ||
    ) -> None: | ||
        # Add current timestamp as an attribute | ||
        from dataclasses import replace | ||
|
||
        new_attributes = dict(measurement.attributes or {}) | ||
        new_attributes["processed_at"] = str(int(time.time())) | ||
|
||
        new_measurement = replace(measurement, attributes=new_attributes) | ||
        next_processor(new_measurement) | ||
|
||
|
||
def main(): | ||
    print("=== OpenTelemetry MeasurementProcessor Demo ===\n") | ||
|
||
    # Create measurement processors | ||
    processors = [ | ||
        # Add baggage values as attributes (for correlation) | ||
        BaggageMeasurementProcessor(), | ||
        # Add static environment attributes | ||
        StaticAttributeMeasurementProcessor( | ||
            {"environment": "demo", "service": "measurement-processor-example"} | ||
        ), | ||
        # Filter out any potentially sensitive attributes | ||
        AttributeFilterMeasurementProcessor(["password", "secret"]), | ||
        # Drop measurements with invalid values (negative for this demo) | ||
        ValueRangeMeasurementProcessor(min_value=0), | ||
        # Add custom processing | ||
        CustomMeasurementProcessor(), | ||
    ] | ||
|
||
    # Create metrics export pipeline | ||
    console_exporter = ConsoleMetricExporter() | ||
    reader = PeriodicExportingMetricReader( | ||
        exporter=console_exporter, | ||
        export_interval_millis=5000,  # Export every 5 seconds | ||
    ) | ||
|
||
    # Create MeterProvider with measurement processors | ||
    meter_provider = MeterProvider( | ||
        metric_readers=[reader], measurement_processors=processors | ||
    ) | ||
    metrics.set_meter_provider(meter_provider) | ||
|
||
    # Get meter and create instruments | ||
    meter = metrics.get_meter(__name__) | ||
    request_counter = meter.create_counter( | ||
        "requests_total", description="Total number of requests" | ||
    ) | ||
    response_time_histogram = meter.create_histogram( | ||
        "response_time_seconds", description="Response time in seconds" | ||
    ) | ||
|
||
    print("Recording measurements with different scenarios...\n") | ||
|
||
    # Scenario 1: Regular measurement with baggage | ||
    print("1. Recording with baggage context...") | ||
    ctx = baggage.set_baggage("user.id", "12345") | ||
    ctx = baggage.set_baggage("trace.id", "abc123", context=ctx) | ||
    token = attach(ctx) | ||
|
||
    try: | ||
        request_counter.add(1, {"endpoint": "/api/users", "method": "GET"}) | ||
        response_time_histogram.record( | ||
            0.150, {"endpoint": "/api/users", "status": "200"} | ||
Review comment: nit: if status_code, should it be
Reply: Fixed |
||
        ) | ||
    finally: | ||
        detach(token) | ||
|
||
    # Scenario 2: Measurement with filtered attributes | ||
    print("2. Recording with attributes that should be filtered...") | ||
    request_counter.add( | ||
        1, | ||
        { | ||
            "endpoint": "/api/login", | ||
            "method": "POST", | ||
            "password": "should-be-filtered",  # This will be filtered out | ||
            "username": "alice", | ||
        }, | ||
    ) | ||
|
||
    # Scenario 3: Invalid measurement (negative value) - should be dropped | ||
    print( | ||
        "3. Recording invalid measurement (negative value - should be dropped)..." | ||
    ) | ||
    response_time_histogram.record( | ||
        -1.0, {"endpoint": "/api/error", "status": "500"} | ||
    ) | ||
|
||
    # Scenario 4: Valid measurement without baggage | ||
    print("4. Recording normal measurement...") | ||
    request_counter.add(2, {"endpoint": "/api/products", "method": "GET"}) | ||
    response_time_histogram.record( | ||
        0.075, {"endpoint": "/api/products", "status": "200"} | ||
    ) | ||
|
||
    print("\nWaiting for metrics to be exported...") | ||
    print("(Check the console output below for processed measurements)") | ||
|
||
    # Wait a bit for export (the reader exports every 5 seconds) | ||
    time.sleep(6) | ||
|
||
    # Cleanup | ||
    meter_provider.shutdown() | ||
    print("\nDemo completed!") | ||
|
||
|
||
if __name__ == "__main__": | ||
    main() |