Skip to content

Commit 2315e02

Browse files
VenuEmmadiatoulme
authored andcommitted
feat(isolationforestprocessor): add adaptive window sizing for dynamic resource optimization (open-telemetry#42752)
#### Description This PR adds **adaptive window sizing** functionality to the Isolation Forest processor, enabling dynamic adjustment of sliding window sizes based on traffic patterns, memory usage, and model stability. This enhancement optimizes resource utilization and improves anomaly detection accuracy for varying workload conditions. **Key Features Added:** - **Dynamic window scaling** that adapts to traffic velocity - **Memory-aware shrinkage** to prevent resource exhaustion - **Stability-based expansion** for optimal model performance - **Configurable adaptation parameters** with sensible defaults - **Enhanced logging** with adaptive statistics and metrics **Backward Compatibility:** - **100% backward compatible** - all existing configurations continue to work unchanged - **No breaking changes** - adaptive window is disabled by default (`enabled: false`) - **Preserves existing behavior** - when adaptive window is disabled, processor functions identically to before - **Optional feature** - users can opt-in to adaptive functionality via configuration #### Link to tracking issue Fixes open-telemetry#42751 #### Testing **Comprehensive test coverage added:** - **Unit tests** for all adaptive window algorithms (velocity tracking, memory monitoring, stability checking) - **Configuration validation tests** for all adaptive window parameters and edge cases - **Integration tests** verifying adaptive behavior with processor lifecycle - **Boundary condition tests** for min/max window sizes and adaptation rates - **Multi-model adaptive tests** ensuring compatibility with existing multi-model functionality - **Error handling tests** for invalid configurations and runtime edge cases **Test Results:** - All existing tests continue to pass (backward compatibility verified) - New adaptive functionality achieves >90% code coverage - Integration tests confirm proper adaptive behavior under various load patterns #### Documentation **Documentation updated:** - README.md enhanced with adaptive window sizing feature, configuration parameters, and usage examples - Configuration tables expanded with adaptive window parameters and best practices - Performance characteristics updated to reflect dynamic training costs with adaptive sizing --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent 2d2289e commit 2315e02

File tree

10 files changed

+1278
-40
lines changed

10 files changed

+1278
-40
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: isolationforestprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add adaptive window sizing feature that automatically adjusts window size based on traffic patterns, memory usage, and model stability
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42751]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The adaptive window sizing feature enables dynamic adjustment of the isolation forest sliding window size based on:
20+
- Traffic velocity and throughput patterns
21+
- Memory usage and resource constraints
22+
- Model stability and performance metrics
23+
This enhancement improves resource utilization and anomaly detection accuracy for varying workload patterns.
24+
25+
# If your change doesn't affect end users or the exported elements of any package,
26+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: [user]

processor/isolationforestprocessor/README.md

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The **Isolation Forest processor** adds inline, unsupervised anomaly detection t
2424
| **Realtime Isolation Forest** | Builds an ensemble of random trees over a sliding window of recent data and assigns a 0–1 anomaly score on ingestion (≈ *O(log n)* per point). |
2525
| **Multi‑signal support** | Can be inserted into **traces**, **metrics**, **logs** pipelines – one config powers all three. |
2626
| **Per‑entity modelling** | `features` config lets you maintain a separate model per unique combination of resource / attribute keys (e.g. per‑pod, per‑service). |
27+
| **Adaptive Window Sizing** | Automatically adjusts window size based on traffic patterns, memory usage, and model stability for optimal performance and resource utilization. |
2728
| **Flexible output** | • Add an attribute `iforest.is_anomaly=true` <br>• Emit a gauge metric `iforest.anomaly_score` <br>• Drop anomalous telemetry entirely. |
2829
| **Config‑driven** | Tune tree count, subsample size, contamination rate, sliding‑window length, retraining interval, target metrics, and more – all in `collector.yml`. |
2930
| **Zero external deps** | Pure Go implementation; runs wherever the Collector does (edge, gateway, or backend). |
@@ -35,7 +36,8 @@ The **Isolation Forest processor** adds inline, unsupervised anomaly detection t
3536
1. **Training window** – The processor keeps up to `window_size` of the most recent data points for every feature‑group.
3637
2. **Periodic (re‑)training** – Every `training_interval`, it draws `subsample_size` points from that window and grows `forest_size` random isolation trees.
3738
3. **Scoring** – Each new point is pushed through the forest. Shorter average path length ⇒ higher anomaly score.
38-
4. **Post‑processing**
39+
4. **Adaptive sizing** – When enabled, window size automatically adjusts based on traffic velocity, memory usage, and model stability.
40+
5. **Post‑processing**
3941

