Skip to content

Commit 9790871

Browse files
committed
feat: version 1.7.0
1 parent 639cc40 commit 9790871

File tree

9 files changed

+2914
-1395
lines changed

9 files changed

+2914
-1395
lines changed

README.md

Lines changed: 74 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@ Tasks Page | Task Details Page
2020
1) Add this middleware to your project:
2121

2222
```python
23+
import asyncio
24+
import logging
25+
import aiohttp
2326
from typing import Any
2427
from urllib.parse import urljoin
2528
from datetime import datetime, UTC
2629

27-
import httpx
2830
from taskiq import TaskiqMiddleware, TaskiqResult, TaskiqMessage
2931

32+
logger = logging.getLogger(__name__)
33+
34+
3035
class TaskiqAdminMiddleware(TaskiqMiddleware):
3136
def __init__(
3237
self,
@@ -38,63 +43,82 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
3843
self.url = url
3944
self.api_token = api_token
4045
self.__ta_broker_name = taskiq_broker_name
46+
self._pending: set[asyncio.Task[Any]] = set()
47+
self._client: aiohttp.ClientSession | None = None
4148

42-
async def post_send(self, message):
43-
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
44-
async with httpx.AsyncClient() as client:
45-
await client.post(
46-
headers={"access-token": self.api_token},
47-
url=urljoin(self.url, f"/api/tasks/{message.task_id}/queued"),
48-
json={
49-
"args": message.args,
50-
"kwargs": message.kwargs,
51-
"taskName": message.task_name,
52-
"worker": self.__ta_broker_name,
53-
"queuedAt": now,
54-
},
49+
@staticmethod
50+
def _now_iso() -> str:
51+
return datetime.now(UTC).replace(tzinfo=None).isoformat()
52+
53+
async def startup(self):
54+
self._client = aiohttp.ClientSession(
55+
timeout=aiohttp.ClientTimeout(total=5),
56+
)
57+
58+
async def shutdown(self):
59+
if self._pending:
60+
await asyncio.gather(*self._pending, return_exceptions=True)
61+
if self._client is not None:
62+
await self._client.close()
63+
64+
def _spawn_request(self, endpoint: str, payload: dict[str, Any]) -> None:
65+
async def _send() -> None:
66+
session = self._client or aiohttp.ClientSession(
67+
timeout=aiohttp.ClientTimeout(total=5)
5568
)
69+
70+
async with session.post(
71+
urljoin(self.url, endpoint),
72+
headers={"access-token": self.api_token},
73+
json=payload,
74+
) as resp:
75+
resp.raise_for_status()
76+
if not resp.ok:
77+
logger.error(f"POST {endpoint} - {resp.status}")
78+
79+
task = asyncio.create_task(_send())
80+
self._pending.add(task)
81+
task.add_done_callback(self._pending.discard)
82+
83+
async def post_send(self, message):
84+
self._spawn_request(
85+
f"/api/tasks/{message.task_id}/queued",
86+
{
87+
"args": message.args,
88+
"kwargs": message.kwargs,
89+
"queuedAt": self._now_iso(),
90+
"taskName": message.task_name,
91+
"worker": self.__ta_broker_name,
92+
},
93+
)
5694
return super().post_send(message)
5795

5896
async def pre_execute(self, message: TaskiqMessage):
5997
""""""
60-
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
61-
async with httpx.AsyncClient() as client:
62-
await client.post(
63-
headers={"access-token": self.api_token},
64-
url=urljoin(self.url, f"/api/tasks/{message.task_id}/started"),
65-
json={
66-
"startedAt": now,
67-
"args": message.args,
68-
"kwargs": message.kwargs,
69-
"taskName": message.task_name,
70-
"worker": self.__ta_broker_name,
71-
},
72-
)
98+
99+
self._spawn_request(
100+
f"/api/tasks/{message.task_id}/started",
101+
{
102+
"args": message.args,
103+
"kwargs": message.kwargs,
104+
"startedAt": self._now_iso(),
105+
"taskName": message.task_name,
106+
"worker": self.__ta_broker_name,
107+
},
108+
)
73109
return super().pre_execute(message)
74110

75-
async def post_execute(
76-
self,
77-
message: TaskiqMessage,
78-
result: TaskiqResult[Any],
79-
):
111+
async def post_execute(self, message: TaskiqMessage, result: TaskiqResult[Any]):
80112
""""""
81-
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
82-
async with httpx.AsyncClient() as client:
83-
await client.post(
84-
headers={"access-token": self.api_token},
85-
url=urljoin(
86-
self.url,
87-
f"/api/tasks/{message.task_id}/executed",
88-
),
89-
json={
90-
"finishedAt": now,
91-
"error": result.error
92-
if result.error is None
93-
else repr(result.error),
94-
"executionTime": result.execution_time,
95-
"returnValue": {"return_value": result.return_value},
96-
},
97-
)
113+
self._spawn_request(
114+
f"/api/tasks/{message.task_id}/executed",
115+
{
116+
"finishedAt": self._now_iso(),
117+
"executionTime": result.execution_time,
118+
"error": None if result.error is None else repr(result.error),
119+
"returnValue": {"return_value": result.return_value},
120+
},
121+
)
98122
return super().post_execute(message, result)
99123
```
100124

