[STT-1648] improve: Clear resource cache in background thread #3195
MarkLark86 wants to merge 5 commits into superdesk:develop
        return

    # 1. Signal cancellation to all tasks
    for task in _thread_tasks:
I wonder if it should cancel right away or rather wait a bit
Pull request overview
This PR aims to prevent request failures caused by exceptions during resource cache invalidation by moving cache cleanup work to background threads and adding lifecycle hooks to drain these background tasks during shutdown.
Changes:
- Introduces superdesk.core.tasks for running callables in a background thread and tracking/awaiting them on shutdown.
- Updates Eve backend write paths to clean the resource cache in a background thread, and only for resources with caching enabled.
- Adds Redis socket timeout configuration defaults and adjusts dependency pinning / test expectations.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| superdesk/core/tasks.py | New background-thread task runner and shutdown drain helper. |
| superdesk/cache.py | Adds clean_in_thread() and configures Redis client timeouts. |
| superdesk/eve_backend.py | Routes cache invalidation through _clean_cache() and background cleanup. |
| superdesk/services.py | Marks cacheable services via uses_cache = True. |
| superdesk/factory/app.py | Hooks shutdown to await/cancel background thread tasks. |
| superdesk/default_settings.py | Adds Redis socket/connect timeout settings. |
| tests/core/tasks_test.py | Adds tests for the new tasks module. |
| setup.py | Pins pydantic to <2.13. |
| features/content_publish.feature | Updates expected publish state in feature scenarios. |
    def run_in_thread(func: Callable, *args: Any, **kwargs: Any) -> asyncio.Task:
        """Run a callable in a thread pool without awaiting it."""

        # ``asyncio.to_thread`` copies the current context, so Quart/Flask functionality should still work
        task_name = f"thread_task__{func.__module__}_{func.__qualname__}"
        coroutine = asyncio.to_thread(func, *args, **kwargs)
        task = asyncio.create_task(coroutine, name=task_name)
        _thread_tasks.add(task)
        task.add_done_callback(_handle_background_task_result)
        return task
run_in_thread uses asyncio.create_task, which raises RuntimeError: no running event loop if this function is called from a synchronous context (e.g., a sync Quart route executed in a worker thread). Since this is now used by cache.clean_in_thread, consider handling the "no running loop" case explicitly (e.g., fall back to executing synchronously, or require/accept an explicit loop and schedule via loop.call_soon_threadsafe).
    # 1. Signal cancellation to all tasks
    for task in _thread_tasks:
        task.cancel()

    try:
        # 2. Wrap the gather in a wait_for to enforce the timeout
        await asyncio.wait_for(
            asyncio.gather(*_thread_tasks, return_exceptions=True),
            timeout,
        )
    except asyncio.TimeoutError:
        # 3. Handle tasks that refused to stop in time
        still_running = [t for t in _thread_tasks if not t.done()]
        logger.warning(f"Background threads shutdown timed out. {len(still_running)} tasks still active.")
    finally:
        # 4. Clear the set to release references
        _thread_tasks.clear()
Cancelling an asyncio.Task created from asyncio.to_thread(...) does not reliably stop the underlying worker thread; it only cancels the awaiter. As a result, wait_thread_tasks_to_complete may clear _thread_tasks while the thread work continues, and shutdown can still hang when the event loop later waits for the default executor to finish. Consider not calling task.cancel() here; instead await completion up to timeout, log unfinished tasks, and keep tracking them (or use a dedicated executor whose shutdown behavior you control).
Suggested change:

    - # 1. Signal cancellation to all tasks
    - for task in _thread_tasks:
    -     task.cancel()
    - try:
    -     # 2. Wrap the gather in a wait_for to enforce the timeout
    -     await asyncio.wait_for(
    -         asyncio.gather(*_thread_tasks, return_exceptions=True),
    -         timeout,
    -     )
    - except asyncio.TimeoutError:
    -     # 3. Handle tasks that refused to stop in time
    -     still_running = [t for t in _thread_tasks if not t.done()]
    -     logger.warning(f"Background threads shutdown timed out. {len(still_running)} tasks still active.")
    - finally:
    -     # 4. Clear the set to release references
    -     _thread_tasks.clear()
    + # ``asyncio.to_thread`` work cannot be reliably stopped by cancelling the
    + # wrapping ``asyncio.Task`` once the worker thread is running. Wait for the
    + # currently tracked tasks up to ``timeout`` and keep unfinished tasks
    + # tracked so shutdown code can still account for them.
    + tasks_to_wait_for = set(_thread_tasks)
    + done, pending = await asyncio.wait(tasks_to_wait_for, timeout=timeout)
    + if pending:
    +     logger.warning(f"Background threads shutdown timed out. {len(pending)} tasks still active.")
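The claim that cancellation only affects the awaiter can be checked in isolation. The following is a small standalone sketch, not code from this PR; the function names are made up for the demonstration, and the sleep durations are arbitrary.

```python
import asyncio
import threading
import time


async def demo_cancel_does_not_stop_thread() -> dict:
    finished = threading.Event()

    def blocking_work() -> None:
        time.sleep(0.2)
        finished.set()  # reached even though the wrapping task was cancelled

    task = asyncio.create_task(asyncio.to_thread(blocking_work))
    await asyncio.sleep(0.05)  # give the worker thread time to start
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)

    task_cancelled = task.cancelled()
    # The worker is still sleeping at this point; wait for it without blocking the loop
    thread_finished = await asyncio.to_thread(finished.wait, 2)
    return {"task_cancelled": task_cancelled, "thread_finished": thread_finished}
```

Running it with asyncio.run(demo_cancel_does_not_stop_thread()) should report the task as cancelled while the thread still runs to completion, which is why clearing the tracking set after cancel() can hide work that is still in flight.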
    class SuperdeskCache(hermes.Hermes):
        def clean_in_thread(self, tags: list[str]) -> None:
            run_in_thread(self.clean, tags)
clean_in_thread calls run_in_thread, which requires a running asyncio event loop. If cache.clean_in_thread is invoked from a sync codepath (e.g., a sync Eve/Quart handler running in a worker thread), this will raise RuntimeError: no running event loop. Consider making clean_in_thread resilient (e.g., fall back to self.clean(tags) when no loop is running).
    def long_running_func():
        # This will be interrupted if the thread is cancelled,
        time.sleep(5)
        raise Exception("This should not be reached")

    with patch("superdesk.core.tasks.logger") as mock_logger:
        task = tasks.run_in_thread(long_running_func)
        self.assertIn(task, tasks._thread_tasks)

        await tasks.wait_thread_tasks_to_complete(timeout=1)

        mock_logger.warning.assert_called_with(
            "Background task was cancelled", extra={"task_name": task.get_name()}
        )

        self.assertTrue(task.done())
        self.assertTrue(task.cancelled())
        self.assertEqual(len(tasks._thread_tasks), 0)
This test assumes that cancelling the asyncio task will interrupt the underlying time.sleep(5) running in asyncio.to_thread, but cancelling the awaiter does not actually stop the worker thread. This makes the test non-deterministic and it may fail depending on timing/timeout behavior. Consider rewriting the test to avoid relying on thread interruption (e.g., coordinate with a threading.Event and assert either the timeout-warning path or that the task completes normally).
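One way to make such a test deterministic is to let the test itself decide when the worker finishes. The sketch below is self-contained and does not use the PR's module: wait_for_tasks is a hypothetical drain helper that waits without cancelling, and the timeouts are arbitrary.

```python
import asyncio
import threading


async def wait_for_tasks(tasks: set, timeout: float) -> set:
    """Hypothetical drain helper: wait up to ``timeout`` and return unfinished tasks."""
    if not tasks:
        return set()
    _done, pending = await asyncio.wait(tasks, timeout=timeout)
    return pending


async def check_timeout_then_completion() -> tuple:
    started = threading.Event()
    release = threading.Event()

    def worker() -> str:
        started.set()
        release.wait(5)  # blocks until the test releases it
        return "done"

    task = asyncio.create_task(asyncio.to_thread(worker))
    await asyncio.to_thread(started.wait, 5)  # make sure the worker is running

    # Timeout path: the worker is still blocked, so the task stays pending
    pending_before = await wait_for_tasks({task}, timeout=0.05)

    # Completion path: release the worker and wait again
    release.set()
    pending_after = await wait_for_tasks({task}, timeout=5)
    return len(pending_before), len(pending_after), task.result()
```

Because the worker blocks on release rather than on time.sleep, the outcome does not depend on how the timeout compares to a sleep duration.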
    mock_logger.warning.assert_called_with(
        "Background task was cancelled", extra={"task_name": task.get_name()}
    )
mock_logger.warning.assert_called_with(...) asserts only the last warning call. If wait_thread_tasks_to_complete logs an additional warning (e.g., its timeout warning), this assertion will fail even if the cancellation warning was emitted. Prefer assert_any_call(...) or assert the sequence of calls explicitly.
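The difference between the two assertions can be shown with a bare MagicMock. This is a standalone illustration; the task name "thread_task__demo" is a placeholder, not a name produced by the PR.

```python
from unittest.mock import MagicMock

mock_logger = MagicMock()

# Two warnings in a row: the cancellation warning, then a timeout warning
mock_logger.warning("Background task was cancelled", extra={"task_name": "thread_task__demo"})
mock_logger.warning("Background threads shutdown timed out. 1 tasks still active.")

# assert_any_call passes if any recorded call matches
mock_logger.warning.assert_any_call(
    "Background task was cancelled", extra={"task_name": "thread_task__demo"}
)

# assert_called_with compares against the most recent call only
try:
    mock_logger.warning.assert_called_with(
        "Background task was cancelled", extra={"task_name": "thread_task__demo"}
    )
    last_call_matched = True
except AssertionError:
    last_call_matched = False
```

If the order of the warnings matters to the test, assert_has_calls with a list of call objects is the stricter alternative.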
    def _clean_cache(self, endpoint_name: str) -> None:
        if getattr(superdesk.get_resource_service(endpoint_name), "uses_cache", False):
            cache.clean_in_thread([endpoint_name])
cache.clean_in_thread(...) relies on asyncio.create_task under the hood, which requires a running event loop. If _clean_cache is reached from a synchronous Eve/Quart codepath running in a worker thread (no running loop), this will raise at runtime and cache invalidation will fail. Consider making clean_in_thread (or this call site) fall back to synchronous cache.clean(...) when no loop is running.
Purpose
When modifying resources, sometimes the cache for that resource is cleared. If an exception is raised during clearing of this cache, the parent request will also fail.
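To illustrate the idea, here is a minimal sketch of how a failure in background cache cleanup can be logged instead of failing the request. It is an assumption-laden example, not the PR's implementation: log_task_result, failing_clean, handle_request, the "archive" tag and the _background set are all hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger("tasks_sketch")

# Hypothetical registry so the tasks can be drained later (e.g. on shutdown)
_background: set = set()


def log_task_result(task: asyncio.Task) -> None:
    """Done callback: report failures instead of letting them propagate."""
    if task.cancelled():
        logger.warning("Background task was cancelled", extra={"task_name": task.get_name()})
        return
    error = task.exception()
    if error is not None:
        logger.error("Background task failed", exc_info=error, extra={"task_name": task.get_name()})


def failing_clean(tags: list) -> None:
    raise ConnectionError("cache backend unavailable")  # simulated cache failure


async def handle_request() -> str:
    task = asyncio.create_task(asyncio.to_thread(failing_clean, ["archive"]), name="clean_cache")
    _background.add(task)
    task.add_done_callback(log_task_result)
    # The request does not await the cleanup, so the failure above cannot reach it
    return "ok"


async def main() -> str:
    response = await handle_request()
    # Drain outstanding work, roughly what a shutdown hook would do
    await asyncio.gather(*_background, return_exceptions=True)
    return response
```

Here asyncio.run(main()) returns the response normally, and the cache failure shows up only as a log record.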
What has changed
- superdesk.core.tasks module that allows executing a function in another thread, with optional waiting (the caller can await the result of run_in_thread)
- clean_in_thread util function on the cache class

WIP: This PR doesn't yet include tests for the new superdesk.core.tasks module; I will update this when they're ready.

Resolves: STT-1648