Skip to content

Commit 3024b7c

Browse files
authored
1. TypingIndicator Concurrency & Argument fixes (#187)
* 1. TypingIndicator Class (typing_indicator.py) Problems Fixed: ❌ Used threading.Timer (synchronous) in async code → Event loop blocking ❌ Timer couldn't properly execute async callbacks ❌ No race condition protection for concurrent start() calls ❌ Incorrect time unit handling (Timer uses seconds, code expected milliseconds) ❌ Complex and error-prone callback pattern Solutions Implemented: ✅ Replaced threading.Timer with asyncio.Task for proper async operation ✅ Created continuous _typing_loop() that runs asynchronously ✅ Added _running flag to prevent race conditions ✅ Proper interval conversion (milliseconds to seconds) ✅ Clean async/await pattern throughout ✅ Graceful cancellation handling with asyncio.CancelledError ✅ Made stop() async to properly await task cancellation 2. AgentApplication Class (agent_application.py) Problems Fixed: ❌ Called self.typing.stop() without await (sync call to async method) Solutions Implemented: ✅ Changed to await self.typing.stop() for proper async execution 📋 Key Improvements: No Thread Blocking: Uses asyncio tasks instead of threads, preventing event loop blocking Proper Async Patterns: All methods properly use async/await Race Condition Protection: The _running flag prevents multiple simultaneous starts Clean Shutdown: Proper task cancellation and cleanup Better Error Handling: Catches and logs exceptions without crashing Correct Timing: Proper conversion from milliseconds to seconds for asyncio.sleep() 🎯 Benefits: Performance: No blocking of the async event loop Reliability: No deadlocks or race conditions Maintainability: Cleaner, more Pythonic async code Scalability: Works correctly with concurrent operations Safety: Proper resource cleanup and cancellation handling Both files now follow best practices for async Python code and are free from threading bugs! * Fix interval conversion in TypingIndicator class constructor * Fix duplicate raise statement in TypingIndicator class * Refactor TypingIndicator class for improved concurrency and add unit tests for typing activity * Refactor TypingIndicator to use seconds for interval tracking and logging * Fix error handling in typing loop to reset task on failure * Refactor logging statements in TypingIndicator for improved readability * Fix typing loop error handling to ensure task state is managed correctly
1 parent 735164d commit 3024b7c

File tree

3 files changed

+133
-34
lines changed

3 files changed

+133
-34
lines changed

libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ def __init__(
9393
:param kwargs: Additional configuration parameters.
9494
:type kwargs: Any
9595
"""
96-
self.typing = TypingIndicator()
9796
self._route_list = _RouteList[StateT]()
9897

9998
configuration = kwargs
@@ -659,9 +658,12 @@ async def on_turn(self, context: TurnContext):
659658
await self._start_long_running_call(context, self._on_turn)
660659

661660
async def _on_turn(self, context: TurnContext):
661+
typing = None
662662
try:
663663
if context.activity.type != ActivityTypes.typing:
664-
await self._start_typing(context)
664+
if self._options.start_typing_timer:
665+
typing = TypingIndicator()
666+
await typing.start(context)
665667

666668
self._remove_mentions(context)
667669

@@ -709,11 +711,8 @@ async def _on_turn(self, context: TurnContext):
709711
)
710712
await self._on_error(context, err)
711713
finally:
712-
self.typing.stop()
713-
714-
async def _start_typing(self, context: TurnContext):
715-
if self._options.start_typing_timer:
716-
await self.typing.start(context)
714+
if typing:
715+
await typing.stop()
717716

718717
def _remove_mentions(self, context: TurnContext):
719718
if (

libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
"""
55

66
from __future__ import annotations
7+
import asyncio
78
import logging
89

9-
from threading import Timer
1010
from typing import Optional
1111

1212
from microsoft_agents.hosting.core import TurnContext
@@ -20,36 +20,60 @@ class TypingIndicator:
2020
Encapsulates the logic for sending "typing" activity to the user.
2121
"""
2222

23-
_interval: int
24-
_timer: Optional[Timer] = None
25-
26-
def __init__(self, interval=1000) -> None:
27-
self._interval = interval
23+
def __init__(self, intervalSeconds=1) -> None:
24+
self._intervalSeconds = intervalSeconds
25+
self._task: Optional[asyncio.Task] = None
26+
self._running: bool = False
27+
self._lock = asyncio.Lock()
2828

2929
async def start(self, context: TurnContext) -> None:
30-
if self._timer is not None:
31-
return
30+
async with self._lock:
31+
if self._running:
32+
return
33+
34+
logger.debug(
35+
f"Starting typing indicator with interval: {self._intervalSeconds} seconds"
36+
)
37+
self._running = True
38+
self._task = asyncio.create_task(self._typing_loop(context))
3239

33-
logger.debug(f"Starting typing indicator with interval: {self._interval} ms")
34-
func = self._on_timer(context)
35-
self._timer = Timer(self._interval, func)
36-
self._timer.start()
37-
await func()
40+
async def stop(self) -> None:
41+
async with self._lock:
42+
if not self._running:
43+
return
3844

39-
def stop(self) -> None:
40-
if self._timer:
4145
logger.debug("Stopping typing indicator")
42-
self._timer.cancel()
43-
self._timer = None
46+
self._running = False
47+
task = self._task
48+
self._task = None
4449

45-
def _on_timer(self, context: TurnContext):
46-
async def __call__():
50+
# Cancel outside the lock to avoid blocking
51+
if task and not task.done():
52+
task.cancel()
4753
try:
48-
logger.debug("Sending typing activity")
49-
await context.send_activity(Activity(type=ActivityTypes.typing))
50-
except Exception as e:
51-
# TODO: Improve when adding logging
52-
logger.error(f"Error sending typing activity: {e}")
53-
self.stop()
54-
55-
return __call__
54+
await task
55+
except asyncio.CancelledError:
56+
pass
57+
58+
async def _typing_loop(self, context: TurnContext):
59+
"""Continuously send typing indicators at the specified interval."""
60+
try:
61+
while True:
62+
# Check running status under lock
63+
async with self._lock:
64+
if not self._running:
65+
break
66+
67+
try:
68+
logger.debug("Sending typing activity")
69+
await context.send_activity(Activity(type=ActivityTypes.typing))
70+
except Exception as e:
71+
logger.error(f"Error sending typing activity: {e}")
72+
async with self._lock:
73+
self._running = False
74+
break
75+
76+
await asyncio.sleep(self._intervalSeconds)
77+
except asyncio.CancelledError:
78+
logger.debug("Typing indicator loop cancelled")
79+
raise
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from microsoft_agents.activity import Activity, ActivityTypes
6+
from microsoft_agents.hosting.core.app.typing_indicator import TypingIndicator
7+
8+
9+
class StubTurnContext:
10+
"""Test double that tracks sent activities."""
11+
12+
def __init__(self, should_raise: bool = False) -> None:
13+
self.sent_activities = []
14+
self.should_raise = should_raise
15+
16+
async def send_activity(self, activity: Activity):
17+
if self.should_raise:
18+
raise RuntimeError("send_activity failure")
19+
self.sent_activities.append(activity)
20+
return None
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_start_sends_typing_activity():
25+
context = StubTurnContext()
26+
indicator = TypingIndicator(intervalSeconds=0.01)
27+
28+
await indicator.start(context)
29+
await asyncio.sleep(0.03)
30+
await indicator.stop()
31+
32+
assert len(context.sent_activities) >= 1
33+
assert all(activity.type == ActivityTypes.typing for activity in context.sent_activities)
34+
35+
36+
@pytest.mark.asyncio
37+
async def test_start_is_idempotent():
38+
context = StubTurnContext()
39+
indicator = TypingIndicator(intervalSeconds=0.01)
40+
41+
await indicator.start(context)
42+
first_task = indicator._task # noqa: SLF001 - accessing for test verification
43+
44+
await indicator.start(context)
45+
second_task = indicator._task # noqa: SLF001
46+
47+
assert first_task is second_task
48+
49+
await indicator.stop()
50+
51+
52+
@pytest.mark.asyncio
53+
async def test_stop_without_start_is_noop():
54+
indicator = TypingIndicator()
55+
56+
await indicator.stop()
57+
58+
assert indicator._task is None # noqa: SLF001
59+
assert indicator._running is False # noqa: SLF001
60+
61+
62+
@pytest.mark.asyncio
63+
async def test_typing_loop_stops_on_send_error():
64+
context = StubTurnContext(should_raise=True)
65+
indicator = TypingIndicator(intervalSeconds=0.01)
66+
67+
await indicator.start(context)
68+
await asyncio.sleep(0.02)
69+
70+
assert indicator._task is not None # noqa: SLF001
71+
await asyncio.wait_for(indicator._task, timeout=0.1) # Ensure loop exits
72+
73+
assert indicator._running is False # noqa: SLF001
74+
assert indicator._task.done() # noqa: SLF001
75+
76+
await indicator.stop()

0 commit comments

Comments
 (0)