Commit c1fa860
feat(python): add max_concurrency and concurrency_timeout for UDF protection (#20)
* feat(python): add max_concurrency and concurrency_timeout for UDF protection
  - Add max_concurrency parameter to limit concurrent requests per function
  - Add concurrency_timeout parameter to control wait behavior
  - Default behavior: wait indefinitely when limit is reached
  - With timeout: reject with ConcurrencyLimitExceeded after timeout expires
  - Add Prometheus counter for rejected requests
  - Add comprehensive tests for concurrency limiting
  - Update README with documentation

* fix: only release semaphore if successfully acquired
  Fix bug where semaphore.release() was called in the finally block even when acquire() timed out and returned False. This would incorrectly increment the semaphore counter, allowing more concurrent requests than configured. Also remove Chinese comments from comprehensive_server.py.

* fix: remove unused imports in test_max_concurrency.py

* style: format code with ruff
1 parent 7d7bc29 commit c1fa860
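The "only release if successfully acquired" fix hinges on pairing `release()` with a successful `acquire()`. A minimal sketch of the corrected pattern, using illustrative names rather than the actual implementation in this commit:

```python
import threading

# Hypothetical guard illustrating the fix: release() is reached only when
# acquire() actually succeeded, so a timed-out request can never inflate
# the semaphore counter beyond max_concurrency.
semaphore = threading.Semaphore(10)   # max_concurrency = 10
concurrency_timeout = 30.0            # seconds; None would mean wait forever

class ConcurrencyLimitExceeded(Exception):
    """Stand-in for the error added by this commit."""

def handle_request(process):
    acquired = semaphore.acquire(timeout=concurrency_timeout)
    if not acquired:
        # Per the commit message, the real implementation also bumps a
        # Prometheus counter for rejected requests at this point.
        raise ConcurrencyLimitExceeded("no slot became available in time")
    try:
        return process()
    finally:
        # The buggy version released here unconditionally; now this path is
        # only reached when the semaphore was actually acquired.
        semaphore.release()
```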

File tree

7 files changed (+497, -61 lines)

python/README.md

Lines changed: 59 additions & 9 deletions
@@ -147,15 +147,63 @@ SELECT * FROM expand_pairs([1, 2, 3], [10, 20, 30]);
 
 ### I/O Bound Functions
 
+For I/O-bound functions (e.g., network requests, file operations), use `io_threads` to enable parallel processing of rows within a batch:
+
 ```python
 import time
 
 @udf(input_types=["INT"], result_type="INT", io_threads=32)
 def fetch_data(id: int) -> int:
-    time.sleep(0.1)
+    time.sleep(0.1)  # Simulates I/O operation
     return id * 2
 ```
 
+**Note:** `io_threads` controls parallelism within a single batch, not across requests.
+
+### Concurrency Limiting
+
+To protect your UDF server from being overwhelmed, use `max_concurrency` to limit the number of concurrent requests per function:
+
+```python
+from databend_udf import udf, UDFServer
+
+@udf(input_types=["INT"], result_type="INT", max_concurrency=10)
+def expensive_operation(x: int) -> int:
+    # Only 10 concurrent requests allowed
+    # Additional requests will wait for a slot
+    return x * 2
+```
+
+By default, when the limit is reached, new requests **wait** until a slot becomes available. You can set a timeout to reject requests that wait too long:
+
+```python
+@udf(
+    input_types=["INT"],
+    result_type="INT",
+    max_concurrency=10,
+    concurrency_timeout=30,  # Wait up to 30 seconds, then reject
+)
+def expensive_operation(x: int) -> int:
+    return x * 2
+```
+
+When the timeout expires, a `ConcurrencyLimitExceeded` error is raised.
+
+You can combine `io_threads` and `max_concurrency`:
+
+```python
+@udf(
+    input_types=["INT"],
+    result_type="INT",
+    io_threads=32,           # 32 threads for I/O within each request
+    max_concurrency=5,       # But only 5 concurrent requests allowed
+    concurrency_timeout=60,  # Wait up to 60s for a slot
+)
+def api_call(id: int) -> int:
+    # Protected from too many concurrent requests
+    return fetch_from_api(id)
+```
+
 ## Configuration
 
 ### Databend Config
@@ -170,14 +218,16 @@ udf_server_allow_list = ["http://0.0.0.0:8815"]
 
 ### UDF Decorator Parameters
 
-| Parameter     | Type                 | Default | Description                      |
-| ------------- | -------------------- | ------- | -------------------------------- |
-| `input_types` | `List[str]`          | -       | Input SQL types                  |
-| `result_type` | `str \| List[Tuple]` | -       | Return SQL type or table schema  |
-| `name`        | `str`                | None    | Custom function name             |
-| `skip_null`   | `bool`               | False   | Auto-return NULL for NULL inputs |
-| `io_threads`  | `int`                | None    | I/O thread pool size             |
-| `batch_mode`  | `bool`               | False   | Enable for table functions       |
+| Parameter             | Type                 | Default | Description                                       |
+| --------------------- | -------------------- | ------- | ------------------------------------------------- |
+| `input_types`         | `List[str]`          | -       | Input SQL types                                   |
+| `result_type`         | `str \| List[Tuple]` | -       | Return SQL type or table schema                   |
+| `name`                | `str`                | None    | Custom function name                              |
+| `skip_null`           | `bool`               | False   | Auto-return NULL for NULL inputs                  |
+| `io_threads`          | `int`                | 32      | Thread pool size for parallel row processing      |
+| `batch_mode`          | `bool`               | False   | Enable for table functions                        |
+| `max_concurrency`     | `int`                | None    | Max concurrent requests (None = unlimited)        |
+| `concurrency_timeout` | `float`              | None    | Seconds to wait for a slot (None = wait forever)  |
 
 ## Additional Resources
 
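The README additions above end at the decorator. For context, a minimal sketch of serving a concurrency-limited function; the `UDFServer("0.0.0.0:8815")` / `add_function()` / `serve()` calls are assumed from the package's typical usage and are not part of this diff:

```python
from databend_udf import udf, UDFServer

@udf(input_types=["INT"], result_type="INT",
     max_concurrency=10, concurrency_timeout=30)
def expensive_operation(x: int) -> int:
    return x * 2

if __name__ == "__main__":
    # The address must match udf_server_allow_list in the Databend config
    # shown in the Configuration section of the README.
    server = UDFServer("0.0.0.0:8815")
    server.add_function(expensive_operation)
    server.serve()
```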
python/databend_udf/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,5 @@
 from .udf import * # noqa
+from .udf import ConcurrencyLimitExceeded
 from .client import UDFClient, create_client
 
-__all__ = ["UDFClient", "create_client"]
+__all__ = ["UDFClient", "create_client", "ConcurrencyLimitExceeded"]
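Since the package root now re-exports `ConcurrencyLimitExceeded`, downstream code can catch it by name. A hypothetical sketch; how the exception actually reaches a caller (a server-side raise vs. an error surfaced through Arrow Flight) is not shown in this diff:

```python
from databend_udf import ConcurrencyLimitExceeded

def guarded_call(fn, *args):
    # Illustrative wrapper, not from this commit: translate a rejection
    # (max_concurrency reached and concurrency_timeout expired) into a
    # retry-friendly None instead of an unhandled exception.
    try:
        return fn(*args)
    except ConcurrencyLimitExceeded:
        return None
```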
