@@ -1149,6 +1149,171 @@ def _run(gate, wall_time):
11491149 assert elapsed < sleep_sec * 1.75
11501150
11511151
class TestPoolExhaustionUnderContention:
    """Tests for pool behavior when there are more concurrent callers than pool models.

    Each test patches the ``utils.stt.vad_gate`` module globals (``_vad_model``,
    ``_vad_torch``, ``_vad_model_pool`` and ``VAD_GATE_MODEL_POOL_SIZE``) for the
    duration of the test, so the pool starts out as ``None`` with a known size.
    """

    def test_callers_exceed_pool_size_no_deadlock(self):
        """4 concurrent callers with pool_size=2 should all complete (no deadlock)."""
        sleep_sec = 0.05
        model = _SlowMockModel(sleep_sec=sleep_sec)
        pool_size = 2
        num_callers = 4
        with (
            patch('utils.stt.vad_gate._vad_model', model),
            patch('utils.stt.vad_gate._vad_torch', None),
            patch('utils.stt.vad_gate._vad_model_pool', None),
            patch('utils.stt.vad_gate.VAD_GATE_MODEL_POOL_SIZE', pool_size),
        ):
            gates = [
                VADStreamingGate(sample_rate=16000, channels=1, mode='active', uid=f'u{i}', session_id=f's{i}')
                for i in range(num_callers)
            ]
            # One extra party for the main thread, so that all workers are
            # released together and the timer starts at the release point.
            barrier = threading.Barrier(num_callers + 1)
            results = [None] * num_callers
            errors = [None] * num_callers
            chunk = _make_pcm(40)

            def _run(idx):
                # Store any failure per caller; an exception raised inside a
                # thread would otherwise never reach the test's assertions.
                try:
                    barrier.wait(timeout=5)
                    results[idx] = gates[idx].process_audio(chunk, 1000.0)
                except Exception as e:
                    errors[idx] = e

            # Daemon threads: if a worker really deadlocks, the failing test
            # must not also keep the interpreter from exiting.
            threads = [threading.Thread(target=_run, args=(i,), daemon=True) for i in range(num_callers)]
            for t in threads:
                t.start()
            start = time.perf_counter()
            barrier.wait(timeout=5)
            for t in threads:
                t.join(timeout=10)
            elapsed = time.perf_counter() - start

            # join(timeout=...) returns silently on timeout, so liveness has to
            # be checked explicitly to detect a hung worker.
            for i, t in enumerate(threads):
                assert not t.is_alive(), f'Caller {i} still running after join timeout (deadlock?)'

            # All callers should complete (no deadlock)
            for i in range(num_callers):
                assert errors[i] is None, f'Caller {i} got error: {errors[i]}'
                assert results[i] is not None, f'Caller {i} got no result (deadlock?)'

            # With pool_size=2, 4 callers should take ~2x the single-call time (2 batches)
            # Allow generous margin for CI jitter
            assert elapsed < sleep_sec * 6, f'Took {elapsed:.3f}s — possible starvation'

    def test_pool_recovery_after_contention(self):
        """After contention burst, pool models should be returned and reusable."""
        model = _SlowMockModel(sleep_sec=0.01)
        pool_size = 2
        with (
            patch('utils.stt.vad_gate._vad_model', model),
            patch('utils.stt.vad_gate._vad_torch', None),
            patch('utils.stt.vad_gate._vad_model_pool', None),
            patch('utils.stt.vad_gate.VAD_GATE_MODEL_POOL_SIZE', pool_size),
        ):
            # First: burst of contention
            gates = [
                VADStreamingGate(sample_rate=16000, channels=1, mode='active', uid=f'u{i}', session_id=f's{i}')
                for i in range(4)
            ]
            chunk = _make_pcm(40)
            errors = []

            def _run(gate):
                # Collect worker failures so they fail the test instead of
                # being lost inside the thread.
                try:
                    gate.process_audio(chunk, 1000.0)
                except Exception as e:
                    errors.append(e)

            threads = []
            for g in gates:
                t = threading.Thread(target=_run, args=(g,), daemon=True)
                threads.append(t)
                t.start()
            for t in threads:
                t.join(timeout=10)

            # A worker that is still alive would still be holding a pool model.
            for i, t in enumerate(threads):
                assert not t.is_alive(), f'Caller {i} still running after join timeout (deadlock?)'
            assert not errors, f'Callers raised during contention burst: {errors}'

            # After contention: pool should be fully returned (pool_size items available).
            # Imported here, inside the patch context and after the burst, so the
            # name binds to the module's current pool object rather than the
            # patched-in None.
            from utils.stt.vad_gate import _vad_model_pool

            assert _vad_model_pool is not None
            assert (
                _vad_model_pool.qsize() == pool_size
            ), f'Pool has {_vad_model_pool.qsize()} models, expected {pool_size}'

            # New caller should work immediately
            new_gate = VADStreamingGate(sample_rate=16000, channels=1, mode='active', uid='new', session_id='new')
            out = new_gate.process_audio(chunk, 2000.0)
            assert out is not None
