186 changes: 172 additions & 14 deletions relay-event-normalization/src/normalize/span/ai.rs
@@ -80,6 +80,13 @@ impl CalculatedCost {
pub fn total(&self) -> f64 {
self.input + self.output
}

/// Returns `true` if the cost calculation resulted in negative values.
///
/// This indicates token misalignment where cached/reasoning tokens exceed total tokens.
pub fn is_negative(&self) -> bool {
self.input < 0.0 || self.output < 0.0
}
}
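Reviewer note: a minimal standalone sketch of the token misalignment that `is_negative` is meant to flag. The pricing formula inside `calculate_costs` is collapsed in this hunk, so the subtraction of cached tokens below is an assumption, chosen to be consistent with the misaligned-tokens test added at the bottom of this file:

```rust
fn main() {
    // Hypothetical per-token prices and a token count where more cached input
    // tokens are reported than input tokens overall (10 vs. 1).
    let input_per_token = 1.0_f64;
    let input_cached_per_token = 0.5_f64;

    let input_tokens = 1.0_f64; // gen_ai.usage.input_tokens
    let cached_tokens = 10.0_f64; // gen_ai.usage.input_tokens.cached

    // If the non-cached share is derived by subtraction, it turns negative
    // whenever cached > input, and the priced cost follows.
    let non_cached = input_tokens - cached_tokens; // -9.0
    let input_cost = non_cached * input_per_token + cached_tokens * input_cached_per_token;

    assert!(input_cost < 0.0); // the condition that `is_negative` reports
    println!("input cost: {input_cost}"); // -4.0
}
```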

/// Calculates the total cost for a model call.
@@ -107,14 +114,32 @@ pub fn calculate_costs(model_cost: &ModelCostV2, tokens: UsedTokens) -> Option<CalculatedCost> {
Some(CalculatedCost { input, output })
}

/// Status of AI cost calculation for metrics tracking.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CostCalculationStatus {
/// Cost calculation succeeded with non-negative values.
Positive,
/// Cost calculation resulted in negative values (token misalignment).
Negative,
/// Cost calculation failed (no usage tokens found or no model cost available).
Failed,
}

/// Calculates the cost of an AI model based on the model cost and the tokens used.
/// Calculated cost is in US dollars.
fn extract_ai_model_cost_data(model_cost: Option<&ModelCostV2>, data: &mut SpanData) {
let Some(model_cost) = model_cost else { return };
///
/// Returns the status of the cost calculation for metrics tracking.
fn extract_ai_model_cost_data(
model_cost: Option<&ModelCostV2>,
data: &mut SpanData,
) -> CostCalculationStatus {
let Some(model_cost) = model_cost else {
return CostCalculationStatus::Failed;
};

let used_tokens = UsedTokens::from_span_data(&*data);
let Some(costs) = calculate_costs(model_cost, used_tokens) else {
return;
return CostCalculationStatus::Failed;
};

data.gen_ai_cost_total_tokens
@@ -125,6 +150,12 @@ fn extract_ai_model_cost_data(model_cost: Option<&ModelCostV2>, data: &mut SpanData) {
.set_value(Value::F64(costs.input).into());
data.gen_ai_cost_output_tokens
.set_value(Value::F64(costs.output).into());

if costs.is_negative() {
CostCalculationStatus::Negative
} else {
CostCalculationStatus::Positive
}
}
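Downstream, the returned status is turned into a metric tag; the sketch below simply mirrors the `match` added to `normalize()` in `relay-server/src/services/processor/span/processing.rs` later in this diff, and is not new API:

```rust
use relay_event_normalization::span::ai::CostCalculationStatus;

/// Maps a cost-calculation status to the tag value emitted with the counter
/// (see the processing.rs changes below).
fn status_tag(status: CostCalculationStatus) -> &'static str {
    match status {
        CostCalculationStatus::Positive => "calculation_positive",
        CostCalculationStatus::Negative => "calculation_negative",
        CostCalculationStatus::Failed => "calculation_failed",
    }
}
```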

/// Maps AI-related measurements (legacy) to span data.
@@ -171,7 +202,13 @@ fn set_total_tokens(data: &mut SpanData) {
}

/// Extract the additional data into the span
fn extract_ai_data(data: &mut SpanData, duration: f64, ai_model_costs: &ModelCosts) {
///
/// Returns the status of the cost calculation if one was attempted, `None` otherwise.
fn extract_ai_data(
data: &mut SpanData,
duration: f64,
ai_model_costs: &ModelCosts,
) -> Option<CostCalculationStatus> {
// Extracts the response tokens per second
if data.gen_ai_response_tokens_per_second.value().is_none()
&& duration > 0.0
@@ -185,6 +222,8 @@ fn extract_ai_data(data: &mut SpanData, duration: f64, ai_model_costs: &ModelCosts) {
}

// Extracts the total cost of the AI model used
// Note: if-let is used instead of `.map()` because `data` is borrowed mutably inside the branch.
#[allow(clippy::manual_map)]
if let Some(model_id) = data
.gen_ai_request_model
.value()
@@ -195,21 +234,28 @@ fn extract_ai_data(data: &mut SpanData, duration: f64, ai_model_costs: &ModelCosts) {
.and_then(|val| val.as_str())
})
{
extract_ai_model_cost_data(ai_model_costs.cost_per_token(model_id), data)
Some(extract_ai_model_cost_data(
ai_model_costs.cost_per_token(model_id),
data,
))
} else {
None
}
}

/// Enrich the AI span data
///
/// Returns the status of the cost calculation if one was attempted, `None` otherwise.
fn enrich_ai_span_data(
span_data: &mut Annotated<SpanData>,
span_op: &Annotated<OperationType>,
measurements: &Annotated<Measurements>,
duration: f64,
model_costs: Option<&ModelCosts>,
operation_type_map: Option<&AiOperationTypeMap>,
) {
) -> Option<CostCalculationStatus> {
if !is_ai_span(span_data, span_op.value()) {
return;
return None;
}

let data = span_data.get_or_insert_with(SpanData::default);
@@ -218,20 +264,27 @@ fn enrich_ai_span_data(

set_total_tokens(data);

if let Some(model_costs) = model_costs {
extract_ai_data(data, duration, model_costs);
}
let cost_status = if let Some(model_costs) = model_costs {
extract_ai_data(data, duration, model_costs)
} else {
None
};

if let Some(operation_type_map) = operation_type_map {
infer_ai_operation_type(data, span_op.value(), operation_type_map);
}

cost_status
}
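Because the function now returns `Option<CostCalculationStatus>`, callers can distinguish "no calculation attempted" from "attempted but failed". A hypothetical helper (not part of this PR) summarizing the outcomes as they fall out of the code above:

```rust
use relay_event_normalization::span::ai::CostCalculationStatus;

/// Hypothetical, for illustration only: what each outcome of
/// `enrich_ai_span_data` / `enrich_ai_span` means for the caller.
fn describe(outcome: Option<CostCalculationStatus>) -> &'static str {
    match outcome {
        // Not an AI span, no model costs configured, or no `gen_ai.request.model`.
        None => "no cost calculation attempted",
        // Costs were written to the span data and are non-negative.
        Some(CostCalculationStatus::Positive) => "costs extracted",
        // Costs were written but negative: cached/reasoning tokens exceed totals.
        Some(CostCalculationStatus::Negative) => "costs extracted, tokens misaligned",
        // No usage tokens on the span, or no cost entry for the requested model.
        Some(CostCalculationStatus::Failed) => "cost calculation failed",
    }
}
```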

/// Enrich the AI span data
///
/// Returns the status of the cost calculation if one was attempted, `None` otherwise.
pub fn enrich_ai_span(
span: &mut Span,
model_costs: Option<&ModelCosts>,
operation_type_map: Option<&AiOperationTypeMap>,
) {
) -> Option<CostCalculationStatus> {
let duration = span
.get_value("span.duration")
.and_then(|v| v.as_f64())
@@ -244,7 +297,7 @@ pub fn enrich_ai_span(
duration,
model_costs,
operation_type_map,
);
)
}

/// Extract the ai data from all of an event's spans
@@ -264,7 +317,7 @@ pub fn enrich_ai_event_data(
.as_mut()
.and_then(|c| c.get_mut::<TraceContext>())
{
enrich_ai_span_data(
let _ = enrich_ai_span_data(
&mut trace_context.data,
&trace_context.op,
&event.measurements,
@@ -282,7 +335,7 @@ pub fn enrich_ai_event_data(
.and_then(|v| v.as_f64())
.unwrap_or(0.0);

enrich_ai_span_data(
let _ = enrich_ai_span_data(
&mut span.data,
&span.op,
&span.measurements,
@@ -439,6 +492,111 @@ mod tests {
output: -7.0,
}
");

// Verify that negative costs are detected
assert!(cost.is_negative());
}

#[test]
fn test_is_negative_with_positive_costs() {
let cost = CalculatedCost {
input: 10.0,
output: 20.0,
};
assert!(!cost.is_negative());
}

#[test]
fn test_is_negative_with_negative_input() {
let cost = CalculatedCost {
input: -5.0,
output: 20.0,
};
assert!(cost.is_negative());
}

#[test]
fn test_is_negative_with_negative_output() {
let cost = CalculatedCost {
input: 10.0,
output: -3.0,
};
assert!(cost.is_negative());
}

#[test]
fn test_is_negative_with_zero_costs() {
let cost = CalculatedCost {
input: 0.0,
output: 0.0,
};
assert!(!cost.is_negative());
}

#[test]
fn test_extract_ai_model_cost_data_returns_failed_when_no_model_cost() {
let mut data = SpanData {
gen_ai_usage_input_tokens: Annotated::new(100.0.into()),
gen_ai_usage_output_tokens: Annotated::new(50.0.into()),
..Default::default()
};

let status = extract_ai_model_cost_data(None, &mut data);
assert_eq!(status, CostCalculationStatus::Failed);
}

#[test]
fn test_extract_ai_model_cost_data_returns_failed_when_no_tokens() {
let mut data = SpanData::default();
let model_cost = ModelCostV2 {
input_per_token: 1.0,
output_per_token: 2.0,
output_reasoning_per_token: 0.0,
input_cached_per_token: 0.5,
input_cache_write_per_token: 0.75,
};

let status = extract_ai_model_cost_data(Some(&model_cost), &mut data);
assert_eq!(status, CostCalculationStatus::Failed);
}

#[test]
fn test_extract_ai_model_cost_data_returns_positive_for_valid_calculation() {
let mut data = SpanData {
gen_ai_usage_input_tokens: Annotated::new(100.0.into()),
gen_ai_usage_output_tokens: Annotated::new(50.0.into()),
..Default::default()
};
let model_cost = ModelCostV2 {
input_per_token: 1.0,
output_per_token: 2.0,
output_reasoning_per_token: 0.0,
input_cached_per_token: 0.5,
input_cache_write_per_token: 0.75,
};

let status = extract_ai_model_cost_data(Some(&model_cost), &mut data);
assert_eq!(status, CostCalculationStatus::Positive);
}

#[test]
fn test_extract_ai_model_cost_data_returns_negative_for_misaligned_tokens() {
let mut data = SpanData {
gen_ai_usage_input_tokens: Annotated::new(1.0.into()),
gen_ai_usage_input_tokens_cached: Annotated::new(10.0.into()),
gen_ai_usage_output_tokens: Annotated::new(50.0.into()),
..Default::default()
};
let model_cost = ModelCostV2 {
input_per_token: 1.0,
output_per_token: 2.0,
output_reasoning_per_token: 0.0,
input_cached_per_token: 0.5,
input_cache_write_per_token: 0.75,
};

let status = extract_ai_model_cost_data(Some(&model_cost), &mut data);
assert_eq!(status, CostCalculationStatus::Negative);
}

#[test]
48 changes: 41 additions & 7 deletions relay-server/src/services/processor/span/processing.rs
@@ -17,7 +17,6 @@ use relay_dynamic_config::{
CombinedMetricExtractionConfig, ErrorBoundary, GlobalConfig, ProjectConfig,
};
use relay_event_normalization::AiOperationTypeMap;
use relay_event_normalization::span::ai::enrich_ai_span;
use relay_event_normalization::{
BorrowedSpanOpDefaults, ClientHints, CombinedMeasurementsConfig, FromUserAgentInfo,
GeoIpLookup, MeasurementsConfig, ModelCosts, PerformanceScoreConfig, RawUserAgentInfo,
@@ -69,17 +68,24 @@ pub async fn process(
ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config),
_ => None,
};

// Extract metadata we need before we move/borrow managed_envelope mutably
let client_name =
crate::utils::client_name_tag(managed_envelope.envelope().meta().client_name()).to_owned();
let client_ip_opt = managed_envelope
.envelope()
.meta()
.client_addr()
.map(IpAddr::from);

let normalize_span_config = NormalizeSpanConfig::new(
ctx.config,
ctx.global_config,
ctx.project_info.config(),
managed_envelope,
managed_envelope
.envelope()
.meta()
.client_addr()
.map(IpAddr::from),
client_ip_opt,
geo_lookup,
&client_name,
);

let client_ip = managed_envelope.envelope().meta().client_addr();
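One note on the reshuffling above: `client_name` is copied out with `.to_owned()` so that no borrow of `managed_envelope` is still alive when the envelope is used again further down, as the new comment says. A standalone illustration of that borrow-checker constraint (plain Rust, no Relay types):

```rust
// Minimal illustration, not relay code: holding a `&str` borrowed from the
// envelope across a later mutable use would not compile, so an owned copy is
// taken up front.
struct Envelope {
    client_name: String,
}

fn later_mutable_use(_e: &mut Envelope) {}

fn main() {
    let mut envelope = Envelope {
        client_name: "python".to_owned(),
    };

    // Owned copy: the immutable borrow of `envelope` ends on this line.
    let client_name = envelope.client_name.clone();

    later_mutable_use(&mut envelope); // fine: no outstanding borrow

    println!("{client_name}");
}
```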
@@ -262,6 +268,8 @@ struct NormalizeSpanConfig<'a> {
/// An initialized GeoIP lookup.
geo_lookup: &'a GeoIpLookup,
span_op_defaults: BorrowedSpanOpDefaults<'a>,
/// The SDK client name for metrics tracking.
client_name: &'a str,
}

impl<'a> NormalizeSpanConfig<'a> {
@@ -272,6 +280,7 @@ impl<'a> NormalizeSpanConfig<'a> {
managed_envelope: &ManagedEnvelope,
client_ip: Option<IpAddr>,
geo_lookup: &'a GeoIpLookup,
client_name: &'a str,
) -> Self {
let aggregator_config = config.aggregator_config_for(MetricNamespace::Spans);

@@ -301,6 +310,7 @@ impl<'a> NormalizeSpanConfig<'a> {
client_ip,
geo_lookup,
span_op_defaults: global_config.span_op_defaults.borrow(),
client_name,
}
}
}
@@ -357,6 +367,7 @@ fn normalize(
client_ip,
geo_lookup,
span_op_defaults,
client_name,
} = config;

set_segment_attributes(annotated_span);
@@ -456,7 +467,29 @@ fn normalize(

normalize_performance_score(span, performance_score);

enrich_ai_span(span, ai_model_costs, ai_operation_type_map);
let cost_status = relay_event_normalization::span::ai::enrich_ai_span(
span,
ai_model_costs,
ai_operation_type_map,
);

// Emit AI cost calculation metric if a cost calculation was attempted
if let Some(status) = cost_status {
use relay_event_normalization::span::ai::CostCalculationStatus;
let status_tag = match status {
CostCalculationStatus::Positive => "calculation_positive",
CostCalculationStatus::Negative => "calculation_negative",
CostCalculationStatus::Failed => "calculation_failed",
};

let origin = crate::utils::ai_origin_tag(crate::envelope::ClientName::from(client_name));

relay_statsd::metric!(
counter(RelayCounters::AiCostCalculation) += 1,
status = status_tag,
origin = origin,
);
}

tag_extraction::extract_measurements(span, is_mobile);

@@ -818,6 +851,7 @@ mod tests {
client_ip: Some(IpAddr("2.125.160.216".to_owned())),
geo_lookup: &GEO_LOOKUP,
span_op_defaults: Default::default(),
client_name: "other",
}
}