4042
* If `add_anomaly_score: true`, a gauge metric `iforest.anomaly_score` is emitted with identical attributes/timestamp.
4143
* If the score ≥ `anomaly_threshold`, the original span/metric/log is flagged with `iforest.is_anomaly=true`.
@@ -61,6 +63,21 @@ Performance is linear in `forest_size` and logarithmic in `window_size`; a defau
6163
| `metrics_to_analyze` | \[]string | `[]` | Only these metric names are scored (metrics pipeline only). Blank ⇒ all. |
6264
| `add_anomaly_score` | bool | `false` | Emit `iforest.anomaly_score` metric. |
6365
| `drop_anomalous_data` | bool | `false` | Remove anomalous items from the batch instead of forwarding. |
66+
| `adaptive_window` | object | `null` | Enables adaptive window sizing (see Adaptive Window section below). |
67+
68+
### 🔄 Adaptive Window Configuration
69+
70+
When enabled, the processor automatically adjusts window size based on traffic patterns and resource constraints:
71+
72+
| Field | Type | Default | Notes |
73+
| -------------------------- | -------- | ------- | -------------------------------------------------------- |
74+
| `enabled` | bool | `false` | Enable adaptive window sizing. |
75+
| `min_window_size` | int | `1000` | Minimum window size (safety bound). |
76+
| `max_window_size` | int | `100000`| Maximum window size (memory protection). |
77+
| `memory_limit_mb` | int | `256` | Shrink window when memory usage exceeds this limit. |
78+
| `adaptation_rate` | float | `0.1` | Rate of window size changes (0.0-1.0). |
79+
| `velocity_threshold` | float | `50.0` | Samples/sec threshold for triggering window growth. |
80+
| `stability_check_interval` | duration | `5m` | How often to evaluate model stability for expansion. |
6481

6582
See the sample below for context.
6683

@@ -120,7 +137,7 @@ service:
120137
121138
### What the example does
122139

123-
| Signal | What’s scored | Feature grouping | Output | Notes |
140+
| Signal | What’s scored | Feature grouping | Output | Notes |
124141
| ----------- | ---------------------------------------------------------- | ------------------------------ | ----------------------------------------- | ------------------------------------------------------------------------------------------------ |
125142
| **Traces** | Span **duration** (ns) | `service.name`, `k8s.pod.name` | `iforest.is_anomaly` attr + optional drop | Use a span/trace exporter to route anomalies. |
126143
| **Metrics** | Only `system.cpu.utilization`, `system.memory.utilization` | Same | Attribute + score metric | The score appears as `iforest.anomaly_score` gauge. |
@@ -133,6 +150,7 @@ service:
133150
* **Tune `forest_size` vs. latency** – start with 100 trees; raise to 200–300 if scores look noisy.
134151
* **Use per‑entity models** – add `features` (service, pod, host) to avoid global comparisons across very different series.
135152
* **Let contamination drive threshold** – set `contamination_rate` to the % of traffic you’re comfortable labelling outlier; avoid hand‑tuning `anomaly_threshold`.
153+
* **Use adaptive window sizing** – enable for dynamic workloads; the processor will automatically grow windows during high traffic and shrink under memory pressure.
136154
* **Route anomalies** – keep `drop_anomalous_data=false` and add a simple \[routing‑processor] downstream to ship anomalies to a dedicated exporter or topic.
137155
* **Monitor model health** – the emitted `iforest.anomaly_score` metric is perfect for a Grafana panel; watch its distribution and adapt window / contamination accordingly.
138156

