Skip to content

Commit ebfe977

Browse files
CahidArdaclaude
andauthored
feat: add flow control API (#56)
* feat: add flow control API Add client.flow_control namespace with list(), get(), and reset() methods (sync + async). Add FlowControlInfo dataclass. DX-2401 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * test: add flow control API tests Test list, get, reset, and search for sync and async clients. DX-2401 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: fmt * DX-2401: Add get_global_parallelism, remove list/reset - Add GlobalParallelismInfo dataclass and get_global_parallelism() method (sync + async) - Remove flow_control.list() and flow_control.reset() methods - Update tests accordingly Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: fmt * fix: retry delay expression parsing * fix: use correct async fixture and method in async message tests --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e28c6b3 commit ebfe977

File tree

8 files changed

+235
-3
lines changed

8 files changed

+235
-3
lines changed

qstash/asyncio/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Literal, Optional, Union
33

44
from qstash.asyncio.dlq import AsyncDlqApi
5+
from qstash.asyncio.flow_control import AsyncFlowControlApi
56
from qstash.asyncio.log import AsyncLogApi
67
from qstash.asyncio.http import AsyncHttpClient
78
from qstash.asyncio.message import AsyncMessageApi
@@ -49,3 +50,6 @@ def __init__(
4950

5051
self.dlq = AsyncDlqApi(self.http)
5152
"""Dlq (Dead Letter Queue) api."""
53+
54+
self.flow_control = AsyncFlowControlApi(self.http)
55+
"""Flow control api."""

qstash/asyncio/flow_control.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from qstash.asyncio.http import AsyncHttpClient
2+
from qstash.flow_control_api import (
3+
FlowControlInfo,
4+
GlobalParallelismInfo,
5+
parse_flow_control_info,
6+
)
7+
8+
9+
class AsyncFlowControlApi:
10+
def __init__(self, http: AsyncHttpClient) -> None:
11+
self._http = http
12+
13+
async def get(self, flow_control_key: str) -> FlowControlInfo:
14+
"""
15+
Gets a single flow control by key.
16+
17+
:param flow_control_key: The flow control key to get.
18+
"""
19+
response = await self._http.request(
20+
path=f"/v2/flowControl/{flow_control_key}",
21+
method="GET",
22+
)
23+
24+
return parse_flow_control_info(response)
25+
26+
async def get_global_parallelism(self) -> GlobalParallelismInfo:
27+
"""
28+
Gets the global parallelism info.
29+
"""
30+
response = await self._http.request(
31+
path="/v2/globalParallelism",
32+
method="GET",
33+
)
34+
35+
return GlobalParallelismInfo(
36+
parallelism_max=response.get("parallelismMax", 0),
37+
parallelism_count=response.get("parallelismCount", 0),
38+
)

qstash/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Optional, Union, Literal
33

44
from qstash.dlq import DlqApi
5+
from qstash.flow_control_api import FlowControlApi
56
from qstash.log import LogApi
67
from qstash.http import RetryConfig, HttpClient
78
from qstash.message import MessageApi
@@ -50,3 +51,6 @@ def __init__(
5051

5152
self.dlq = DlqApi(self.http)
5253
"""Dlq (Dead Letter Queue) api."""
54+
55+
self.flow_control = FlowControlApi(self.http)
56+
"""Flow control api."""

qstash/flow_control_api.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import dataclasses
2+
from typing import Any, Dict
3+
4+
from qstash.http import HttpClient
5+
6+
7+
@dataclasses.dataclass
8+
class FlowControlInfo:
9+
"""Information about a flow control key."""
10+
11+
key: str
12+
"""The flow control key."""
13+
14+
wait_list_size: int
15+
"""The number of messages waiting in the wait list."""
16+
17+
parallelism_max: int
18+
"""The maximum parallelism configured for this flow control key."""
19+
20+
parallelism_count: int
21+
"""The current number of active requests for this flow control key."""
22+
23+
rate_max: int
24+
"""The maximum rate configured for this flow control key."""
25+
26+
rate_count: int
27+
"""The current number of requests consumed in the current period."""
28+
29+
rate_period: int
30+
"""The rate period in seconds."""
31+
32+
rate_period_start: int
33+
"""The start time of the current rate period as a unix timestamp."""
34+
35+
36+
@dataclasses.dataclass
37+
class GlobalParallelismInfo:
38+
"""Information about global parallelism."""
39+
40+
parallelism_max: int
41+
"""The maximum global parallelism."""
42+
43+
parallelism_count: int
44+
"""The current number of active requests globally."""
45+
46+
47+
def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo:
48+
return FlowControlInfo(
49+
key=response["flowControlKey"],
50+
wait_list_size=response.get("waitListSize", 0),
51+
parallelism_max=response.get("parallelismMax", 0),
52+
parallelism_count=response.get("parallelismCount", 0),
53+
rate_max=response.get("rateMax", 0),
54+
rate_count=response.get("rateCount", 0),
55+
rate_period=response.get("ratePeriod", 0),
56+
rate_period_start=response.get("ratePeriodStart", 0),
57+
)
58+
59+
60+
class FlowControlApi:
61+
def __init__(self, http: HttpClient) -> None:
62+
self._http = http
63+
64+
def get(self, flow_control_key: str) -> FlowControlInfo:
65+
"""
66+
Gets a single flow control by key.
67+
68+
:param flow_control_key: The flow control key to get.
69+
"""
70+
response = self._http.request(
71+
path=f"/v2/flowControl/{flow_control_key}",
72+
method="GET",
73+
)
74+
75+
return parse_flow_control_info(response)
76+
77+
def get_global_parallelism(self) -> GlobalParallelismInfo:
78+
"""
79+
Gets the global parallelism info.
80+
"""
81+
response = self._http.request(
82+
path="/v2/globalParallelism",
83+
method="GET",
84+
)
85+
86+
return GlobalParallelismInfo(
87+
parallelism_max=response.get("parallelismMax", 0),
88+
parallelism_count=response.get("parallelismCount", 0),
89+
)

qstash/log.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]:
246246
flow_control=flow_control,
247247
method=event.get("method"),
248248
max_retries=event.get("maxRetries"),
249-
retry_delay_expression=event.get("retryDelayExpression"),
249+
retry_delay_expression=event.get("retryDelayExpr"),
250250
label=event.get("label"),
251251
)
252252
)
@@ -285,6 +285,8 @@ def list(
285285
params=params,
286286
)
287287

