-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaudio_agc.py
More file actions
154 lines (125 loc) · 4.91 KB
/
audio_agc.py
File metadata and controls
154 lines (125 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
Automatic Gain Control (AGC) for Audio
Dynamically adjusts audio levels to maintain consistent volume
"""
import numpy as np
import logging
from collections import deque
logger = logging.getLogger(__name__)


class AutomaticGainControl:
    """Block-based automatic gain control for audio input.

    Each call to :meth:`process` measures the RMS level of the incoming
    block, derives the gain that would bring a median-smoothed RMS to
    ``target_rms`` and moves the running gain towards it with separate
    attack (gain going down) and release (gain going up) time constants.

    Levels are expressed on the int16 scale (full scale = 32768). int16
    input is normalised by 32768 internally; any other dtype is treated as
    already normalised (full scale = 1.0).

    Attributes:
        target_rms: Target RMS level on the int16 scale.
        min_gain: Lower bound applied to the desired gain.
        max_gain: Upper bound applied to the desired gain.
        sample_rate: Sample rate in Hz used to derive the coefficients.
        attack_coeff: Per-sample smoothing coefficient for gain reduction.
        release_coeff: Per-sample smoothing coefficient for gain increase.
        current_gain: Gain multiplier applied to the most recent block.
        rms_history: The last (up to 10) per-block RMS values, normalised.
    """

    # A debug line is emitted once every this many process() calls.
    _LOG_EVERY_N_CALLS = 100

    def __init__(
        self,
        target_rms: float = 3000.0,
        min_gain: float = 0.1,
        max_gain: float = 100.0,
        attack_time: float = 0.1,
        release_time: float = 0.3,
        sample_rate: int = 16000
    ):
        """Initialize AGC.

        Args:
            target_rms: Target RMS level to maintain (int16 scale).
            min_gain: Minimum gain multiplier.
            max_gain: Maximum gain multiplier.
            attack_time: How quickly gain decreases when audio is too loud
                (seconds). Zero or negative means instantaneous.
            release_time: How quickly gain increases when audio is too
                quiet (seconds). Zero or negative means instantaneous.
            sample_rate: Audio sample rate in Hz.
        """
        self.target_rms = target_rms
        self.min_gain = min_gain
        self.max_gain = max_gain
        self.sample_rate = sample_rate

        # Per-sample one-pole coefficients; process() compounds them over
        # the block length so the time constants really are in seconds.
        self.attack_coeff = self._per_sample_coeff(attack_time, sample_rate)
        self.release_coeff = self._per_sample_coeff(release_time, sample_rate)

        # Current gain value
        self.current_gain = 1.0

        # RMS history for smoothing
        self.rms_history = deque(maxlen=10)

        # Number of process() calls so far; drives the periodic debug log.
        self._process_calls = 0

        logger.info(
            "AGC initialized: target_rms=%s, gain_range=[%s, %s]",
            target_rms, min_gain, max_gain,
        )

    @staticmethod
    def _per_sample_coeff(time_constant: float, sample_rate: float) -> float:
        """Return the per-sample smoothing coefficient for a time constant.

        A non-positive ``time_constant * sample_rate`` yields 1.0 (the gain
        jumps straight to its target) instead of dividing by zero.
        """
        samples = time_constant * sample_rate
        if samples <= 0:
            return 1.0
        return float(1.0 - np.exp(-1.0 / samples))

    def process(self, audio_data: np.ndarray) -> np.ndarray:
        """Apply AGC to one block of audio.

        Args:
            audio_data: Input audio as numpy array (int16 or float32).
                The length of the first axis is used as the number of
                samples the block spans in time.

        Returns:
            Gain-adjusted audio (same dtype as input). An empty input is
            returned unchanged and does not update the AGC state.
        """
        if np.size(audio_data) == 0:
            return audio_data

        # Convert to float for processing
        input_dtype = audio_data.dtype
        is_int16 = input_dtype == np.int16
        audio_float = audio_data.astype(np.float32)
        if is_int16:
            audio_float /= 32768.0

        # RMS of this block; accumulate in float64 to limit rounding error.
        current_rms = float(
            np.sqrt(np.mean(np.square(audio_float), dtype=np.float64))
        )
        self.rms_history.append(current_rms)

        # Use median of recent RMS values to reduce noise sensitivity
        if len(self.rms_history) >= 3:
            smooth_rms = float(np.median(list(self.rms_history)))
        else:
            smooth_rms = current_rms

        # Desired gain; with no measurable signal, hold the current gain.
        if smooth_rms > 0:
            desired_gain = self.target_rms / (smooth_rms * 32768.0)
        else:
            desired_gain = self.current_gain
        desired_gain = float(
            np.clip(desired_gain, self.min_gain, self.max_gain)
        )

        # Attack when the gain must come down, release when it may go up.
        if desired_gain < self.current_gain:
            per_sample = self.attack_coeff
        else:
            per_sample = self.release_coeff

        # The gain is updated once per block, so compound the per-sample
        # coefficient over the block length: 1 - (1 - c)^n.
        n_samples = audio_float.shape[0] if audio_float.ndim else 1
        coeff = 1.0 - (1.0 - per_sample) ** n_samples

        # Exponential smoothing; keep a built-in float so the float32
        # buffer is not upcast by a NumPy scalar.
        self.current_gain = float(
            coeff * desired_gain + (1.0 - coeff) * self.current_gain
        )

        # Apply gain
        audio_float = audio_float * self.current_gain

        # Soft clipping to prevent harsh distortion (bounded by +/- 1/0.9).
        audio_float = np.tanh(audio_float * 0.9) / 0.9

        # Convert back to original dtype
        if is_int16:
            audio_output = np.clip(
                audio_float * 32768.0, -32768, 32767
            ).astype(np.int16)
        else:
            audio_output = audio_float.astype(input_dtype)

        # Log occasionally for debugging, on a deterministic cadence so the
        # global NumPy RNG state is left untouched.
        self._process_calls += 1
        if (self._process_calls % self._LOG_EVERY_N_CALLS == 0
                and logger.isEnabledFor(logging.DEBUG)):
            output_rms = float(
                np.sqrt(np.mean(np.square(audio_output.astype(np.float64))))
            )
            if not is_int16:
                # Report on the same int16 scale as the input RMS.
                output_rms *= 32768.0
            logger.debug(
                "AGC: rms=%.1f, gain=%.2fx, output_rms=%.1f",
                current_rms * 32768, self.current_gain, output_rms,
            )

        return audio_output

    def reset(self):
        """Reset AGC state (gain back to 1.0, RMS history cleared)."""
        self.current_gain = 1.0
        self.rms_history.clear()
        logger.debug("AGC reset")

    def get_current_gain(self) -> float:
        """Get current gain value."""
        return self.current_gain

    def get_stats(self) -> dict:
        """Get current AGC statistics.

        Returns:
            Dict with ``current_gain``, ``target_rms``, ``average_rms``
            (mean of the RMS history on the int16 scale, 0.0 when the
            history is empty), ``min_gain`` and ``max_gain``. The gain and
            average are built-in floats.
        """
        if self.rms_history:
            avg_rms = float(np.mean(list(self.rms_history))) * 32768.0
        else:
            avg_rms = 0.0
        return {
            "current_gain": float(self.current_gain),
            "target_rms": self.target_rms,
            "average_rms": avg_rms,
            "min_gain": self.min_gain,
            "max_gain": self.max_gain
        }