Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""

from __future__ import annotations
import asyncio
import logging

from threading import Timer
from typing import Optional

from microsoft_agents.hosting.core import TurnContext
Expand All @@ -20,59 +20,36 @@ class TypingIndicator:
Encapsulates the logic for sending "typing" activity to the user.
"""

def __init__(self, intervalSeconds=1) -> None:
self._intervalSeconds = intervalSeconds
self._task: Optional[asyncio.Task] = None
self._running: bool = False
self._lock = asyncio.Lock()
_interval: int
_timer: Optional[Timer] = None
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _timer class attribute will be shared across all instances of TypingIndicator. This should be an instance attribute initialized in __init__ to avoid state being shared between different instances.

Copilot uses AI. Check for mistakes.

async def start(self, context: TurnContext) -> None:
async with self._lock:
if self._running:
return
def __init__(self, interval=10) -> None:
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default interval changed from 1 second to 10 seconds without updating call sites. Given the original was 1 second and sent typing indicators continuously in a loop, this significantly changes the behavior and timing characteristics.

Copilot uses AI. Check for mistakes.
self._interval = interval

logger.debug(
f"Starting typing indicator with interval: {self._intervalSeconds} seconds"
)
self._running = True
self._task = asyncio.create_task(self._typing_loop(context))
async def start(self, context: TurnContext) -> None:
if self._timer is not None:
return

async def stop(self) -> None:
async with self._lock:
if not self._running:
return
logger.debug(f"Starting typing indicator with interval: {self._interval} ms")
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message states the interval is in milliseconds ('ms'), but Python's threading.Timer expects the interval in seconds. The interval value should be documented correctly or converted appropriately.

Suggested change
logger.debug(f"Starting typing indicator with interval: {self._interval} ms")
logger.debug(f"Starting typing indicator with interval: {self._interval} seconds")

Copilot uses AI. Check for mistakes.
func = self._on_timer(context)
self._timer = Timer(self._interval, func)
self._timer.start()
await func()
Comment on lines +34 to +37
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Timer is created with an async function but Timer expects a synchronous callable. When the timer expires, it will try to call the async function synchronously, which will not execute the coroutine properly. Additionally, the timer only fires once and won't send recurring typing indicators as the original implementation did.

Copilot uses AI. Check for mistakes.

def stop(self) -> None:
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stop method signature changed from async def to def, but it's still called with await typing.stop() in agent_application.py line 736. This will cause a runtime error since you cannot await a non-coroutine.

Copilot uses AI. Check for mistakes.
if self._timer:
logger.debug("Stopping typing indicator")
self._running = False
task = self._task
self._task = None
self._timer.cancel()
self._timer = None

# Cancel outside the lock to avoid blocking
if task and not task.done():
task.cancel()
def _on_timer(self, context: TurnContext):
async def __call__():
try:
await task
except asyncio.CancelledError:
pass

async def _typing_loop(self, context: TurnContext):
"""Continuously send typing indicators at the specified interval."""
try:
while True:
# Check running status under lock
async with self._lock:
if not self._running:
break

try:
logger.debug("Sending typing activity")
await context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
logger.error(f"Error sending typing activity: {e}")
async with self._lock:
self._running = False
break

await asyncio.sleep(self._intervalSeconds)
except asyncio.CancelledError:
logger.debug("Typing indicator loop cancelled")
logger.debug("Sending typing activity")
await context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
# TODO: Improve when adding logging
logger.error(f"Error sending typing activity: {e}")
self.stop()

return __call__
Loading