Skip to content

Commit 7bf366c

Browse files
committed
feat: wip peak finder
1 parent 019f42a commit 7bf366c

27 files changed

+1423
-418
lines changed

docs/processing_module_guide.md

Lines changed: 279 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ The **processing module** (`src/processing/`) is the heart of the rust-photoacou
3636
- **Real-time Audio Analysis**: General-purpose audio processing and analysis
3737
- **Signal Filtering**: Digital filter chains for noise reduction and signal enhancement
3838
- **Multi-channel Processing**: Dual-channel audio operations and differential analysis
39+
- **Analytical Computing**: Real-time spectral analysis and peak detection for frequency tracking
40+
- **Concentration Calculation**: Polynomial-based gas concentration computation from photoacoustic signals
3941

4042
---
4143

@@ -61,10 +63,11 @@ graph TD
6163
```rust,ignore
6264
// Main processing module structure
6365
pub mod processing {
64-
pub mod consumer; // ProcessingConsumer - main processing orchestrator
65-
pub mod graph; // ProcessingGraph - node container and execution engine
66-
pub mod nodes; // ProcessingNode trait and implementations
67-
pub mod result; // ProcessingResult and analysis structures
66+
pub mod consumer; // ProcessingConsumer - main processing orchestrator
67+
pub mod graph; // ProcessingGraph - node container and execution engine
68+
pub mod nodes; // ProcessingNode trait and implementations
69+
pub mod result; // ProcessingResult and analysis structures
70+
pub mod computing_nodes; // ComputingNode - specialized analytical processing nodes
6871
}
6972
```
7073