288+
print(response)
289+
288290
logs = parse_logs_response(response["events"])
289291

290292
return ListLogsResponse(

tests/asyncio/test_flow_control.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from qstash import AsyncQStash
6+
from qstash.flow_control_api import GlobalParallelismInfo
7+
from qstash.message import FlowControl, PublishResponse
8+
9+
10+
FLOW_CONTROL_KEY = "test-flow-control-key-async"
11+
12+
13+
@pytest.mark.asyncio
14+
async def test_flow_control_get_async(async_client: AsyncQStash) -> None:
15+
# Publish a message with flow control to ensure the key exists
16+
result = await async_client.message.publish_json(
17+
body={"test": "value"},
18+
url="https://httpstat.us/200?sleep=30000",
19+
flow_control=FlowControl(
20+
key=FLOW_CONTROL_KEY,
21+
parallelism=5,
22+
rate=10,
23+
period="1m",
24+
),
25+
)
26+
assert isinstance(result, PublishResponse)
27+
assert result.message_id
28+
29+
# Small delay to let flow control state propagate
30+
await asyncio.sleep(1)
31+
32+
# Get a single flow control by key
33+
single = await async_client.flow_control.get(FLOW_CONTROL_KEY)
34+
assert single.key == FLOW_CONTROL_KEY
35+
assert isinstance(single.wait_list_size, int)
36+
assert isinstance(single.parallelism_max, int)
37+
assert isinstance(single.parallelism_count, int)
38+
39+
# Clean up message
40+
await async_client.message.cancel(result.message_id)
41+
42+
43+
@pytest.mark.asyncio
44+
async def test_flow_control_get_global_parallelism_async(
45+
async_client: AsyncQStash,
46+
) -> None:
47+
info = await async_client.flow_control.get_global_parallelism()
48+
assert isinstance(info, GlobalParallelismInfo)
49+
assert isinstance(info.parallelism_max, int)
50+
assert isinstance(info.parallelism_count, int)

tests/asyncio/test_message.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ async def test_publish_to_url_group_async(async_client: AsyncQStash) -> None:
288288
)
289289

290290
res = await async_client.message.publish(
291+
method="GET",
291292
body="test-body",
292293
url_group=name,
293294
)
@@ -385,10 +386,10 @@ async def test_publish_to_api_llm_custom_provider_async(
385386
@pytest.mark.asyncio
386387
async def test_enqueue_api_llm_custom_provider_async(
387388
async_client: AsyncQStash,
388-
cleanup_queue: Callable[[AsyncQStash, str], None],
389+
cleanup_queue_async: Callable[[AsyncQStash, str], None],
389390
) -> None:
390391
name = "test_queue"
391-
cleanup_queue(async_client, name)
392+
cleanup_queue_async(async_client, name)
392393

393394
res = await async_client.message.enqueue_json(
394395
queue=name,

tests/test_flow_control.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import time
2+
3+
from qstash import QStash
4+
from qstash.flow_control_api import GlobalParallelismInfo
5+
from qstash.message import FlowControl, PublishResponse
6+
7+
8+
FLOW_CONTROL_KEY = "test-flow-control-key"
9+
10+
11+
def test_flow_control_get(client: QStash) -> None:
12+
# Publish a message with flow control to ensure the key exists
13+
result = client.message.publish_json(
14+
body={"test": "value"},
15+
url="https://httpstat.us/200?sleep=30000",
16+
flow_control=FlowControl(
17+
key=FLOW_CONTROL_KEY,
18+
parallelism=5,
19+
rate=10,
20+
period="1m",
21+
),
22+
)
23+
assert isinstance(result, PublishResponse)
24+
assert result.message_id
25+
26+
# Small delay to let flow control state propagate
27+
time.sleep(1)
28+
29+
# Get a single flow control by key
30+
single = client.flow_control.get(FLOW_CONTROL_KEY)
31+
assert single.key == FLOW_CONTROL_KEY
32+
assert isinstance(single.wait_list_size, int)
33+
assert isinstance(single.parallelism_max, int)
34+
assert isinstance(single.parallelism_count, int)
35+
36+
# Clean up message
37+
client.message.cancel(result.message_id)
38+
39+
40+
def test_flow_control_get_global_parallelism(client: QStash) -> None:
41+
info = client.flow_control.get_global_parallelism()
42+
assert isinstance(info, GlobalParallelismInfo)
43+
assert isinstance(info.parallelism_max, int)
44+
assert isinstance(info.parallelism_count, int)

0 commit comments

Comments
 (0)