1237+
1238+
class TestLongSessionStress:
    """Long-session invariants: large metric counters and mapper checkpoint churn."""

    def test_metrics_consistent_after_many_chunks(self):
        """Feed 5000 chunks and check that the metric counters agree with each other."""
        gate = VADStreamingGate(sample_rate=16000, channels=1, mode='active', uid='stress', session_id='stress')
        pcm = _make_pcm(30)
        base_time = 1000.0
        total_chunks = 5000

        # Blocks of 50 chunks alternate speech / silence, starting with speech.
        for idx in range(total_chunks):
            is_speech_block = (idx // 50) % 2 == 0
            _set_vad_speech(is_speech_block)
            gate.process_audio(pcm, base_time + idx * 0.03)

        stats = gate.get_metrics()
        received = stats['bytes_received']
        skipped = stats['bytes_skipped']

        assert stats['chunks_total'] == total_chunks
        assert stats['chunks_speech'] + stats['chunks_silence'] == total_chunks
        assert received == len(pcm) * total_chunks
        assert stats['bytes_sent'] <= received
        assert skipped >= 0
        # The reported ratio must equal skipped/received up to float rounding.
        assert abs(stats['bytes_saved_ratio'] - skipped / received) < 1e-9

    def test_mapper_checkpoint_cap_with_many_transitions(self):
        """Checkpoint count must stay within DgWallMapper._MAX_CHECKPOINTS after 1200 speech/silence cycles."""
        gate = VADStreamingGate(sample_rate=16000, channels=1, mode='active', uid='stress', session_id='stress')
        pcm = _make_pcm(30)
        now = 1000.0
        cycles = 1200  # Well above _MAX_CHECKPOINTS (500)

        def feed(count):
            """Push ``count`` chunks through the gate, advancing the clock 30ms per chunk."""
            nonlocal now
            for _ in range(count):
                gate.process_audio(pcm, now)
                now += 0.03

        for _ in range(cycles):
            # Short speech burst (2 chunks) ...
            _set_vad_speech(True)
            feed(2)
            # ... then 140 silence chunks at 30ms each = 4200ms > 4000ms hangover
            _set_vad_speech(False)
            feed(140)

        # The mapper must have kept some checkpoints, but no more than the cap.
        checkpoints = gate.dg_wall_mapper._checkpoints
        assert (
            len(checkpoints) <= DgWallMapper._MAX_CHECKPOINTS
        ), f'Checkpoints {len(checkpoints)} exceeds cap {DgWallMapper._MAX_CHECKPOINTS}'
        assert len(checkpoints) > 0, 'Should have at least some checkpoints'

        # Remapping must still work (no crash) and be monotonic in the DG time.
        if len(checkpoints) < 2:
            return
        dg_times = [0.5, 1.0, 5.0, 10.0]
        wall_times = [gate.dg_wall_mapper.dg_to_wall_rel(dg) for dg in dg_times]
        for dg, previous, current in zip(dg_times[1:], wall_times, wall_times[1:]):
            assert current >= previous, f'Non-monotonic remap: dg={dg} -> wall={current} < {previous}'

    def test_json_log_after_long_session(self):
        """to_json_log should still return a well-formed record after 2000 chunks."""
        gate = VADStreamingGate(sample_rate=16000, channels=1, mode='active', uid='stress', session_id='stress')
        pcm = _make_pcm(30)
        base_time = 1000.0

        # 2000 chunks, switching between speech and silence every 100 chunks.
        for idx in range(2000):
            is_speech_block = (idx // 100) % 2 == 0
            _set_vad_speech(is_speech_block)
            gate.process_audio(pcm, base_time + idx * 0.03)

        record = gate.to_json_log()
        assert record['event'] == 'vad_gate_metrics'
        assert record['chunks_total'] == 2000
        assert 0.0 <= record['speech_ratio'] <= 1.0
        assert 0.0 <= record['estimated_savings_pct'] <= 100.0
        assert record['session_duration_sec'] > 0
1315+
1316+
11521317class TestSpeechThreshold :
11531318 """Tests for configurable speech threshold."""
11541319
0 commit comments