@@ -74,6 +77,8 @@ pub mod processing {
7477
use crate::acquisition::AudioFrame; // Input audio data
7578
use crate::preprocessing::{Filter, DifferentialCalculator}; // Processing algorithms
7679
use crate::config::processing::*; // Configuration structures
80+
use realfft::{RealFftPlanner, RealToComplex}; // FFT processing for ComputingNodes
81+
use num_complex; // Complex number arithmetic for spectral analysis
7782
```
7883

7984
---
@@ -280,6 +285,118 @@ pub enum MixStrategy {
280285

281286
---
282287

288+
### Computing Nodes (Analytical Processing)
289+
290+
Computing Nodes are specialized ProcessingNodes that implement the **pass-through pattern** while performing analytical computations on the data. They allow the original signal to flow unchanged to the next node while extracting analytical information and publishing it to a shared state for use by other nodes.
291+
292+
#### Key Features:
293+
- **Pass-through Processing**: Data flows unchanged, maintaining pipeline integrity
294+
- **Parallel Analysis**: Performs computations without affecting data latency
295+
- **Shared State**: Results are published to thread-safe shared state (`Arc<RwLock<ComputingSharedData>>`)
296+
- **Real-time Analytics**: Enables real-time frequency tracking and concentration calculation
297+
298+
#### PeakFinderNode (Spectral Analysis)
299+
**Purpose**: Performs real-time FFT-based spectral analysis to detect frequency peaks while passing data through unchanged.
300+
301+
```rust,ignore
302+
use rust_photoacoustic::processing::computing_nodes::PeakFinderNode;
303+
304+
// Create peak finder with custom configuration
305+
let peak_finder = PeakFinderNode::new("peak_detector".to_string())
306+
.with_detection_threshold(0.1) // 10% amplitude threshold
307+
.with_frequency_range(900.0, 1100.0) // Focus on 1kHz range
308+
.with_fft_size(2048) // FFT window size
309+
.with_smoothing_factor(0.7) // Temporal smoothing
310+
.with_sample_rate(48000);
311+
312+
// Add to processing graph
313+
graph.add_node(Box::new(peak_finder))?;
314+
```
315+
316+
**Configuration Options**:
317+
```rust,ignore
318+
// Builder pattern configuration
319+
let peak_finder = PeakFinderNode::new("peak_detector".to_string())
320+
.with_detection_threshold(0.15) // Minimum peak amplitude (0.0-1.0)
321+
.with_frequency_range(800.0, 1200.0) // Analysis frequency range (Hz)
322+
.with_fft_size(4096) // FFT window size (power of 2)
323+
.with_sample_rate(44100) // Sample rate adaptation
324+
.with_smoothing_factor(0.8); // Moving average smoothing (0.0-1.0)
325+
```
326+
327+
**Algorithmic Features**:
328+
- **FFT Spectral Analysis**: Uses `realfft` for efficient real-valued FFT
329+
- **Hann Windowing**: Reduces spectral leakage for better frequency resolution
330+
- **Coherence Filtering**: Requires multiple consecutive detections for validation
331+
- **Adaptive Thresholding**: Configurable amplitude threshold for peak detection
332+
- **Frequency Range Limiting**: Focus analysis on specific frequency bands
333+
334+
**Input/Output**:
335+
- **Input**: `AudioFrame`, `SingleChannel`, or `DualChannel` (pass-through)
336+
- **Output**: Same as input (unchanged)
337+
- **Shared State**: Publishes `peak_frequency` and `peak_amplitude` to `ComputingSharedData`
338+
339+
**Accessing Results**:
340+
```rust,ignore
341+
// Get shared state for reading results
342+
let shared_state = peak_finder.get_shared_state();
343+
if let Ok(state) = shared_state.read() {
344+
if let Some(freq) = state.peak_frequency {
345+
println!("Detected peak at {} Hz", freq);
346+
}
347+
if let Some(amp) = state.peak_amplitude {
348+
println!("Peak amplitude: {}", amp);
349+
}
350+
}
351+
```
352+
353+
#### ComputingSharedData Structure
354+
**Purpose**: Thread-safe shared data structure for communicating analytical results between computing nodes and other processing nodes.
355+
356+
```rust,ignore
357+
use rust_photoacoustic::processing::computing_nodes::{ComputingSharedData, SharedComputingState};
358+
359+
// Shared data structure
360+
pub struct ComputingSharedData {
361+
pub peak_frequency: Option<f32>, // Detected resonance frequency (Hz)
362+
pub peak_amplitude: Option<f32>, // Normalized peak amplitude (0.0-1.0)
363+
pub concentration_ppm: Option<f32>, // Calculated gas concentration (ppm)
364+
pub polynomial_coefficients: [f64; 5], // 4th-degree polynomial coefficients
365+
pub last_update: SystemTime, // Timestamp for data validation
366+
}
367+
368+
// Type alias for easy access
369+
pub type SharedComputingState = Arc<RwLock<ComputingSharedData>>;
370+
```
371+
372+
**Integration with Other Nodes**:
373+
```rust,ignore
374+
// Example: Dynamic filter that adapts based on detected peak frequency
375+
impl ProcessingNode for DynamicFilterNode {
376+
fn process(&mut self, input: ProcessingData) -> Result<ProcessingData> {
377+
// Read current peak frequency from shared state
378+
if let Ok(state) = self.computing_state.read() {
379+
if let Some(peak_freq) = state.peak_frequency {
380+
// Adapt filter center frequency
381+
self.update_center_frequency(peak_freq)?;
382+
}
383+
}
384+
385+
// Apply adaptive filtering
386+
self.apply_filter(input)
387+
}
388+
}
389+
```
390+
391+
**Use Cases for Computing Nodes**:
392+
1. **Frequency Tracking**: Real-time adaptation of filter parameters based on detected resonance
393+
2. **Signal Quality Assessment**: Continuous monitoring of signal characteristics
394+
3. **Concentration Calculation**: Polynomial-based gas concentration from peak amplitudes
395+
4. **Performance Monitoring**: Real-time analysis of signal processing effectiveness
396+
5. **Adaptive Processing**: Dynamic parameter adjustment based on signal conditions
397+
398+
---
399+
283400
### Output Nodes
284401

285402
#### StreamingNode (Real-Time Streaming Output)
@@ -374,14 +491,22 @@ GET /api/stream/audio/fast/123e4567-e89b-12d3-a456-426614174000
374491
graph LR
375492
A[AudioFrame] --> B[InputNode]
376493
B --> C[DualChannel]
377-
C --> D[FilterNode]
378-
D --> E[DualChannel Filtered]
379-
E --> F[DifferentialNode]
380-
F --> G[SingleChannel Diff]
381-
G --> H[PhotoacousticOutputNode]
382-
H --> I[PhotoacousticResult]
494+
C --> D[PeakFinderNode]
495+
D --> E[DualChannel + Analytics]
496+
E --> F[FilterNode]
497+
F --> G[DualChannel Filtered]
498+
G --> H[DifferentialNode]
499+
H --> I[SingleChannel Diff]
500+
I --> J[PhotoacousticOutputNode]
501+
J --> K[PhotoacousticResult]
502+
503+
D -.-> L[ComputingSharedData]
504+
L -.-> M[Peak Frequency]
505+
L -.-> N[Peak Amplitude]
383506
```
384507

508+
**Note**: The PeakFinderNode demonstrates the pass-through pattern - data flows unchanged while analytical results are published to shared state.
509+
385510
### Data Transformation Chain
386511

387512
1. **AudioFrame****InputNode****DualChannel**
@@ -460,8 +585,21 @@ processing:
460585
detection_threshold: 0.05
461586
analysis_window_size: 2048
462587

588+
# Computing nodes for analytical processing
589+
- id: "peak_detector"
590+
node_type: "computing_peak_finder"
591+
parameters:
592+
detection_threshold: 0.1
593+
frequency_min: 800.0
594+
frequency_max: 1200.0
595+
fft_size: 2048
596+
smoothing_factor: 0.7
597+
coherence_threshold: 3
598+
463599
connections:
464600
- from: "input"
601+
to: "peak_detector"
602+
- from: "peak_detector"
465603
to: "bandpass_filter"
466604
- from: "bandpass_filter"
467605
to: "differential"
@@ -614,6 +752,85 @@ fn create_parallel_processing_chain() -> Result<ProcessingGraph> {
614752
}
615753
```
616754

755+
### Computing Node Integration
756+
757+
```rust,ignore
758+
use rust_photoacoustic::processing::computing_nodes::PeakFinderNode;
759+
760+
fn create_analytical_processing_chain() -> Result<ProcessingGraph> {
761+
let mut graph = ProcessingGraph::new();
762+
763+
// 1. Input node
764+
graph.add_node(Box::new(InputNode::new("input".to_string())))?;
765+
766+
// 2. Peak finder for real-time frequency analysis (pass-through)
767+
let peak_finder = Box::new(
768+
PeakFinderNode::new("peak_detector".to_string())
769+
.with_detection_threshold(0.1)
770+
.with_frequency_range(800.0, 1200.0)
771+
.with_fft_size(2048)
772+
.with_smoothing_factor(0.7)
773+
);
774+
graph.add_node(peak_finder)?;
775+
776+
// 3. Adaptive bandpass filter (could use peak frequency from shared state)
777+
let filter = Box::new(BandpassFilter::new(1000.0, 100.0));
778+
let filter_node = Box::new(FilterNode::new(
779+
"adaptive_bandpass".to_string(),
780+
filter,
781+
ChannelTarget::Both
782+
));
783+
graph.add_node(filter_node)?;
784+
785+
// 4. Differential calculation
786+
let diff_calc = Box::new(SimpleDifferential::new());
787+
let diff_node = Box::new(DifferentialNode::new(
788+
"differential".to_string(),
789+
diff_calc
790+
));
791+
graph.add_node(diff_node)?;
792+
793+
// 5. Final analysis
794+
let output = Box::new(
795+
PhotoacousticOutputNode::new("output".to_string())
796+
.with_detection_threshold(0.05)
797+
);
798+
graph.add_node(output)?;
799+
800+
// Connect nodes - peak finder in pass-through mode
801+
graph.connect("input", "peak_detector")?;
802+
graph.connect("peak_detector", "adaptive_bandpass")?;
803+
graph.connect("adaptive_bandpass", "differential")?;
804+
graph.connect("differential", "output")?;
805+
806+
graph.set_output_node("output")?;
807+
808+
Ok(graph)
809+
}
810+
811+
// Example of accessing computing results
812+
fn monitor_peak_detection(peak_finder: &PeakFinderNode) {
813+
let shared_state = peak_finder.get_shared_state();
814+
815+
if let Ok(state) = shared_state.read() {
816+
match (state.peak_frequency, state.peak_amplitude) {
817+
(Some(freq), Some(amp)) => {
818+
println!("Peak detected: {} Hz at {:.2}% amplitude", freq, amp * 100.0);
819+
820+
// Check data freshness
821+
let age = state.last_update.elapsed().unwrap_or_default();
822+
if age.as_millis() < 100 {
823+
println!("Data is fresh ({}ms old)", age.as_millis());
824+
} else {
825+
println!("Warning: Stale data ({}ms old)", age.as_millis());
826+
}
827+
}
828+
_ => println!("No peak detected"),
829+
}
830+
}
831+
}
832+
```
833+
617834
### Custom Processing Node
618835

619836
```rust,ignore
@@ -780,6 +997,58 @@ impl ProcessingGraph {
780997
}
781998
```
782999

1000+
### 5. Computing Node Best Practices
1001+
1002+
```rust,ignore
1003+
// ✅ Good: Pass-through behavior with analytical computation
1004+
impl ProcessingNode for PeakFinderNode {
1005+
fn process(&mut self, input: ProcessingData) -> Result<ProcessingData> {
1006+
// Perform analysis on input data
1007+
self.analyze_spectrum(&input)?;
1008+
1009+
// ✅ Return input unchanged (pass-through)
1010+
Ok(input)
1011+
}
1012+
}
1013+
1014+
// ✅ Efficient shared state access
1015+
impl ProcessingNode for AdaptiveFilterNode {
1016+
fn process(&mut self, input: ProcessingData) -> Result<ProcessingData> {
1017+
// ✅ Non-blocking read attempt
1018+
if let Ok(state) = self.computing_state.try_read() {
1019+
if let Some(peak_freq) = state.peak_frequency {
1020+
// Adapt parameters based on fresh data
1021+
if state.last_update.elapsed().unwrap_or_default().as_millis() < 100 {
1022+
self.update_center_frequency(peak_freq)?;
1023+
}
1024+
}
1025+
}
1026+
1027+
// Process with current parameters
1028+
self.apply_filter(input)
1029+
}
1030+
}
1031+
1032+
// ✅ Robust error handling in computing nodes
1033+
impl ProcessingNode for PeakFinderNode {
1034+
fn process(&mut self, input: ProcessingData) -> Result<ProcessingData> {
1035+
// ✅ Graceful handling of analysis failures
1036+
match self.analyze_spectrum(&input) {
1037+
Ok(peak_data) => {
1038+
self.update_shared_state(peak_data);
1039+
}
1040+
Err(e) => {
1041+
// Log error but don't fail the pipeline
1042+
log::warn!("Peak analysis failed: {}, continuing with pass-through", e);
1043+
}
1044+
}
1045+
1046+
// Always return input unchanged
1047+
Ok(input)
1048+
}
1049+
}
1050+
```
1051+
7831052
---
7841053

7851054
## Advanced Usage

rust/config.example.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,16 @@ processing:
272272
order: 2 # optional order second-order (-12dB/octave) (must be even)
273273
# for first-order use a combination of lowpass and highpass filters
274274

275+
# Peak finder for real-time frequency analysis (pass-through)
276+
# Note: fft_size uses photoacoustic.frame_size and sample_rate uses photoacoustic.sample_rate
277+
- id: "peak_detector"
278+
node_type: "computing_peak_finder"
279+
parameters:
280+
detection_threshold: 0.1 # 10% amplitude threshold
281+
frequency_min: 1800.0 # Lower bound (Hz)
282+
frequency_max: 2200.0 # Upper bound (Hz)
283+
smoothing_factor: 0.7 # Moving average smoothing
284+
275285
# Apply gain adjustment (+3 dB amplification)
276286
- id: "gain_amplifier"
277287
node_type: "gain"
@@ -302,6 +312,8 @@ processing:
302312
- from: streaming_post_differential
303313
to: bandpass_filter
304314
- from: bandpass_filter
315+
to: peak_detector
316+
- from: peak_detector
305317
to: streaming_bandpass_filter
306318
- from: streaming_bandpass_filter
307319
to: processed_recorder

0 commit comments

Comments
 (0)