Skip to content

Commit 48cb536

Browse files
authored
fix: run notifications in background
1 parent 0b601a9 commit 48cb536

File tree

5 files changed

+205
-5
lines changed

5 files changed

+205
-5
lines changed

fastapi_jsonrpc/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,15 @@ async def handle_req(
919919
RequestValidationError(_normalize_errors(solved_dependency.errors))
920920
)
921921

922+
# We MUST NOT return response for Notification
923+
# https://www.jsonrpc.org/specification#notification
924+
# Since we do not need response - run in scheduler
925+
if ctx.request.id is None:
926+
scheduler = await self.entrypoint.get_scheduler()
927+
await scheduler.spawn(call_sync_async(self.func, **solved_dependency.values))
928+
return {}
929+
930+
# Для обычных запросов продолжаем как раньше
922931
result = await call_sync_async(self.func, **solved_dependency.values)
923932

924933
response = {

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ rst_include = "^2.1.0"
2828
pytest = "^6.2"
2929
sentry-sdk = "^2.0"
3030
requests = ">0.0.0"
31-
httpx = ">=0.23.0,<0.24.0" # FastAPI/Starlette extra test deps
31+
httpx = ">=0.27.0,<0.29.0" # FastAPI/Starlette extra test deps
3232

3333
[build-system]
3434
requires = ["poetry>=0.12"]

tests/conftest.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,17 @@ def requester(method, params, request_id=0):
129129
return requester
130130

131131

132+
@pytest.fixture
133+
def ep_wait_all_requests_done(app_client, ep):
134+
"""Returns function which waits until inner scheduler was empty
135+
That's means all requests are done
136+
"""
137+
def wait_empty(ep=ep):
138+
app_client.portal.call(ep.scheduler.wait_and_close)
139+
140+
return wait_empty
141+
142+
132143
@pytest.fixture
133144
def openapi_compatible():
134145
supported_openapi_versions = [packaging.version.parse("3.0.2"), packaging.version.parse("3.1.0")]

tests/test_jsonrpc.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
13
from json import dumps as json_dumps
24
from typing import List
35

@@ -48,28 +50,30 @@ def test_no_params(echo, json_request):
4850

4951

5052
@pytest.mark.parametrize('request_id', [111, 'qwe'])
51-
def test_basic(echo, json_request, request_id):
53+
def test_basic(echo, json_request, request_id, ep_wait_all_requests_done):
5254
resp = json_request({
5355
'id': request_id,
5456
'jsonrpc': '2.0',
5557
'method': 'echo',
5658
'params': {'data': 'data-123'},
5759
})
5860
assert resp == {'id': request_id, 'jsonrpc': '2.0', 'result': 'data-123'}
61+
ep_wait_all_requests_done()
5962
assert echo.history == ['data-123']
6063

6164

62-
def test_notify(echo, raw_request):
65+
def test_notify(echo, raw_request, ep_wait_all_requests_done):
6366
resp = raw_request(json_dumps({
6467
'jsonrpc': '2.0',
6568
'method': 'echo',
6669
'params': {'data': 'data-123'},
6770
}))
6871
assert not resp.content
72+
ep_wait_all_requests_done()
6973
assert echo.history == ['data-123']
7074

7175

72-
def test_batch_notify(echo, raw_request):
76+
def test_batch_notify(echo, raw_request, ep_wait_all_requests_done):
7377
resp = raw_request(json_dumps([
7478
{
7579
'jsonrpc': '2.0',
@@ -83,6 +87,7 @@ def test_batch_notify(echo, raw_request):
8387
},
8488
]))
8589
assert not resp.content
90+
ep_wait_all_requests_done()
8691
assert set(echo.history) == {'data-111', 'data-222'}
8792

8893

@@ -279,7 +284,7 @@ def test_method_not_found(echo, json_request):
279284
assert echo.history == []
280285

281286

282-
def test_batch(echo, json_request):
287+
def test_batch(echo, json_request, ep_wait_all_requests_done):
283288
resp = json_request([
284289
{
285290
'id': 111,
@@ -310,6 +315,7 @@ def test_batch(echo, json_request):
310315
{'id': 'qwe', 'jsonrpc': '2.0', 'result': 'data-qwe'},
311316
{'id': 'method-not-found', 'jsonrpc': '2.0', 'error': {'code': -32601, 'message': 'Method not found'}},
312317
]
318+
ep_wait_all_requests_done()
313319
assert set(echo.history) == {'data-111', 'data-notify', 'data-qwe'}
314320

315321

tests/test_notification.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import asyncio
2+
import collections
3+
import time
4+
from typing import Dict
5+
6+
import pytest
7+
from fastapi import Body
8+
9+
10+
class ExecutionTracker:
11+
def __init__(self):
12+
self.executions = []
13+
self.last_execution_time = 0
14+
15+
def record(self, method_name, delay):
16+
self.executions.append((method_name, delay))
17+
self.last_execution_time = time.monotonic()
18+
19+
20+
@pytest.fixture
21+
def tracker():
22+
return ExecutionTracker()
23+
24+
25+
@pytest.fixture
26+
def ep(ep, tracker):
27+
@ep.method()
28+
async def delayed_method(
29+
delay: float = Body(..., ge=0),
30+
message: str = Body(...),
31+
) -> dict:
32+
start_time = time.monotonic()
33+
await asyncio.sleep(delay)
34+
tracker.record("delayed_method", delay)
35+
return {"message": message, "execution_time": time.monotonic() - start_time}
36+
37+
@ep.method()
38+
async def instant_method(
39+
message: str = Body(...),
40+
) -> Dict[str, str]:
41+
tracker.record("instant_method", 0)
42+
return {"message": message}
43+
44+
return ep
45+
46+
47+
def test_regular_request__no_background(app, json_request, tracker):
48+
start_time = time.monotonic()
49+
delay = 0.5
50+
51+
# Запрос с ID (синхронный)
52+
response = json_request(
53+
{
54+
"jsonrpc": "2.0",
55+
"method": "delayed_method",
56+
"params": {"delay": delay, "message": "sync request"},
57+
"id": 1
58+
}
59+
)
60+
61+
execution_time = time.monotonic() - start_time
62+
63+
# Проверяем, что время выполнения больше чем задержка (т.е. запрос ждал завершения)
64+
assert execution_time >= delay
65+
assert response == {
66+
"jsonrpc": "2.0",
67+
"result": {
68+
"message": "sync request",
69+
"execution_time": pytest.approx(delay, abs=0.1)
70+
},
71+
"id": 1
72+
}
73+
assert len(tracker.executions) == 1
74+
assert tracker.executions[0][0] == "delayed_method"
75+
76+
77+
def test_single_request__notification_in_background(app, app_client, tracker, ep_wait_all_requests_done):
78+
start_time = time.monotonic()
79+
delay = 0.5
80+
81+
# Запрос без ID (уведомление, должен выполниться асинхронно)
82+
response = app_client.post(
83+
"/api/v1/jsonrpc",
84+
json={
85+
"jsonrpc": "2.0",
86+
"method": "delayed_method",
87+
"params": {"delay": delay, "message": "async notification"}
88+
}
89+
)
90+
91+
execution_time = time.monotonic() - start_time
92+
93+
# Проверяем, что время выполнения меньше чем задержка (т.е. запрос не ждал завершения)
94+
assert execution_time < delay
95+
assert response.status_code == 200
96+
assert response.content == b'' # Пустой ответ для уведомления
97+
98+
# Ждем, чтобы убедиться что задача завершилась
99+
ep_wait_all_requests_done()
100+
101+
# Проверяем, что функция действительно была выполнена
102+
assert len(tracker.executions) == 1
103+
assert tracker.executions[0][0] == "delayed_method"
104+
105+
106+
def test_batch_request__notification_in_background(app, app_client, tracker, ep_wait_all_requests_done):
107+
start_time = time.monotonic()
108+
delay1 = 0.5
109+
delay2 = 0.3
110+
111+
# Batch-запрос с обычными запросами и уведомлениями
112+
response = app_client.post(
113+
"/api/v1/jsonrpc",
114+
json=[
115+
# Обычный запрос
116+
{
117+
"jsonrpc": "2.0",
118+
"method": "delayed_method",
119+
"params": {"delay": delay1, "message": "sync request 1"},
120+
"id": 1
121+
},
122+
# Уведомление
123+
{
124+
"jsonrpc": "2.0",
125+
"method": "delayed_method",
126+
"params": {"delay": delay2, "message": "notification 1"}
127+
},
128+
# Еще один обычный запрос
129+
{
130+
"jsonrpc": "2.0",
131+
"method": "instant_method",
132+
"params": {"message": "sync request 2"},
133+
"id": 2
134+
},
135+
# Еще одно уведомление
136+
{
137+
"jsonrpc": "2.0",
138+
"method": "instant_method",
139+
"params": {"message": "notification 2"}
140+
}
141+
]
142+
)
143+
144+
execution_time = time.monotonic() - start_time
145+
146+
# Проверяем, что время выполнения больше чем максимальная задержка среди обычных запросов
147+
assert execution_time >= delay1
148+
assert response.status_code == 200
149+
150+
result = response.json()
151+
# В ответе должны быть только запросы с ID
152+
assert len(result) == 2
153+
154+
# Проверяем содержимое ответов (порядок может быть любым)
155+
result_dict = {item["id"]: item for item in result}
156+
157+
assert result_dict[1]["jsonrpc"] == "2.0"
158+
assert result_dict[1]["result"]["message"] == "sync request 1"
159+
assert float(result_dict[1]["result"]["execution_time"]) >= delay1
160+
161+
assert result_dict[2]["jsonrpc"] == "2.0"
162+
assert result_dict[2]["result"]["message"] == "sync request 2"
163+
164+
# Ждем, чтобы убедиться что все задачи завершились
165+
ep_wait_all_requests_done()
166+
167+
# Проверяем что все функции действительно были выполнены (всего 4)
168+
assert len(tracker.executions) == 4
169+
170+
# Проверяем типы выполненных функций (должны быть 2 delayed_method и 2 instant_method)
171+
method_counts = collections.Counter((x[0] for x in tracker.executions))
172+
173+
assert method_counts["delayed_method"] == 2
174+
assert method_counts["instant_method"] == 2

0 commit comments

Comments
 (0)