-
Notifications
You must be signed in to change notification settings - Fork 404
Wrap the Rust HTTP client with make_deferred_yieldable
#18903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
986e15c
5e555f3
4e0085c
763cdb9
ce6f16d
d4558df
dce9754
9def6f7
77970eb
b89fb2e
34b6f2a
8a9d682
2d341cb
58c1209
9a4e67d
59516a4
74f01a9
4424a88
e6df6ee
92ea10a
5c324bc
e243b0f
ab453b7
a0051f7
e35c7db
1d9ae7f
311969f
b670864
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Wrap the Rust HTTP client with `make_deferred_yieldable` so it follows Synapse logcontext rules. |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,6 +19,7 @@ use futures::TryStreamExt; | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use once_cell::sync::OnceCell; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use pyo3::{create_exception, exceptions::PyException, prelude::*}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use reqwest::RequestBuilder; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use std::sync::OnceLock; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use tokio::runtime::Runtime; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use crate::errors::HttpResponseException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -218,29 +219,34 @@ impl HttpClient { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| builder: RequestBuilder, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| response_limit: usize, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> PyResult<Bound<'a, PyAny>> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| create_deferred(py, self.reactor.bind(py), async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let response = builder.send().await.context("sending request")?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let status = response.status(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut stream = response.bytes_stream(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut buffer = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while let Some(chunk) = stream.try_next().await.context("reading body")? { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if buffer.len() + chunk.len() > response_limit { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Err(anyhow::anyhow!("Response size too large"))?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // We use `make_deferred_yieldable` to make the returned deferred follow Synapse | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // logcontext rules. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(make_deferred_yieldable( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| py, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| &create_deferred(py, self.reactor.bind(py), async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let response = builder.send().await.context("sending request")?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let status = response.status(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut stream = response.bytes_stream(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut buffer = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while let Some(chunk) = stream.try_next().await.context("reading body")? { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if buffer.len() + chunk.len() > response_limit { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Err(anyhow::anyhow!("Response size too large"))?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| buffer.extend_from_slice(&chunk); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| buffer.extend_from_slice(&chunk); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !status.is_success() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Err(HttpResponseException::new(status, buffer)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !status.is_success() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Err(HttpResponseException::new(status, buffer)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(r) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(r) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| })?, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -301,3 +307,23 @@ where | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(deferred) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| static MAKE_DEFERRED_YIELDABLE: OnceLock<pyo3::Py<pyo3::PyAny>> = OnceLock::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Given a deferred, make it follow the Synapse logcontext rules | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| fn make_deferred_yieldable<'py>( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| py: Python<'py>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| deferred: &Bound<'py, PyAny>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> Bound<'py, PyAny> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let sys = PyModule::import(py, "synapse.logging.context").unwrap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let func = sys.getattr("make_deferred_yieldable").unwrap().unbind(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| make_deferred_yieldable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .call1(py, (deferred,)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .unwrap() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .extract(py) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .unwrap() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
308
to
320
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is based on @reivilibre suggestion in #18903 (comment) But please double-check that I didn't fumble the lifetimes, etc trying to kludge things together. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally we'd avoid
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had to adjust it a bit. Updated to this: static MAKE_DEFERRED_YIELDABLE: OnceLock<pyo3::Py<pyo3::PyAny>> = OnceLock::new();
/// Given a deferred, make it follow the Synapse logcontext rules
fn make_deferred_yieldable<'py>(
py: Python<'py>,
deferred: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyAny>> {
let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| {
let sys = PyModule::import(py, "synapse.logging.context").unwrap();
let func = sys.getattr("make_deferred_yieldable").unwrap().unbind();
func
});
make_deferred_yieldable.call1(py, (deferred,))?.extract(py)
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,10 @@ from twisted.internet.defer import Deferred | |||||||||
| from synapse.types import ISynapseReactor | ||||||||||
|
|
||||||||||
| class HttpClient: | ||||||||||
| """ | ||||||||||
| The returned deferreds follow Synapse logcontext rules. | ||||||||||
| """ | ||||||||||
|
Comment on lines
+20
to
+22
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we should just remove this as we should just assume that everything in Synapse does follow the Synapse logcontext rules unless otherwise stated. For example: synapse/synapse/logging/context.py Lines 853 to 856 in d1c96ee
|
||||||||||
|
|
||||||||||
| def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: ... | ||||||||||
| def get(self, url: str, response_limit: int) -> Deferred[bytes]: ... | ||||||||||
| def post( | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| # This file is licensed under the Affero General Public License (AGPL) version 3. | ||
| # | ||
| # Copyright (C) 2025 New Vector, Ltd | ||
| # | ||
| # This program is free software: you can redistribute it and/or modify | ||
| # it under the terms of the GNU Affero General Public License as | ||
| # published by the Free Software Foundation, either version 3 of the | ||
| # License, or (at your option) any later version. | ||
| # | ||
| # See the GNU Affero General Public License for more details: | ||
| # <https://www.gnu.org/licenses/agpl-3.0.html>. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make the
make_deferred_yieldablelogic part ofcreate_deferred. I don't see a scenario where we don't want thisThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes sense 👍
Usually, we use
make_deferred_yieldable(...)on third-party code (out of our control) that doesn't follow Synapse logcontext rules. But we control this code and can make the deferred do the right thing as we create it.