Skip to content

Commit 262a0ff

Browse files
authored
fixing event manager so it can clear the queue (vocodedev#441)
* fixing event manager by moving async handle outside to accomodate future annotations * handling events * unit tests for events manager * typo
1 parent d0695ae commit 262a0ff

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import pytest
2+
import asyncio
3+
4+
from vocode.streaming.models.events import PhoneCallEndedEvent, EventType
5+
from vocode.streaming.utils.events_manager import EventsManager
6+
7+
CONVERSATION_ID = "1"
8+
9+
10+
@pytest.mark.asyncio
11+
async def test_initialization():
12+
manager = EventsManager()
13+
assert manager.subscriptions == set()
14+
assert isinstance(manager.queue, asyncio.Queue)
15+
assert not manager.active
16+
17+
18+
@pytest.mark.asyncio
19+
async def test_publish_event():
20+
event = PhoneCallEndedEvent(
21+
conversation_id=CONVERSATION_ID, type=EventType.PHONE_CALL_ENDED
22+
) # Replace with actual Event creation
23+
manager = EventsManager([EventType.PHONE_CALL_ENDED])
24+
manager.publish_event(event)
25+
assert not manager.queue.empty()
26+
27+
28+
@pytest.mark.asyncio
29+
async def test_handle_event_default_implementation():
30+
event = PhoneCallEndedEvent(
31+
conversation_id=CONVERSATION_ID, type=EventType.PHONE_CALL_ENDED
32+
) # Replace with actual Event creation
33+
manager = EventsManager([EventType.PHONE_CALL_ENDED])
34+
await manager.handle_event(event)
35+
36+
37+
@pytest.mark.asyncio
38+
async def test_start_and_active_loop():
39+
event = PhoneCallEndedEvent(
40+
conversation_id=CONVERSATION_ID, type=EventType.PHONE_CALL_ENDED
41+
) # Replace with actual Event creation
42+
manager = EventsManager([EventType.PHONE_CALL_ENDED])
43+
asyncio.create_task(manager.start())
44+
manager.publish_event(event)
45+
await asyncio.sleep(0.1)
46+
manager.active = False
47+
48+
49+
@pytest.mark.asyncio
50+
async def test_flush_method():
51+
event = PhoneCallEndedEvent(
52+
conversation_id=CONVERSATION_ID, type=EventType.PHONE_CALL_ENDED
53+
)
54+
manager = EventsManager([EventType.PHONE_CALL_ENDED])
55+
for _ in range(5):
56+
manager.publish_event(event)
57+
await manager.flush(timeout=2)
58+
assert manager.queue.empty()
59+
60+
61+
@pytest.mark.asyncio
62+
async def test_queue_empty_and_timeout():
63+
manager = EventsManager([EventType.TRANSCRIPT])
64+
await manager.flush(timeout=0)
65+
assert manager.queue.empty()
Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
from __future__ import annotations
2-
32
import asyncio
4-
from typing import List
53

6-
from vocode.streaming.models.events import Event, EventType
4+
5+
from vocode.streaming.models.events import Event
6+
7+
8+
async def flush_event(event):
9+
if event:
10+
del event
711

812

913
class EventsManager:
10-
def __init__(self, subscriptions: List[EventType] = []):
14+
def __init__(self, subscriptions=None):
15+
if subscriptions is None:
16+
subscriptions = []
1117
self.queue: asyncio.Queue[Event] = asyncio.Queue()
1218
self.subscriptions = set(subscriptions)
1319
self.active = False
@@ -21,18 +27,19 @@ async def start(self):
2127
while self.active:
2228
try:
2329
event = await self.queue.get()
30+
await self.handle_event(event)
2431
except asyncio.QueueEmpty:
2532
await asyncio.sleep(1)
26-
await self.handle_event(event)
2733

2834
async def handle_event(self, event: Event):
29-
pass
35+
pass # Default implementation, can be overridden
3036

31-
async def flush(self):
32-
self.active = False
37+
async def flush(self, timeout=30):
3338
while True:
3439
try:
35-
event = self.queue.get_nowait()
36-
await self.handle_event(event)
40+
event = await asyncio.wait_for(self.queue.get(), timeout)
41+
await flush_event(event)
42+
except asyncio.TimeoutError:
43+
break
3744
except asyncio.QueueEmpty:
3845
break

0 commit comments

Comments
 (0)