-
Notifications
You must be signed in to change notification settings - Fork 32
♻️ Adding lifespan support for FastAPI & migrated dynamic-scheduler to use it
#7149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c532b72
6480441
eb83410
a8408e7
2d90725
4515cc0
ef6ae25
64303c5
a4c88fc
94b9e14
b470790
66734d6
89e4551
bfaed0a
a53d334
9dfd51b
8638125
b0c4805
aa78a76
2bbf0e7
2981bd5
2b92ddf
84da950
6d3cadd
0317f2c
29dd23f
8758e10
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from collections.abc import AsyncIterator | ||
| from typing import Protocol | ||
|
|
||
| from fastapi import FastAPI | ||
| from fastapi_lifespan_manager import LifespanManager, State | ||
|
|
||
|
|
||
| class LifespanGenerator(Protocol): | ||
| def __call__(self, app: FastAPI) -> AsyncIterator["State"]: | ||
| ... | ||
|
|
||
|
|
||
| def combine_lifespans(*generators: LifespanGenerator) -> LifespanManager: | ||
|
|
||
| manager = LifespanManager() | ||
|
|
||
| for generator in generators: | ||
| manager.add(generator) | ||
|
|
||
| return manager | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,28 +1,62 @@ | ||
| # pylint: disable=protected-access | ||
|
|
||
|
|
||
| from collections.abc import AsyncIterator | ||
|
|
||
| from fastapi import FastAPI | ||
| from fastapi_lifespan_manager import State | ||
| from prometheus_client import CollectorRegistry | ||
| from prometheus_fastapi_instrumentator import Instrumentator | ||
|
|
||
|
|
||
| def setup_prometheus_instrumentation(app: FastAPI) -> Instrumentator: | ||
| def initialize_prometheus_instrumentation(app: FastAPI) -> None: | ||
| # NOTE: this cannot be ran once the application is started | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. THOUGHT: ideally i would enforce this comment with code (e.g. with a decorator that if the app is started, it raises) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would not know how to do that. I think this is also fine since an error will be raised pointing you to this function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about checking that the state that you initialize in this |
||
|
|
||
| # NOTE: use that registry to prevent having a global one | ||
| app.state.prometheus_registry = registry = CollectorRegistry(auto_describe=True) | ||
| instrumentator = Instrumentator( | ||
| app.state.prometheus_instrumentator = Instrumentator( | ||
| should_instrument_requests_inprogress=False, # bug in https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/317 | ||
| inprogress_labels=False, | ||
| registry=registry, | ||
| ).instrument(app) | ||
| ) | ||
| app.state.prometheus_instrumentator.instrument(app) | ||
|
|
||
|
|
||
| def _startup(app: FastAPI) -> None: | ||
| assert isinstance(app.state.prometheus_instrumentator, Instrumentator) # nosec | ||
| app.state.prometheus_instrumentator.expose(app, include_in_schema=False) | ||
|
|
||
|
|
||
| def _shutdown(app: FastAPI) -> None: | ||
| assert isinstance(app.state.prometheus_registry, CollectorRegistry) # nosec | ||
| registry = app.state.prometheus_registry | ||
| for collector in list(registry._collector_to_names.keys()): # noqa: SLF001 | ||
| registry.unregister(collector) | ||
|
|
||
|
|
||
| def get_prometheus_instrumentator(app: FastAPI) -> Instrumentator: | ||
| assert isinstance(app.state.prometheus_instrumentator, Instrumentator) # nosec | ||
| return app.state.prometheus_instrumentator | ||
|
|
||
|
|
||
| def setup_prometheus_instrumentation(app: FastAPI) -> Instrumentator: | ||
| initialize_prometheus_instrumentation(app) | ||
|
|
||
| async def _on_startup() -> None: | ||
| instrumentator.expose(app, include_in_schema=False) | ||
| _startup(app) | ||
|
|
||
| def _unregister() -> None: | ||
| # NOTE: avoid registering collectors multiple times when running unittests consecutively (https://stackoverflow.com/a/62489287) | ||
| for collector in list(registry._collector_to_names.keys()): # noqa: SLF001 | ||
| registry.unregister(collector) | ||
| def _on_shutdown() -> None: | ||
| _shutdown(app) | ||
|
|
||
| app.add_event_handler("startup", _on_startup) | ||
| app.add_event_handler("shutdown", _unregister) | ||
| return instrumentator | ||
| app.add_event_handler("shutdown", _on_shutdown) | ||
|
|
||
| return get_prometheus_instrumentator(app) | ||
|
|
||
|
|
||
| async def lifespan_prometheus_instrumentation(app: FastAPI) -> AsyncIterator[State]: | ||
| # NOTE: requires ``initialize_prometheus_instrumentation`` to be called before the | ||
GitHK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # lifespan of the applicaiton runs, usually rigth after the ``FastAPI`` instance is created | ||
| _startup(app) | ||
| yield {} | ||
| _shutdown(app) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| from collections.abc import AsyncIterator | ||
|
|
||
| import asgi_lifespan | ||
| import pytest | ||
| from fastapi import FastAPI | ||
| from fastapi_lifespan_manager import State | ||
| from servicelib.fastapi.lifespan_utils import combine_lifespans | ||
|
|
||
|
|
||
| async def test_multiple_lifespan_managers(capsys: pytest.CaptureFixture): | ||
GitHK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| async def database_lifespan(app: FastAPI) -> AsyncIterator[State]: | ||
| _ = app | ||
| print("setup DB") | ||
| yield {} | ||
| print("shutdown DB") | ||
|
|
||
| async def cache_lifespan(app: FastAPI) -> AsyncIterator[State]: | ||
| _ = app | ||
| print("setup CACHE") | ||
| yield {} | ||
| print("shutdown CACHE") | ||
|
|
||
| app = FastAPI(lifespan=combine_lifespans(database_lifespan, cache_lifespan)) | ||
|
|
||
| capsys.readouterr() | ||
|
|
||
| async with asgi_lifespan.LifespanManager(app): | ||
| messages = capsys.readouterr().out | ||
|
|
||
| assert "setup DB" in messages | ||
| assert "setup CACHE" in messages | ||
| assert "shutdown DB" not in messages | ||
| assert "shutdown CACHE" not in messages | ||
|
|
||
| messages = capsys.readouterr().out | ||
|
|
||
| assert "setup DB" not in messages | ||
| assert "setup CACHE" not in messages | ||
| assert "shutdown DB" in messages | ||
| assert "shutdown CACHE" in messages | ||
Uh oh!
There was an error while loading. Please reload this page.