|
7 | 7 | import sys |
8 | 8 | import traceback |
9 | 9 | import tempfile |
| 10 | +from asgiref.sync import sync_to_async |
10 | 11 | from datetime import timedelta |
11 | 12 | from gettext import gettext as _ |
12 | 13 |
|
|
16 | 17 | from django_guid import get_guid |
17 | 18 | from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS |
18 | 19 | from pulpcore.app.models import Task, TaskGroup |
19 | | -from pulpcore.app.util import current_task, get_domain, get_prn |
| 20 | +from pulpcore.app.util import current_task, get_domain, get_prn, deprecation_logger |
20 | 21 | from pulpcore.constants import ( |
21 | 22 | TASK_FINAL_STATES, |
22 | 23 | TASK_INCOMPLETE_STATES, |
@@ -73,20 +74,29 @@ def _execute_task(task): |
73 | 74 | is_coroutine = asyncio.iscoroutine(result) |
74 | 75 |
|
75 | 76 | if immediate is True and not is_coroutine: |
76 | | - raise RuntimeError(_("Immediate tasks must be coroutines.")) |
| 77 | + deprecation_logger.warning( |
| 78 | + _( |
| 79 | + "Immediate tasks must be coroutine functions." |
| 80 | + "Support for non-coroutine immediate tasks will be dropped in pulpcore 3.85." |
| 81 | + ) |
| 82 | + ) |
| 83 | + result = sync_to_async(func) |
| 84 | + is_coroutine = True |
77 | 85 |
|
78 | 86 | if is_coroutine: |
79 | 87 | _logger.debug(_("Task is coroutine %s"), task.pk) |
80 | | - loop = asyncio.get_event_loop() |
81 | 88 | if immediate: |
82 | | - try: |
83 | | - loop.run_until_complete(asyncio.wait_for(result, timeout=IMMEDIATE_TIMEOUT)) |
84 | | - except asyncio.TimeoutError: |
85 | | - raise RuntimeError( |
86 | | - _("Immediate task timed out after {} seconds").format(IMMEDIATE_TIMEOUT) |
87 | | - ) |
| 89 | + coro = asyncio.wait_for(result, timeout=IMMEDIATE_TIMEOUT) |
88 | 90 | else: |
89 | | - loop.run_until_complete(result) |
| 91 | + coro = result |
| 92 | + |
| 93 | + loop = asyncio.get_event_loop() |
| 94 | + try: |
| 95 | + loop.run_until_complete(coro) |
| 96 | + except asyncio.TimeoutError: |
| 97 | + raise RuntimeError( |
| 98 | + _("Immediate task timed out after {} seconds").format(IMMEDIATE_TIMEOUT) |
| 99 | + ) |
90 | 100 |
|
91 | 101 | except Exception: |
92 | 102 | exc_type, exc, tb = sys.exc_info() |
@@ -225,9 +235,6 @@ def dispatch( |
225 | 235 | stack.enter_context(task) |
226 | 236 | else: |
227 | 237 | notify_workers = True |
228 | | - _logger.info("asdfasdf*") |
229 | | - _logger.info("asdfasdf") |
230 | | - _logger.info("*" * 50) |
231 | 238 | if immediate: |
232 | 239 | prior_tasks = Task.objects.filter( |
233 | 240 | state__in=TASK_INCOMPLETE_STATES, pulp_created__lt=task.pulp_created |
|
0 commit comments