@@ -103,7 +127,7 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
103127
```python
104128
...
105129
broker = (
106-
ListQueueBroker(
130+
RedisStreamBroker(
107131
url=redis_url,
108132
queue_name="my_lovely_queue",
109133
)

package.json

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,55 @@
11
{
2-
"name": "nuxt-app",
2+
"name": "taskiq-admin",
33
"private": true,
44
"type": "module",
5-
"version": "1.6.0",
5+
"version": "1.7.0",
66
"scripts": {
77
"build": "nuxt build",
88
"dev": "nuxt dev",
99
"generate": "nuxt generate",
1010
"preview": "nuxt preview",
1111
"postinstall": "nuxt prepare",
1212
"typecheck": "tsc --noEmit",
13+
"test": "vitest --run",
1314
"db:push": "drizzle-kit push",
14-
"generate:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql",
15-
"generate:future:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql; sed -i '1s/^/PRAGMA journal_mode = WAL; PRAGMA synchronous = normal; PRAGMA journal_size_limit = 6144000;\\n/' dbschema.sql"
15+
"generate:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql; sed -i '1s/^/PRAGMA journal_mode = WAL; PRAGMA synchronous = normal; PRAGMA journal_size_limit = 6144000;\\n/' dbschema.sql",
16+
"generate:deprecated:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql"
1617
},
1718
"dependencies": {
18-
"@internationalized/date": "^3.8.0",
19+
"@internationalized/date": "^3.8.2",
1920
"@tailwindcss/vite": "^4.1.3",
2021
"@tanstack/vue-table": "^8.21.3",
21-
"@vueuse/core": "^12.8.2",
22-
"better-sqlite3": "^11.9.1",
23-
"bootstrap": "^5.3.3",
22+
"@vueuse/core": "^13.4.0",
23+
"better-sqlite3": "^12.1.1",
2424
"class-variance-authority": "^0.7.1",
2525
"clsx": "^2.1.1",
2626
"dayjs": "^1.11.13",
27-
"dotenv": "^16.4.7",
28-
"drizzle-orm": "^0.42.0",
29-
"lucide-vue-next": "^0.487.0",
30-
"nuxt": "^3.16.2",
27+
"dotenv": "^16.6.0",
28+
"drizzle-orm": "^0.44.2",
29+
"lucide-vue-next": "^0.524.0",
30+
"nuxt": "^3.17.5",
3131
"reka-ui": "^2.2.0",
3232
"tailwind-merge": "^3.2.0",
3333
"tailwindcss": "^4.1.3",
3434
"tw-animate-css": "^1.2.5",
35-
"vue": "^3.5.13",
36-
"vue-router": "^4.5.0",
37-
"vue-sonner": "^1.3.0",
38-
"zod": "^3.24.3"
35+
"vue": "^3.5.17",
36+
"vue-router": "^4.5.1",
37+
"vue-sonner": "^2.0.1",
38+
"zod": "^3.25.67"
3939
},
4040
"packageManager": "[email protected]+sha1.a428b12202bc4f23b17e6dffe730734dae5728e2",
4141
"devDependencies": {
4242
"@iconify-json/radix-icons": "^1.2.2",
43-
"@iconify/vue": "^4.3.0",
43+
"@iconify/vue": "^5.0.0",
44+
"@nuxt/test-utils": "^3.19.1",
4445
"@types/better-sqlite3": "^7.6.12",
45-
"drizzle-kit": "^0.31.0",
46+
"@vue/test-utils": "^2.4.6",
47+
"drizzle-kit": "^0.31.4",
48+
"happy-dom": "^18.0.1",
49+
"playwright-core": "^1.53.1",
4650
"prettier": "^3.5.3",
47-
"typescript": "^5.8.3"
51+
"tsx": "^4.20.3",
52+
"typescript": "^5.8.3",
53+
"vitest": "^3.2.4"
4854
}
4955
}

0 commit comments

Comments
 (0)