|
| 1 | +use std::collections::HashMap; |
1 | 2 | use std::ops::ControlFlow; |
2 | 3 | use std::sync::Arc; |
3 | 4 | use std::time::Duration; |
@@ -61,16 +62,24 @@ pub fn build_client() -> anyhow::Result<reqwest::Client> { |
61 | 62 | .build()?) |
62 | 63 | } |
63 | 64 |
|
64 | | -fn topic_request(endpoint: &str, topic: &str, since: u64) -> anyhow::Result<reqwest::Request> { |
| 65 | +fn topic_request( |
| 66 | + client: &reqwest::Client, |
| 67 | + endpoint: &str, |
| 68 | + topic: &str, |
| 69 | + since: u64, |
| 70 | + username: Option<&str>, |
| 71 | + password: Option<&str>, |
| 72 | +) -> anyhow::Result<reqwest::Request> { |
65 | 73 | let url = models::Subscription::build_url(endpoint, topic, since)?; |
66 | | - let mut req = reqwest::Request::new(reqwest::Method::GET, url); |
67 | | - let headers = req.headers_mut(); |
68 | | - headers.append( |
69 | | - "Content-Type", |
70 | | - HeaderValue::from_static("application/x-ndjson"), |
71 | | - ); |
72 | | - headers.append("Transfer-Encoding", HeaderValue::from_static("chunked")); |
73 | | - Ok(req) |
| 74 | + let mut req = client |
| 75 | + .get(url) |
| 76 | + .header("Content-Type", "application/x-ndjson") |
| 77 | + .header("Transfer-Encoding", "chunked"); |
| 78 | + if let Some(username) = username { |
| 79 | + req = req.basic_auth(username, password); |
| 80 | + } |
| 81 | + |
| 82 | + Ok(req.build()?) |
74 | 83 | } |
75 | 84 |
|
76 | 85 | async fn response_lines( |
@@ -161,8 +170,36 @@ impl TopicListener { |
161 | 170 |
|
162 | 171 | #[instrument(skip_all)] |
163 | 172 | async fn recv_and_forward(&mut self) -> anyhow::Result<()> { |
164 | | - let req = topic_request(&self.endpoint, &self.topic, self.since)?; |
165 | | - let res = self.env.http.execute(req).await?; |
| 173 | + let (username, password) = { |
| 174 | + let attrs = HashMap::from([("type", "password"), ("server", &self.endpoint)]); |
| 175 | + let items = self |
| 176 | + .env |
| 177 | + .keyring |
| 178 | + .search_items(attrs) |
| 179 | + .await |
| 180 | + .map_err(|e| capnp::Error::failed(e.to_string()))?; |
| 181 | + |
| 182 | + if let Some(item) = items.into_iter().next() { |
| 183 | + let attrs = item |
| 184 | + .attributes() |
| 185 | + .await |
| 186 | + .map_err(|e| capnp::Error::failed(e.to_string()))?; |
| 187 | + let password = item.secret().await?; |
| 188 | + let password = std::str::from_utf8(&*password)?; |
| 189 | + (attrs.get("username").cloned(), Some(password.to_string())) |
| 190 | + } else { |
| 191 | + (None, None) |
| 192 | + } |
| 193 | + }; |
| 194 | + let req = topic_request( |
| 195 | + &self.env.http, |
| 196 | + &self.endpoint, |
| 197 | + &self.topic, |
| 198 | + self.since, |
| 199 | + username.as_deref(), |
| 200 | + password.as_deref(), |
| 201 | + ); |
| 202 | + let res = self.env.http.execute(req?).await?; |
166 | 203 | let reader = tokio_util::io::StreamReader::new( |
167 | 204 | res.bytes_stream() |
168 | 205 | .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())), |
|
0 commit comments