Skip to content

Commit 754a382

Browse files
committed
Stop streams with the same request id
1 parent 3784265 commit 754a382

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

lighthouse-client/src/lighthouse.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,18 +188,28 @@ impl<S> Lighthouse<S>
188188

189189
/// Stops the given stream. **Should generally not be called manually**,
190190
/// since streams will automatically be stopped once dropped.
191-
pub async fn stop(&self, path: &[&str]) -> Result<ServerMessage<()>> {
192-
self.perform(&Verb::Stop, path, ()).await
191+
pub async fn stop(&self, request_id: i32, path: &[&str]) -> Result<ServerMessage<()>> {
192+
self.perform_with_id(request_id, &Verb::Stop, path, ()).await
193193
}
194194

195195
/// Performs a single request to the given path with the given payload.
196196
#[tracing::instrument(skip(self, payload))]
197197
pub async fn perform<P, R>(&self, verb: &Verb, path: &[&str], payload: P) -> Result<ServerMessage<R>>
198+
where
199+
P: Serialize,
200+
R: for<'de> Deserialize<'de> {
201+
let request_id = self.next_request_id();
202+
self.perform_with_id(request_id, verb, path, payload).await
203+
}
204+
205+
/// Performs a single request to the given path with the given request id.
206+
#[tracing::instrument(skip(self, payload))]
207+
async fn perform_with_id<P, R>(&self, request_id: i32, verb: &Verb, path: &[&str], payload: P) -> Result<ServerMessage<R>>
198208
where
199209
P: Serialize,
200210
R: for<'de> Deserialize<'de> {
201211
assert_ne!(verb, &Verb::Stream, "Lighthouse::perform may only be used for one-off requests, use Lighthouse::stream for streaming.");
202-
let request_id = self.send_request(verb, path, payload).await?;
212+
self.send_request(request_id, verb, path, payload).await?;
203213
let response = self.receive_single(request_id).await?.check()?.decode_payload()?;
204214
Ok(response)
205215
}
@@ -211,7 +221,8 @@ impl<S> Lighthouse<S>
211221
where
212222
P: Serialize,
213223
R: for<'de> Deserialize<'de> {
214-
let request_id = self.send_request(&Verb::Stream, path, payload).await?;
224+
let request_id = self.next_request_id();
225+
self.send_request(request_id, &Verb::Stream, path, payload).await?;
215226
let stream = self.receive_streaming(request_id).await?;
216227
Ok(stream.guard({
217228
// Stop the stream on drop
@@ -222,7 +233,7 @@ impl<S> Lighthouse<S>
222233
// TODO: Find a more elegant way to pass the path, ideally without
223234
// converting back and forth between Vec<String>, Vec<&str> and &[&str]
224235
let path: Vec<_> = path.iter().map(|s| &**s).collect();
225-
if let Err(error) = this.stop(&path).await {
236+
if let Err(error) = this.stop(request_id, &path).await {
226237
error! { ?path, %error, "Could not STOP stream" };
227238
}
228239
});
@@ -231,11 +242,10 @@ impl<S> Lighthouse<S>
231242
}
232243

233244
/// Sends a request to the given path with the given payload.
234-
async fn send_request<P>(&self, verb: &Verb, path: &[&str], payload: P) -> Result<i32>
245+
async fn send_request<P>(&self, request_id: i32, verb: &Verb, path: &[&str], payload: P) -> Result<i32>
235246
where
236247
P: Serialize {
237248
let path = path.into_iter().map(|s| s.to_string()).collect();
238-
let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
239249
debug! { %request_id, "Sending request" };
240250
self.send_message(&ClientMessage {
241251
request_id,
@@ -309,6 +319,11 @@ impl<S> Lighthouse<S>
309319
Ok(self.ws_sink.lock().await.send(Message::Binary(bytes.into())).await?)
310320
}
311321

322+
/// Fetches the next request id.
323+
fn next_request_id(&self) -> i32 {
324+
self.request_id.fetch_add(1, Ordering::Relaxed)
325+
}
326+
312327
/// Fetches the credentials used to authenticate with the lighthouse.
313328
pub fn authentication(&self) -> &Authentication {
314329
&self.authentication

0 commit comments

Comments
 (0)