
Conversation

@FredyRivera-dev
Contributor

@FredyRivera-dev FredyRivera-dev commented Sep 14, 2025

What does this PR do?

This PR introduces a request-scoped pipeline abstraction and several safety/compatibility improvements that enable running many inference requests in parallel while keeping a single copy of the heavy weights (UNet, VAE, text encoder) in memory.

Main changes:

  • Add RequestScopedPipeline (example implementation and utilities; a minimal sketch follows this list), which:
    • Creates a lightweight per-request view of a pipeline via a shallow-copy (copy.copy).
    • Clones only small, stateful components per-request (scheduler, RNG state, callbacks, small mutable attrs) while sharing large model weights.
    • Detects and skips read-only pipeline properties (e.g., components) to avoid "can't set attribute" errors.
    • Optionally enters a model_cpu_offload_context() to allow memory offload hooks during generation.
  • Add tokenizer concurrency safety:
    • The request-scoped wrapper manages an internal tokenizer lock to avoid Rust tokenizer race conditions (Already borrowed errors).
  • Add retrieve_timesteps(..., return_scheduler=True) helper:
    • Returns (timesteps, num_inference_steps, scheduler) without mutating the shared scheduler.
    • Fully backward compatible: if return_scheduler=True is not passed, behavior is identical to the previous API.
  • Add fallback heuristics:
    • Prefer scheduler.clone_for_request() when available; otherwise attempt a safe deepcopy() and fall back to logging and safe defaults when cloning fails.
  • Documentation and examples:
    • Add an example/demo server (under examples/DiffusersServer/) showing how to run a single model in memory and serve concurrent inference requests safely.
    • Document recommended flags, environment, and an example POST request for /api/diffusers/inference.
  • Tests & CI:
    • (See "How to test") unit tests and a simple concurrency test harness are included to validate the tokenizer lock and retrieve_timesteps behavior.

Motivation and context

  • A naive server that calls pipe.__call__ concurrently can hit race conditions (e.g., scheduler.set_timesteps mutating the shared scheduler) or accidentally duplicate the full pipeline in memory (via deepcopy), exploding GPU memory.
  • This PR provides a lightweight pattern that isolates per-request mutable state while keeping the heavy model parameters shared, addressing both correctness (race conditions) and memory usage.

Files changed / added (high level)

  • src/diffusers/pipelines/pipeline_utils.py - RequestScopedPipeline implementation and utilities
  • src/diffusers/pipelines/stable_diffusion_3/pipeline_stable_diffusion_3.py, src/diffusers/pipelines/flux/pipeline_flux.py, src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion.py, src/diffusers/pipelines/stable_diffusion_xl/pipeline_stable_diffusion_xl.py, src/diffusers/pipelines/t2i_adapter/pipeline_stable_diffusion_adapter.py, src/diffusers/pipelines/t2i_adapter/pipeline_stable_diffusion_xl_adapter.py - retrieve_timesteps(..., return_scheduler=True) helper (backward compatible)
  • src/diffusers/schedulers/* - Implementation of the clone_for_request(self, ...) method to avoid race conditions (adapted for each scheduler)
  • examples/DiffusersServer/ — demo server and helper scripts:
    • serverasync.py (FastAPI app factory / server example)
    • Pipelines.py (pipeline loader classes)
    • uvicorn_diffu.py (recommended uvicorn flags)
    • create_server.py
  • Minor additions to project README describing the example server and the expected behavior

Backward compatibility

  • retrieve_timesteps is fully backward compatible: if users do not pass return_scheduler=True, the call behaves exactly as before (it will call set_timesteps on the shared scheduler).
  • Existing pipelines and public APIs are not modified in a breaking way. The new RequestScopedPipeline is additive and opt-in for server authors who want safe concurrency.
  • RequestScopedPipeline creates a lightweight, per-request view of a pipeline via a shallow copy and clones only small, mutable components (scheduler, RNG state, callbacks, small lists/dicts). Large model weights (UNet, VAE, text encoder) remain shared and are not duplicated.
  • Scheduler handling and clone_for_request semantics (a minimal sketch follows this list):
    • When available, scheduler.clone_for_request(num_inference_steps, ...) is used as the preferred mechanism to obtain a scheduler configured for a single request. This ensures that any mutations performed by set_timesteps(...) are applied only to the local scheduler copy and never to the shared scheduler.
    • If a scheduler does not implement clone_for_request, retrieve_timesteps(..., return_scheduler=True) attempts safe fallbacks in this order: (1) deepcopy(scheduler) and configure the copy, (2) copy.copy(scheduler) with a logged warning about potential shared-state risk. Only if all cloning strategies fail will the code fall back to mutating the original scheduler (and this is logged as a last-resort warning).
    • This behavior is opt-in: callers who do not request a scheduler (or pass return_scheduler=False) preserve the original, pre-existing semantics.
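
To make the fallback order concrete, here is a minimal sketch of a return_scheduler-aware retrieve_timesteps, assuming the module-level helper signature used in the Stable Diffusion pipeline files; the actual code in the PR may differ in details:

```python
import copy
import logging

logger = logging.getLogger(__name__)


def retrieve_timesteps(scheduler, num_inference_steps=None, device=None, return_scheduler=False, **kwargs):
    if not return_scheduler:
        # Pre-existing behavior: configure the shared scheduler in place.
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        return scheduler.timesteps, num_inference_steps

    # Per-request path: never mutate the shared scheduler.
    if hasattr(scheduler, "clone_for_request"):
        # Preferred: the clone arrives already configured for this request.
        local = scheduler.clone_for_request(num_inference_steps=num_inference_steps, device=device)
    else:
        try:
            local = copy.deepcopy(scheduler)
        except Exception:
            logger.warning("deepcopy failed; falling back to copy.copy (shared-state risk)")
            local = copy.copy(scheduler)
        local.set_timesteps(num_inference_steps, device=device, **kwargs)
    return local.timesteps, num_inference_steps, local
```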

How to test / reproduce

  1. Install the package in editable mode along with the example requirements:
  pip install -e .
  pip install -r examples/DiffusersServer/requirements.txt
  2. Start the example server:
   python examples/DiffusersServer/serverasync.py

or

   python examples/DiffusersServer/uvicorn_diffu.py
  3. Run multiple concurrent requests (example):
   python -c "import requests, concurrent.futures, json
   def r(): return requests.post('http://localhost:8500/api/diffusers/inference', json={'prompt':'A futuristic cityscape','num_inference_steps':30,'num_images_per_prompt':1}).json()
   with concurrent.futures.ThreadPoolExecutor(max_workers=20) as ex: print([ex.submit(r).result() for _ in range(20)])"
  4. Verify that:

    • No "Already borrowed" tokenizer errors occur under load.
    • GPU memory usage does not grow linearly with requests (heavy weights remain shared).
    • retrieve_timesteps(..., return_scheduler=True) returns the scheduler instance for per-request use and does not mutate the shared scheduler.

Performance notes

  • Small per-request overhead for shallow copy and cloning of small mutable state.
  • Large tensors/weights are shared; this keeps memory usage low while enabling tens of parallel inferences (recommended ~10–50 inferences in parallel depending on hardware).

Security & maintenance notes

  • The example server is a demo/harness and is not hardened for production on its own; we recommend placing it behind proper authentication/an API gateway and rate limiting.
  • Add monitoring for memory and request queue lengths when deploying.

Before submitting

  • This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case).
  • Did you read the contributor guideline?
  • Did you read our philosophy doc?
  • Was this discussed/approved via a GitHub issue or the [forum]? Please add a link to it if that's the case.
  • Did you make sure to update the documentation with your changes? (see docs/usage/async_server.md)
  • Did you write any new necessary tests? (see tests/test_request_scoped.py)

Who can review?

Core library / Schedulers / Pipelines: @yiyixuxu, @asomoza, @sayakpaul
General / integrations: @DN6, @stevhliu

@sayakpaul
Member

Hello there,

Thanks for this PR. It touches a bunch of core files without their scope having been discussed separately. Our pipelines aren't meant to be async.

To help expedite merging this, I would recommend:

  • Working with a folder under examples like we're doing -- that's perfect.
  • Subclassing a few classes instead of so many pipelines in the beginning. Perhaps, we could just pick one from SDXL, Flux, or Qwen (or whichever you would like to choose).
    • We could add clone_for_request() in the subclassed scheduler implementation, for example -- class AsyncDPMMultistepScheduler.
    • Similarly, we could just maintain async_friendly_retrieve_timesteps() under a utils.py script in the example folder.
  • Showing a full working example with the subclassed implementation.
  • Putting all the details for execution in a README.

This will likely be easier to follow for the future contributors and users looking into understanding what minimal elements they would need to change / add to have an async implementation going for Diffusers.

LMK if any of it is unclear and if you have any thoughts to share.
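
For illustration, a minimal sketch of the subclassed-scheduler idea above, assuming diffusers' DPMSolverMultistepScheduler as the base class; AsyncDPMMultistepScheduler and clone_for_request are illustrative names from this thread, not part of the diffusers public API:

```python
import copy

from diffusers import DPMSolverMultistepScheduler


class AsyncDPMMultistepScheduler(DPMSolverMultistepScheduler):
    """Example-only subclass: per-request scheduler clones without touching diffusers core."""

    def clone_for_request(self, num_inference_steps, device=None):
        # Schedulers carry only small tensors/lists, so a deepcopy per request is cheap
        # and keeps set_timesteps() from mutating the shared instance.
        local = copy.deepcopy(self)
        local.set_timesteps(num_inference_steps, device=device)
        return local
```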

@FredyRivera-dev
Contributor Author

FredyRivera-dev commented Sep 15, 2025

Hey, thanks for the feedback. I've already reverted all the changes to the diffusers core and I'm moving all the logic to examples/server-async. Over the next few days I'll finish moving everything so the asynchronous operation can be replicated, and I'll update the README :D @sayakpaul

@sayakpaul
Member

sayakpaul commented Sep 15, 2025

Thanks!

It seems there are still some remnant changes in the core files. Would you mind reverting them as well? Anyway, give me a ping once you think this PR is ready for another review.

@FredyRivera-dev
Contributor Author

Sure, I'll review the files I haven't reverted yet, and I'll let you know when it's ready for review :D

@FredyRivera-dev
Contributor Author

Hey @sayakpaul, I've completed the rollback of all the diffusers core changes and left all the logic in examples/server-async, and I've updated the README at examples/server-async/README.md so you can replicate the async server execution. Check it out and tell me if everything is ok :D

Member

@sayakpaul sayakpaul left a comment


Thanks for the changes. Left two clarification comments.

@@ -0,0 +1,136 @@
# Asynchronous server and parallel execution of models

> Example/demo server that keeps a single model in memory while safely running parallel inference requests by creating per-request lightweight views and cloning only small, stateful components (schedulers, RNG state, small mutable attrs). Works with StableDiffusion3/Flux pipelines and a custom `diffusers` fork.
Member


Do we still have to use a custom diffusers when running this example even with all the changes included? If so, I think we would want to change that.

Contributor Author


Ahhh no no no, I forgot to change it in the README, sorry, I'll update that now


## ⚠️ IMPORTANT

* The server and inference harness live in this repo: `https://github.com/F4k3r22/DiffusersServer`.
Member


As mentioned earlier, let's try to just stick to a single pipeline example so that it's easier to follow for the users.

Contributor Author


Sure, I'll cut down the example

@FredyRivera-dev
Contributor Author

Hey @sayakpaul, I've simplified the server so it only keeps the SD3-3.5 and Flux pipeline examples, and updated the README with all the changes. No custom fork is needed.

@FredyRivera-dev
Contributor Author

In the end I decided to discard the Flux pipeline and keep only the SD3-3.5 ones.

Member

@sayakpaul sayakpaul left a comment


Thanks for being patient with the feedback!

@sayakpaul
Member

@bot /style

@github-actions
Contributor

github-actions bot commented Sep 18, 2025

Style bot fixed some files and pushed the changes.

@HuggingFaceDocBuilderDev

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.

@sayakpaul sayakpaul merged commit eda9ff8 into huggingface:main Sep 18, 2025
25 checks passed