@@ -21,18 +21,13 @@ Tasks Page | Task Details Page
21211 ) Add this middleware to your project:
2222
2323``` python
24- import asyncio
25- import logging
26- import aiohttp
2724from typing import Any
2825from urllib.parse import urljoin
2926from datetime import datetime, UTC
3027
28+ import httpx
3129from taskiq import TaskiqMiddleware, TaskiqResult, TaskiqMessage
3230
33- logger = logging.getLogger(__name__ )
34-
35-
3631class TaskiqAdminMiddleware (TaskiqMiddleware ):
3732 def __init__ (
3833 self ,
@@ -44,82 +39,63 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
4439 self .url = url
4540 self .api_token = api_token
4641 self .__ta_broker_name = taskiq_broker_name
47- self ._pending: set[asyncio.Task[Any]] = set ()
48- self ._client: aiohttp.ClientSession | None = None
49-
50- @ staticmethod
51- def _now_iso () -> str :
52- return datetime.now(UTC ).replace(tzinfo = None ).isoformat()
53-
54- async def startup (self ):
55- self ._client = aiohttp.ClientSession(
56- timeout = aiohttp.ClientTimeout(total = 5 ),
57- )
58-
59- async def shutdown (self ):
60- if self ._pending:
61- await asyncio.gather(* self ._pending, return_exceptions = True )
62- if self ._client is not None :
63- await self ._client.close()
64-
65- def _spawn_request (self , endpoint : str , payload : dict[str , Any]) -> None :
66- async def _send () -> None :
67- session = self ._client or aiohttp.ClientSession(
68- timeout = aiohttp.ClientTimeout(total = 5 )
69- )
70-
71- async with session.post(
72- urljoin(self .url, endpoint),
73- headers = {" access-token" : self .api_token},
74- json = payload,
75- ) as resp:
76- resp.raise_for_status()
77- if not resp.ok:
78- logger.error(f " POST { endpoint} - { resp.status} " )
79-
80- task = asyncio.create_task(_send())
81- self ._pending.add(task)
82- task.add_done_callback(self ._pending.discard)
8342
8443 async def post_send (self , message ):
85- self ._spawn_request(
86- f " /api/tasks/ { message.task_id} /queued " ,
87- {
88- " args" : message.args,
89- " kwargs" : message.kwargs,
90- " queuedAt" : self ._now_iso(),
91- " taskName" : message.task_name,
92- " worker" : self .__ta_broker_name,
93- },
94- )
44+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
45+ async with httpx.AsyncClient() as client:
46+ await client.post(
47+ headers = {" access-token" : self .api_token},
48+ url = urljoin(self .url, f " /api/tasks/ { message.task_id} /queued " ),
49+ json = {
50+ " args" : message.args,
51+ " kwargs" : message.kwargs,
52+ " taskName" : message.task_name,
53+ " worker" : self .__ta_broker_name,
54+ " queuedAt" : now,
55+ },
56+ )
9557 return super ().post_send(message)
9658
9759 async def pre_execute (self , message : TaskiqMessage):
9860 """ """
99-
100- self ._spawn_request(
101- f " /api/tasks/ { message.task_id} /started " ,
102- {
103- " args" : message.args,
104- " kwargs" : message.kwargs,
105- " startedAt" : self ._now_iso(),
106- " taskName" : message.task_name,
107- " worker" : self .__ta_broker_name,
108- },
109- )
61+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
62+ async with httpx.AsyncClient() as client:
63+ await client.post(
64+ headers = {" access-token" : self .api_token},
65+ url = urljoin(self .url, f " /api/tasks/ { message.task_id} /started " ),
66+ json = {
67+ " startedAt" : now,
68+ " args" : message.args,
69+ " kwargs" : message.kwargs,
70+ " taskName" : message.task_name,
71+ " worker" : self .__ta_broker_name,
72+ },
73+ )
11074 return super ().pre_execute(message)
11175
112- async def post_execute (self , message : TaskiqMessage, result : TaskiqResult[Any]):
76+ async def post_execute (
77+ self ,
78+ message : TaskiqMessage,
79+ result : TaskiqResult[Any],
80+ ):
11381 """ """
114- self ._spawn_request(
115- f " /api/tasks/ { message.task_id} /executed " ,
116- {
117- " finishedAt" : self ._now_iso(),
118- " executionTime" : result.execution_time,
119- " error" : None if result.error is None else repr (result.error),
120- " returnValue" : {" return_value" : result.return_value},
121- },
122- )
82+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
83+ async with httpx.AsyncClient() as client:
84+ await client.post(
85+ headers = {" access-token" : self .api_token},
86+ url = urljoin(
87+ self .url,
88+ f " /api/tasks/ { message.task_id} /executed " ,
89+ ),
90+ json = {
91+ " finishedAt" : now,
92+ " error" : result.error
93+ if result.error is None
94+ else repr (result.error),
95+ " executionTime" : result.execution_time,
96+ " returnValue" : {" return_value" : result.return_value},
97+ },
98+ )
12399 return super ().post_execute(message, result)
124100```
125101
0 commit comments