Skip to content

Commit 3784265

Browse files
committed
Stop streams on drop
1 parent 31a615d commit 3784265

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

lighthouse-client/src/lighthouse.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ impl<S> Lighthouse<S>
186186
self.perform(&Verb::Unlink, dest_path, src_path).await
187187
}
188188

189-
/// Stops the given stream.
189+
/// Stops the given stream. **Should generally not be called manually**,
190+
/// since streams will automatically be stopped once dropped.
190191
pub async fn stop(&self, path: &[&str]) -> Result<ServerMessage<()>> {
191192
self.perform(&Verb::Stop, path, ()).await
192193
}
@@ -204,15 +205,29 @@ impl<S> Lighthouse<S>
204205
}
205206

206207
/// Performs a STREAM request to the given path with the given payload.
208+
/// Automatically sends a STOP once dropped.
207209
#[tracing::instrument(skip(self, payload))]
208210
pub async fn stream<P, R>(&self, path: &[&str], payload: P) -> Result<impl Stream<Item = Result<ServerMessage<R>>>>
209211
where
210212
P: Serialize,
211213
R: for<'de> Deserialize<'de> {
212214
let request_id = self.send_request(&Verb::Stream, path, payload).await?;
213215
let stream = self.receive_streaming(request_id).await?;
214-
// TODO: Send STOP once dropped
215-
Ok(stream)
216+
Ok(stream.guard({
217+
// Stop the stream on drop
218+
let this = (*self).clone();
219+
let path: Vec<_> = path.into_iter().map(|s| s.to_string()).collect();
220+
move || {
221+
tokio::spawn(async move {
222+
// TODO: Find a more elegant way to pass the path, ideally without
223+
// converting back and forth between Vec<String>, Vec<&str> and &[&str]
224+
let path: Vec<_> = path.iter().map(|s| &**s).collect();
225+
if let Err(error) = this.stop(&path).await {
226+
error! { ?path, %error, "Could not STOP stream" };
227+
}
228+
});
229+
}
230+
}))
216231
}
217232

218233
/// Sends a request to the given path with the given payload.

0 commit comments

Comments
 (0)