Commit 0dc05d9

c1lylquerel and Laurent Quérel authored
[otap-dataflow benchmark] filter processor benchmarks + internal telemetry (open-telemetry#1448)
Add filter processor scenarios to the nightly benchmark suite.

Collect internal metrics inside the filter processor, tracking the number of signals before and after the filtering:

```rust
/// Pdata-oriented metrics for the OTAP FilterProcessor
#[metric_set(name = "filter.processor.pdata.metrics")]
#[derive(Debug, Default, Clone)]
pub struct FilterPdataMetrics {
    /// Number of log signals consumed
    #[metric(unit = "{log}")]
    pub log_signals_consumed: Counter<u64>,
    /// Number of span signals consumed
    #[metric(unit = "{span}")]
    pub span_signals_consumed: Counter<u64>,
    /// Number of log signals sent
    #[metric(unit = "{log}")]
    pub log_signals_sent: Counter<u64>,
    /// Number of span signals sent
    #[metric(unit = "{span}")]
    pub span_signals_sent: Counter<u64>,
}
```

---------

Co-authored-by: Laurent Quérel <[email protected]>
1 parent 9e5f4f6 commit 0dc05d9

File tree

14 files changed, +1025 -50 lines

.github/workflows/pipeline-perf-test-nightly.yml

Lines changed: 47 additions & 0 deletions
```diff
@@ -87,6 +87,11 @@ jobs:
           cd tools/pipeline_perf_test
           python orchestrator/run_orchestrator.py --debug --config test_suites/integration/nightly/backpressure-docker.yaml
 
+      - name: Run filter performance test log suite
+        run: |
+          cd tools/pipeline_perf_test
+          python orchestrator/run_orchestrator.py --debug --config test_suites/integration/nightly/filter-docker.yaml
+
       - name: Upload syslog results for processing
         uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
         with:
@@ -99,6 +104,12 @@ jobs:
           name: backpressure-nightly-results
           path: tools/pipeline_perf_test/results/nightly_backpressure/gh-actions-benchmark/*.json
 
+      - name: Upload filter results for processing
+        uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
+        with:
+          name: filter-nightly-results
+          path: tools/pipeline_perf_test/results/nightly_filter/gh-actions-benchmark/*.json
+
       - name: Add benchmark link to job summary
         run: |
           echo "### Benchmark Results" >> $GITHUB_STEP_SUMMARY
@@ -129,6 +140,15 @@ jobs:
           merge-multiple: true
           path: backpressure_results
 
+
+      - name: Download filter artifacts
+        uses: actions/download-artifact@634f93cb2916e3fdff6788551b99b062d0335ce0 # v5.0.0
+        with:
+          pattern: filter-nightly-results*
+          merge-multiple: true
+          path: filter_results
+
+
       - name: Consolidate syslog benchmark data
         run: |
           echo "Consolidating benchmark JSON files..."
@@ -159,6 +179,21 @@ jobs:
           echo "Consolidated benchmark data:"
           cat backpressure_output.json
 
+      - name: Consolidate filter benchmark data
+        run: |
+          echo "Consolidating benchmark JSON files..."
+          find filter_results -name "*.json" -type f | while read file; do
+            echo "Processing: $file"
+            cat "$file"
+            echo
+          done
+
+          # Combine all benchmark JSON files into a single output (find them recursively)
+          find filter_results -name "*.json" -type f -exec cat {} \; | jq -s 'map(.[])' > filter_output.json
+
+          echo "Consolidated benchmark data:"
+          cat filter_output.json
+
       - name: Update benchmark data
         uses: benchmark-action/github-action-benchmark@4bdcce38c94cec68da58d012ac24b7b1155efe8b # v1.20.7
         with:
@@ -183,6 +218,18 @@ jobs:
           auto-push: true
           save-data-file: true
 
+      - name: Update filter benchmark data
+        uses: benchmark-action/github-action-benchmark@4bdcce38c94cec68da58d012ac24b7b1155efe8b # v1.20.7
+        with:
+          tool: "customSmallerIsBetter"
+          output-file-path: filter_output.json
+          gh-pages-branch: benchmarks
+          max-items-in-chart: 100
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+          benchmark-data-dir-path: "docs/benchmarks/nightly/filter"
+          auto-push: true
+          save-data-file: true
+
       - name: Add benchmark link to job summary
         run: |
           echo "### Benchmark Results" >> $GITHUB_STEP_SUMMARY
```
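The consolidation step does the real work with `jq`: `-s` slurps every matched JSON file into one array of arrays, and `map(.[])` flattens that by one level, so per-run files containing `[A, B]` and `[C]` combine into `[A, B, C]`. That flat array is what the `customSmallerIsBetter` tool of `github-action-benchmark` later reads from `output-file-path`.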

rust/otap-dataflow/configs/fake-filter-debug-noop.yaml

Lines changed: 6 additions & 3 deletions
```diff
@@ -14,9 +14,10 @@ nodes:
     dispatch_strategy: round_robin
     config:
       traffic_config:
-        max_signal_count: 1000
-        max_batch_size: 100
-        signals_per_second: 100
+        max_batch_size: 1000
+        signals_per_second: 100000
+        metric_weight: 0
+        trace_weight: 0
         log_weight: 100
       registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
   filter:
@@ -50,6 +51,8 @@ nodes:
       record_attributes:
         - key: gen_ai.system
           value: openai
+        - key: ios.app.state
+          value: active
       severity_texts: []
       severity_number: null
      bodies: []
```
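Pinning `metric_weight` and `trace_weight` to 0 while `log_weight` stays at 100 makes the fake signal generator emit logs only, so the scenario isolates the log-filter path at the much higher rate of 100000 signals per second.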

rust/otap-dataflow/crates/otap/src/filter_processor.rs

Lines changed: 63 additions & 35 deletions
```diff
@@ -9,31 +9,35 @@
 //! ToDo: Collect telemetry like number of filtered data is removed datapoints
 
 use self::config::Config;
+use self::metrics::FilterPdataMetrics;
 use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata};
 use async_trait::async_trait;
 use linkme::distributed_slice;
-
 use otap_df_config::SignalType;
 use otap_df_config::error::Error as ConfigError;
 use otap_df_config::node::NodeUserConfig;
 use otap_df_engine::config::ProcessorConfig;
 use otap_df_engine::context::PipelineContext;
+use otap_df_engine::control::NodeControlMsg;
 use otap_df_engine::error::{Error, ProcessorErrorKind, format_error_sources};
 use otap_df_engine::local::processor as local;
 use otap_df_engine::message::Message;
 use otap_df_engine::node::NodeId;
 use otap_df_engine::processor::ProcessorWrapper;
 use otap_df_pdata::otap::OtapArrowRecords;
+use otap_df_telemetry::metrics::MetricSet;
 use serde_json::Value;
 use std::sync::Arc;
 
 mod config;
+mod metrics;
 /// The URN for the filter processor
 pub const FILTER_PROCESSOR_URN: &str = "urn:otel:filter:processor";
 
 /// processor that outputs all data received to stdout
 pub struct FilterProcessor {
     config: Config,
+    metrics: MetricSet<FilterPdataMetrics>,
 }
 
 /// Factory function to create a FilterProcessor.
@@ -71,20 +75,19 @@ impl FilterProcessor {
     /// Creates a new FilterProcessor
     #[must_use]
     #[allow(dead_code)]
-    pub fn new(config: Config, _pipeline_ctx: PipelineContext) -> Self {
-        FilterProcessor { config }
+    pub fn new(config: Config, pipeline_ctx: PipelineContext) -> Self {
+        let metrics = pipeline_ctx.register_metrics::<FilterPdataMetrics>();
+        FilterProcessor { config, metrics }
     }
 
     /// Creates a new FilterProcessor from a configuration object
-    pub fn from_config(
-        _pipeline_ctx: PipelineContext,
-        config: &Value,
-    ) -> Result<Self, ConfigError> {
+    pub fn from_config(pipeline_ctx: PipelineContext, config: &Value) -> Result<Self, ConfigError> {
+        let metrics = pipeline_ctx.register_metrics::<FilterPdataMetrics>();
         let config: Config =
             serde_json::from_value(config.clone()).map_err(|e| ConfigError::InvalidUserConfig {
                 error: e.to_string(),
             })?;
-        Ok(FilterProcessor { config })
+        Ok(FilterProcessor { config, metrics })
     }
 }
 
@@ -96,8 +99,13 @@ impl local::Processor<OtapPdata> for FilterProcessor {
         effect_handler: &mut local::EffectHandler<OtapPdata>,
     ) -> Result<(), Error> {
         match msg {
-            Message::Control(_control) => {
-                // ToDo: add internal telemetry that will be sent out here
+            Message::Control(control) => {
+                if let NodeControlMsg::CollectTelemetry {
+                    mut metrics_reporter,
+                } = control
+                {
+                    _ = metrics_reporter.report(&mut self.metrics);
+                }
                 Ok(())
             }
             Message::PData(pdata) => {
@@ -113,32 +121,52 @@ impl local::Processor<OtapPdata> for FilterProcessor {
                         arrow_records
                     }
                     SignalType::Logs => {
-                        self.config
-                            .log_filters()
-                            .filter(arrow_records)
-                            .map_err(|e| {
-                                let source_detail = format_error_sources(&e);
-                                Error::ProcessorError {
-                                    processor: effect_handler.processor_id(),
-                                    kind: ProcessorErrorKind::Other,
-                                    error: format!("Filter error: {e}"),
-                                    source_detail,
-                                }
-                            })?
+                        // get logs
+                        let (filtered_arrow_records, log_signals_consumed, log_signals_filtered) =
+                            self.config
+                                .log_filters()
+                                .filter(arrow_records)
+                                .map_err(|e| {
+                                    let source_detail = format_error_sources(&e);
+                                    Error::ProcessorError {
+                                        processor: effect_handler.processor_id(),
+                                        kind: ProcessorErrorKind::Other,
+                                        error: format!("Filter error: {e}"),
+                                        source_detail,
+                                    }
+                                })?;
+
+                        // get logs after
+                        self.metrics.log_signals_consumed.add(log_signals_consumed);
+                        self.metrics.log_signals_filtered.add(log_signals_filtered);
+
+                        filtered_arrow_records
+                    }
+                    SignalType::Traces => {
+                        // get spans
+                        let (filtered_arrow_records, span_signals_consumed, span_signals_filtered) =
+                            self.config
+                                .trace_filters()
+                                .filter(arrow_records)
+                                .map_err(|e| {
+                                    let source_detail = format_error_sources(&e);
+                                    Error::ProcessorError {
+                                        processor: effect_handler.processor_id(),
+                                        kind: ProcessorErrorKind::Other,
+                                        error: format!("Filter error: {e}"),
+                                        source_detail,
+                                    }
+                                })?;
+
+                        self.metrics
+                            .span_signals_consumed
+                            .add(span_signals_consumed);
+                        self.metrics
+                            .span_signals_filtered
+                            .add(span_signals_filtered);
+
+                        filtered_arrow_records
                     }
-                    SignalType::Traces => self
-                        .config
-                        .trace_filters()
-                        .filter(arrow_records)
-                        .map_err(|e| {
-                            let source_detail = format_error_sources(&e);
-                            Error::ProcessorError {
-                                processor: effect_handler.processor_id(),
-                                kind: ProcessorErrorKind::Other,
-                                error: format!("Filter error: {e}"),
-                                source_detail,
-                            }
-                        })?,
                 };
                 effect_handler
                     .send_message(OtapPdata::new(context, filtered_arrow_records.into()))
```
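The control-path change above is the heart of the telemetry wiring: counters accumulate locally on the hot path and are handed over only when the engine sends a `CollectTelemetry` control message. Here is a minimal self-contained sketch of that pull-style flow, with toy `ControlMsg` and `Processor` types invented for illustration (the real code uses `NodeControlMsg::CollectTelemetry`, `MetricSet`, and the engine-supplied `metrics_reporter`):

```rust
/// Toy stand-in for the generated metric set; real counters come from
/// otap_df_telemetry's `Counter<u64>`.
#[derive(Debug, Default)]
struct FilterPdataMetrics {
    log_signals_consumed: u64,
    log_signals_filtered: u64,
}

/// Toy stand-in for NodeControlMsg.
enum ControlMsg {
    CollectTelemetry,
    Shutdown,
}

struct Processor {
    metrics: FilterPdataMetrics,
}

impl Processor {
    /// Hot path: just bump local counters, no I/O.
    fn on_logs_batch(&mut self, consumed: u64, removed: u64) {
        self.metrics.log_signals_consumed += consumed;
        self.metrics.log_signals_filtered += removed;
    }

    /// Control path: emit a snapshot only when the engine asks for one.
    fn on_control(&mut self, msg: ControlMsg) {
        if let ControlMsg::CollectTelemetry = msg {
            println!("reporting: {:?}", self.metrics);
        }
    }
}

fn main() {
    let mut p = Processor { metrics: FilterPdataMetrics::default() };
    p.on_logs_batch(1000, 600); // 1000 rows in, 600 dropped by the filter
    p.on_control(ControlMsg::CollectTelemetry);
    p.on_control(ControlMsg::Shutdown); // non-telemetry control messages are ignored here
}
```

Keeping the hot path down to plain counter increments and letting the engine decide when to collect is what makes it cheap enough to meter every batch.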
rust/otap-dataflow/crates/otap/src/filter_processor/metrics.rs

Lines changed: 25 additions & 0 deletions
```diff
@@ -0,0 +1,25 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Metrics for the OTAP FilterProcessor node.
+use otap_df_telemetry::instrument::Counter;
+use otap_df_telemetry_macros::metric_set;
+
+/// Pdata-oriented metrics for the OTAP FilterProcessor
+#[metric_set(name = "filter.processor.pdata.metrics")]
+#[derive(Debug, Default, Clone)]
+pub struct FilterPdataMetrics {
+    /// Number of log signals consumed
+    #[metric(unit = "{log}")]
+    pub log_signals_consumed: Counter<u64>,
+    /// Number of span signals consumed
+    #[metric(unit = "{span}")]
+    pub span_signals_consumed: Counter<u64>,
+
+    /// Number of log signals filtered
+    #[metric(unit = "{log}")]
+    pub log_signals_filtered: Counter<u64>,
+    /// Number of span signals filtered
+    #[metric(unit = "{span}")]
+    pub span_signals_filtered: Counter<u64>,
+}
```
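A note on the units: `{log}` and `{span}` are UCUM-style annotations, the convention OpenTelemetry metrics use for dimensionless counts of a named thing; the braces mean the unit carries no dimension and only documents what is being counted.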

rust/otap-dataflow/crates/pdata/src/otap/filter.rs

Lines changed: 4 additions & 2 deletions
```diff
@@ -511,14 +511,16 @@ fn apply_filter(
     payload: &mut OtapArrowRecords,
     payload_type: ArrowPayloadType,
     filter: &BooleanArray,
-) -> Result<()> {
+) -> Result<(u64, u64)> {
     let record_batch = payload
         .get(payload_type)
         .ok_or_else(|| Error::RecordBatchNotFound { payload_type })?;
+    let num_rows_before = record_batch.num_rows() as u64;
     let filtered_record_batch = arrow::compute::filter_record_batch(record_batch, filter)
         .map_err(|e| Error::ColumnLengthMismatch { source: e })?;
+    let num_rows_removed = num_rows_before - (filtered_record_batch.num_rows() as u64);
     payload.set(payload_type, filtered_record_batch);
-    Ok(())
+    Ok((num_rows_before, num_rows_removed))
 }
 
 /// update_child_record_batch_filter() takes an child record batch, with it's respective filter
```
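The two values added to the return type are the row count before filtering and the number of rows the mask dropped (before minus after). A self-contained toy version of that accounting over a plain `Vec` (a hypothetical stand-in; the real function filters an Arrow `RecordBatch` with a `BooleanArray` via `arrow::compute::filter_record_batch`):

```rust
/// Filter `rows` by `mask`, returning (rows_before, rows_removed),
/// mirroring the counting added to `apply_filter`.
fn apply_filter_counts(rows: &mut Vec<i64>, mask: &[bool]) -> (u64, u64) {
    assert_eq!(rows.len(), mask.len(), "mask must match row count");
    let before = rows.len() as u64;
    let mut keep = mask.iter();
    // retain visits elements in order, so the mask iterator stays aligned
    rows.retain(|_| *keep.next().unwrap());
    let removed = before - rows.len() as u64;
    (before, removed)
}

fn main() {
    let mut rows = vec![1, 2, 3, 4, 5];
    let mask = [true, false, true, false, false];
    let (before, removed) = apply_filter_counts(&mut rows, &mask);
    assert_eq!((before, removed), (5, 3));
    println!("kept {:?}: {before} before, {removed} removed", rows);
}
```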

rust/otap-dataflow/crates/pdata/src/otap/filter/logs.rs

Lines changed: 14 additions & 5 deletions
```diff
@@ -103,7 +103,10 @@ impl LogFilter {
     }
 
     /// take a logs payload and return the filtered result
-    pub fn filter(&self, mut logs_payload: OtapArrowRecords) -> Result<OtapArrowRecords> {
+    pub fn filter(
+        &self,
+        mut logs_payload: OtapArrowRecords,
+    ) -> Result<(OtapArrowRecords, u64, u64)> {
         let (resource_attr_filter, log_record_filter, log_attr_filter) = if let Some(include_config) =
             &self.include
             && let Some(exclude_config) = &self.exclude
@@ -136,7 +139,13 @@ impl LogFilter {
             include_config.create_filters(&logs_payload, false)?
         } else {
             // both include and exclude is none
-            return Ok(logs_payload);
+            let num_rows = logs_payload
+                .get(ArrowPayloadType::Logs)
+                .ok_or_else(|| Error::RecordBatchNotFound {
+                    payload_type: ArrowPayloadType::Logs,
+                })?
+                .num_rows() as u64;
+            return Ok((logs_payload, num_rows, num_rows));
         };
 
         let (log_record_filter, child_record_batch_filters) = self.sync_up_filters(
@@ -146,17 +155,17 @@ impl LogFilter {
             log_attr_filter,
         )?;
 
-        apply_filter(
+        let (log_rows_before, log_rows_removed) = apply_filter(
             &mut logs_payload,
             ArrowPayloadType::Logs,
             &log_record_filter,
         )?;
 
         for (payload_type, filter) in child_record_batch_filters {
-            apply_filter(&mut logs_payload, payload_type, &filter)?;
+            let (_, _) = apply_filter(&mut logs_payload, payload_type, &filter)?;
        }
 
-        Ok(logs_payload)
+        Ok((logs_payload, log_rows_before, log_rows_removed))
     }
 
     /// this function takes the filters for each record batch and makes sure that incomplete
```
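One consequence of the new return type shows up in the fast path above: when neither `include` nor `exclude` is configured, the function must now fetch the `Logs` record batch purely to report its `num_rows` in both count slots of the tuple, where previously it returned the payload untouched without looking at it.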
