examples/server-async/README.md
# Asynchronous server and parallel execution of models
> Example/demo server that keeps a single model in memory while safely running parallel inference requests by creating per-request lightweight views and cloning only small, stateful components (schedulers, RNG state, small mutable attrs). Works with StableDiffusion3/Flux pipelines and a custom `diffusers` fork.
> We recommend running 10 to 50 inferences in parallel for good performance, averaging roughly 25-30 seconds up to 1-1.5 minutes per request. (This is only recommended if you have a GPU with 35 GB of VRAM or more; otherwise, keep it to one or two parallel inferences to avoid decoding or saving errors caused by memory shortages.)
## ⚠️ IMPORTANT
* The server and inference harness live in this repo: `https://github.com/F4k3r22/DiffusersServer`.
The example demonstrates how to run pipelines like `StableDiffusion3-3.5` and `Flux.1` concurrently while keeping a single copy of the heavy model parameters on GPU.

All the components needed to create the inference server are in `DiffusersServer/`:
```
DiffusersServer/
├── utils/
├─────── __init__.py
├─────── scheduler.py               # BaseAsyncScheduler wrapper and async_retrieve_timesteps for safe inference
├─────── requestscopedpipeline.py   # RequestScopedPipeline for inference with a single in-memory model
├── __init__.py
├── create_server.py                # helper script to build/run the app programmatically
├── ...
```

Core problem: a naive server that calls `pipe.__call__` concurrently can hit **race conditions**.

`diffusers-async` / this example addresses that by:

* **Request-scoped views**: `RequestScopedPipeline` creates a shallow copy of the pipeline per request so heavy weights (UNet, VAE, text encoder) remain shared and *are not duplicated*.
* **Per-request mutable state**: small stateful objects (scheduler, RNG state, small lists/dicts, callbacks) are cloned per request. The system uses `BaseAsyncScheduler.clone_for_request(...)` for scheduler cloning, with a fallback to a safe `deepcopy` or other heuristics.
* **Tokenizer concurrency safety**: `RequestScopedPipeline` now manages an internal tokenizer lock with automatic tokenizer detection and wrapping. This ensures that Rust tokenizers are safe to use under concurrency; race condition errors like `Already borrowed` no longer occur.
* **`async_retrieve_timesteps(..., return_scheduler=True)`**: fully backward-compatible helper that returns `(timesteps, num_inference_steps, scheduler)` without mutating the shared scheduler. For users not passing `return_scheduler=True`, the behavior is identical to the original API.
* **Robust attribute handling**: the wrapper avoids writing to read-only properties (e.g., `components`) and auto-detects small mutable attributes to clone while avoiding duplication of large tensors. A configurable tensor size threshold prevents cloning of large tensors.
* **Enhanced scheduler wrapping**: `BaseAsyncScheduler` automatically wraps schedulers and adds improved `__getattr__`, `__setattr__`, and debugging methods (`__repr__`, `__str__`).
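
To make the bullets above concrete, here is a minimal usage sketch. The import path, the positional `base_pipe` argument, and the `generate(...)` keyword arguments are assumptions made for illustration; check the `DiffusersServer` sources for the exact API.

```python
import threading

from diffusers import StableDiffusion3Pipeline
# Assumed import path, based on the package layout shown above.
from DiffusersServer.utils.requestscopedpipeline import RequestScopedPipeline

# Heavy weights are loaded once and shared by every request.
base_pipe = StableDiffusion3Pipeline.from_pretrained(
    "stabilityai/stable-diffusion-3.5-medium"
).to("cuda")

# Wraps the shared pipeline; per-request views are created inside generate().
request_pipe = RequestScopedPipeline(base_pipe)

def handle_request(prompt: str):
    # Each call gets a lightweight view (shallow copy + cloned scheduler/RNG),
    # so concurrent calls never mutate the shared pipeline state.
    return request_pipe.generate(prompt=prompt, num_inference_steps=30)

# Several requests can run in parallel threads against the same in-memory model.
threads = [threading.Thread(target=handle_request, args=(f"a photo of a lighthouse, variant {i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```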
## How the server works (high-level flow)
* The server uses `RequestScopedPipeline.generate(...)` which:
  * automatically wraps the base scheduler in `BaseAsyncScheduler` (if not already wrapped),
  * obtains a *local scheduler* (via `clone_for_request()` or `deepcopy`),
  * does `local_pipe = copy.copy(base_pipe)` (shallow copy), as sketched below,
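
Conceptually, the per-request part of that flow boils down to something like the sketch below (plain Python, illustrative names only; this is not the library's actual implementation):

```python
import copy

def build_request_view(base_pipe):
    # Clone only the small stateful piece: the scheduler.
    scheduler = base_pipe.scheduler
    if hasattr(scheduler, "clone_for_request"):
        local_scheduler = scheduler.clone_for_request()  # simplified, argument-free call
    else:
        local_scheduler = copy.deepcopy(scheduler)       # fallback when no clone hook exists

    # Shallow copy: heavy modules (UNet/transformer, VAE, text encoders) stay shared;
    # only the pipeline object itself is new.
    local_pipe = copy.copy(base_pipe)

    # Point the per-request view at its own scheduler so stepping state never leaks.
    local_pipe.scheduler = local_scheduler
    return local_pipe
```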
Configuration options control what gets cloned per request:

```
    mutable_attrs=None,                 # Custom list of attributes to clone
    auto_detect_mutables=True,          # Enable automatic detection of mutable attributes
    tensor_numel_threshold=1_000_000,   # Tensor size threshold for cloning
    tokenizer_lock=None,                # Custom threading lock for tokenizers
    wrap_scheduler=True,                # Auto-wrap scheduler in BaseAsyncScheduler
)
```
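
As an illustration of what the `tensor_numel_threshold` and `auto_detect_mutables` options imply, a clone-or-share decision could look roughly like this (a hypothetical heuristic, not the actual implementation):

```python
import copy

import torch

def maybe_clone(value, tensor_numel_threshold: int = 1_000_000):
    """Clone small mutable state; share anything that looks like a large tensor."""
    if isinstance(value, torch.Tensor):
        # Large tensors (weights, caches) are shared to avoid duplicating GPU memory.
        return value if value.numel() > tensor_numel_threshold else value.clone()
    if isinstance(value, (list, dict, set, bytearray)):
        # Small mutable containers are cheap to deep-copy per request.
        return copy.deepcopy(value)
    return value  # immutable or unknown objects are shared as-is
```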
### BaseAsyncScheduler Features

* Transparent proxy to the original scheduler with `__getattr__` and `__setattr__`
* `clone_for_request()` method for safe per-request scheduler cloning
* Enhanced debugging with `__repr__` and `__str__` methods
* Full compatibility with existing scheduler APIs
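
The behaviour listed above can be pictured with a simplified stand-in class (a conceptual sketch only, not the real `BaseAsyncScheduler`):

```python
import copy

class AsyncSchedulerProxySketch:
    """Illustrative proxy: delegate attribute access, clone on demand, stay printable."""

    def __init__(self, scheduler):
        # Bypass our own __setattr__ so the wrapped scheduler is stored on the proxy itself.
        object.__setattr__(self, "_scheduler", scheduler)

    def __getattr__(self, name):
        # Anything not found on the proxy is forwarded to the wrapped scheduler.
        return getattr(self._scheduler, name)

    def __setattr__(self, name, value):
        # Writes land on the wrapped scheduler, keeping it the single source of truth.
        setattr(self._scheduler, name, value)

    def clone_for_request(self):
        # Each request gets an independent copy, so stepping state never leaks between requests.
        return type(self)(copy.deepcopy(self._scheduler))

    def __repr__(self):
        return f"{type(self).__name__}({self._scheduler!r})"
```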
## Troubleshooting (quick)
* `Already borrowed` — previously a Rust tokenizer concurrency error.
✅ This is now fixed: `RequestScopedPipeline` automatically detects and wraps tokenizers with thread locks, so race conditions no longer happen.
* `can't set attribute 'components'` — pipeline exposes read-only `components`.
✅ The RequestScopedPipeline now detects read-only properties and skips setting them automatically.
* Scheduler issues:
  * If the scheduler doesn't implement `clone_for_request` and `deepcopy` fails, we log and fall back, but prefer `async_retrieve_timesteps(..., return_scheduler=True)` to avoid mutating the shared scheduler (see the usage sketch at the end of this section).
  ✅ Note: `async_retrieve_timesteps` is fully backward-compatible: if you don't pass `return_scheduler=True`, the behavior is unchanged.
* Memory issues with large tensors:
✅ The system now has a configurable `tensor_numel_threshold` to prevent cloning of large tensors while still cloning small mutable ones.
* Automatic tokenizer detection:
✅ The system automatically identifies tokenizer components by checking for tokenizer methods, class names, and attributes, then applies thread-safe wrappers.
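
For the scheduler-related items above, a usage sketch of `async_retrieve_timesteps` might look like this, assuming it mirrors the signature of diffusers' `retrieve_timesteps` helper (the import path and keyword names are assumptions):

```python
# Hypothetical import path based on the package layout shown earlier in this README.
from DiffusersServer.utils.scheduler import async_retrieve_timesteps

# `pipe` stands for the shared pipeline loaded at startup.
# With return_scheduler=True we get a per-request scheduler back instead of
# mutating pipe.scheduler, which stays shared across requests.
timesteps, num_inference_steps, request_scheduler = async_retrieve_timesteps(
    pipe.scheduler,
    num_inference_steps=30,
    device="cuda",
    return_scheduler=True,
)

# Without the flag, the helper behaves exactly like the original API:
timesteps, num_inference_steps = async_retrieve_timesteps(pipe.scheduler, num_inference_steps=30)
```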