diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py index f60d8b45..592754b4 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py @@ -4,9 +4,9 @@ """ from __future__ import annotations -import asyncio import logging +from threading import Timer from typing import Optional from microsoft_agents.hosting.core import TurnContext @@ -20,59 +20,36 @@ class TypingIndicator: Encapsulates the logic for sending "typing" activity to the user. """ - def __init__(self, intervalSeconds=1) -> None: - self._intervalSeconds = intervalSeconds - self._task: Optional[asyncio.Task] = None - self._running: bool = False - self._lock = asyncio.Lock() + _interval: int + _timer: Optional[Timer] = None - async def start(self, context: TurnContext) -> None: - async with self._lock: - if self._running: - return + def __init__(self, interval=10) -> None: + self._interval = interval - logger.debug( - f"Starting typing indicator with interval: {self._intervalSeconds} seconds" - ) - self._running = True - self._task = asyncio.create_task(self._typing_loop(context)) + async def start(self, context: TurnContext) -> None: + if self._timer is not None: + return - async def stop(self) -> None: - async with self._lock: - if not self._running: - return + logger.debug(f"Starting typing indicator with interval: {self._interval} ms") + func = self._on_timer(context) + self._timer = Timer(self._interval, func) + self._timer.start() + await func() + def stop(self) -> None: + if self._timer: logger.debug("Stopping typing indicator") - self._running = False - task = self._task - self._task = None + self._timer.cancel() + self._timer = None - # Cancel outside the lock to avoid blocking - if task and not task.done(): - task.cancel() + def _on_timer(self, context: TurnContext): + async def __call__(): try: - await task - except asyncio.CancelledError: - pass - - async def _typing_loop(self, context: TurnContext): - """Continuously send typing indicators at the specified interval.""" - try: - while True: - # Check running status under lock - async with self._lock: - if not self._running: - break - - try: - logger.debug("Sending typing activity") - await context.send_activity(Activity(type=ActivityTypes.typing)) - except Exception as e: - logger.error(f"Error sending typing activity: {e}") - async with self._lock: - self._running = False - break - - await asyncio.sleep(self._intervalSeconds) - except asyncio.CancelledError: - logger.debug("Typing indicator loop cancelled") + logger.debug("Sending typing activity") + await context.send_activity(Activity(type=ActivityTypes.typing)) + except Exception as e: + # TODO: Improve when adding logging + logger.error(f"Error sending typing activity: {e}") + self.stop() + + return __call__