@@ -147,21 +165,26 @@ service:
147165
│ • Sliding window (per feature‑group) │
148166
│ • Forest of N trees (per feature‑group) │
149167
Telemetry ───▶ │ • Score calculator & anomaly decision │ ───▶ Next processor/exporter
168+
│ • Adaptive window sizing (optional) │
150169
└───────────────────────────────────────────────────┘
151170
```
152171

153-
*Training cost*: **O(window\_size × forest\_size × log subsample\_size)** every `training_interval`
154-
*Scoring cost*: **O(forest\_size × log subsample\_size)** per item
172+
173+
*Training cost*: **O(current_window_size × forest_size × log subsample_size)** every `training_interval`
174+
*Scoring cost*: **O(forest_size × log subsample_size)** per item
175+
176+
**Note:** With adaptive window sizing enabled, `current_window_size` dynamically adjusts between `min_window_size` and `max_window_size` based on traffic patterns and memory constraints, making training costs adaptive to workload conditions.
177+
155178

156179
---
157180

158181
## 🤝 Contributing
159182

160183
* **Bugs / Questions** – please open an issue in the fork first.
184+
* **Recently added**: Adaptive window sizing for dynamic traffic patterns.
161185
* **Planned enhancements**
162186

163187
* Multivariate scoring (multiple numeric attributes per point).
164-
* Adaptive window size.
165188
* Expose Prometheus counters for training time / CPU cost.
166189

167190
PRs welcome – please include unit tests and doc updates.

processor/isolationforestprocessor/config.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,23 @@ type Config struct {
2828
Features FeatureConfig `mapstructure:"features"`
2929
Models []ModelConfig `mapstructure:"models"`
3030
Performance PerformanceConfig `mapstructure:"performance"`
31+
32+
// Adaptive window sizing configuration
33+
AdaptiveWindow *AdaptiveWindowConfig `mapstructure:"adaptive_window"`
34+
}
35+
36+
// AdaptiveWindowConfig configures automatic window size adjustment based on traffic patterns
37+
type AdaptiveWindowConfig struct {
38+
// Core configuration
39+
Enabled bool `mapstructure:"enabled"` // Enable adaptive sizing
40+
MinWindowSize int `mapstructure:"min_window_size"` // Minimum samples to keep
41+
MaxWindowSize int `mapstructure:"max_window_size"` // Maximum samples (memory protection)
42+
MemoryLimitMB int `mapstructure:"memory_limit_mb"` // Auto-shrink when exceeded
43+
AdaptationRate float64 `mapstructure:"adaptation_rate"` // Adjustment speed (0.0-1.0)
44+
45+
// Optional parameters with defaults
46+
VelocityThreshold float64 `mapstructure:"velocity_threshold"` // Grow when >N samples/sec
47+
StabilityCheckInterval string `mapstructure:"stability_check_interval"` // Check model accuracy interval
3148
}
3249

3350
type FeatureConfig struct {
@@ -79,6 +96,17 @@ func createDefaultConfig() component.Config {
7996
BatchSize: 1000,
8097
ParallelWorkers: 4,
8198
},
99+
100+
// Default adaptive window configuration (disabled by default for backward compatibility)
101+
AdaptiveWindow: &AdaptiveWindowConfig{
102+
Enabled: false, // Disabled by default - backward compatibility
103+
MinWindowSize: 1000, // Match MinSamples for consistency
104+
MaxWindowSize: 100000, // Reasonable upper bound
105+
MemoryLimitMB: 256, // Half of total processor memory
106+
AdaptationRate: 0.1, // Conservative adjustment speed
107+
VelocityThreshold: 50, // Default growth threshold
108+
StabilityCheckInterval: "5m", // Check model stability every 5 minutes
109+
},
82110
}
83111
}
84112

@@ -118,9 +146,74 @@ func (cfg *Config) Validate() error {
118146
return errors.New("at least one feature type must be configured")
119147
}
120148

149+
// Validate adaptive window configuration
150+
if cfg.AdaptiveWindow != nil {
151+
if err := cfg.validateAdaptiveWindow(); err != nil {
152+
return fmt.Errorf("adaptive_window validation failed: %w", err)
153+
}
154+
}
155+
156+
return nil
157+
}
158+
159+
// validateAdaptiveWindow validates the adaptive window configuration
160+
func (cfg *Config) validateAdaptiveWindow() error {
161+
aw := cfg.AdaptiveWindow
162+
163+
if aw.MinWindowSize <= 0 {
164+
return errors.New("min_window_size must be positive")
165+
}
166+
167+
if aw.MaxWindowSize <= aw.MinWindowSize {
168+
return errors.New("max_window_size must be greater than min_window_size")
169+
}
170+
171+
// Ensure consistency with main config
172+
if aw.MinWindowSize < cfg.MinSamples {
173+
return fmt.Errorf("adaptive_window.min_window_size (%d) should be >= min_samples (%d) for consistency",
174+
aw.MinWindowSize, cfg.MinSamples)
175+
}
176+
177+
if aw.MemoryLimitMB <= 0 {
178+
return errors.New("memory_limit_mb must be positive")
179+
}
180+
181+
// Memory limit should be reasonable compared to total processor memory
182+
if aw.MemoryLimitMB > cfg.Performance.MaxMemoryMB {
183+
return fmt.Errorf("adaptive_window.memory_limit_mb (%d) should not exceed performance.max_memory_mb (%d)",
184+
aw.MemoryLimitMB, cfg.Performance.MaxMemoryMB)
185+
}
186+
187+
if aw.AdaptationRate < 0.0 || aw.AdaptationRate > 1.0 {
188+
return errors.New("adaptation_rate must be between 0.0 and 1.0")
189+
}
190+
191+
if aw.VelocityThreshold < 0 {
192+
return errors.New("velocity_threshold must be non-negative")
193+
}
194+
195+
if aw.StabilityCheckInterval != "" {
196+
if _, err := time.ParseDuration(aw.StabilityCheckInterval); err != nil {
197+
return fmt.Errorf("stability_check_interval is not a valid duration: %w", err)
198+
}
199+
}
200+
121201
return nil
122202
}
123203

204+
// IsAdaptiveWindowEnabled returns true if adaptive window sizing is enabled
205+
func (cfg *Config) IsAdaptiveWindowEnabled() bool {
206+
return cfg.AdaptiveWindow != nil && cfg.AdaptiveWindow.Enabled
207+
}
208+
209+
// GetStabilityCheckInterval returns the stability check interval duration
210+
func (cfg *Config) GetStabilityCheckInterval() (time.Duration, error) {
211+
if cfg.AdaptiveWindow == nil || cfg.AdaptiveWindow.StabilityCheckInterval == "" {
212+
return 5 * time.Minute, nil // Default
213+
}
214+
return time.ParseDuration(cfg.AdaptiveWindow.StabilityCheckInterval)
215+
}
216+
124217
func (cfg *Config) GetTrainingWindowDuration() (time.Duration, error) {
125218
return time.ParseDuration(cfg.TrainingWindow)
126219
}

0 commit comments

Comments
 (0)