-
|
Hello again, I was using an older version of the crate and upon updating one of the features I implemented got booted, so I wanted some feedback on how to implement it now and if it could be needed by someone else. I am running processes over SSH, and need to collect stdout. However, I am also interested into the errors that could happend and wished to process that stream as well. All the convenience methods that transform a Before the update, I defined two With the changes made since, the modules making up the ChannelStream seem to be private and cannot be fooled around easily. I then created pub struct Wrap(Receiver<Vec<u8>>, BytesMut);
impl Wrap {
fn new(rx: Receiver<Vec<u8>>) -> Self {
Self {
0: rx,
1: BytesMut::new(),
}
}
}
/// Create streams for a channel's stdout and stderr, consuming the channel in the process
pub fn into_streams<S>(mut chan: Channel<S>) -> (Wrap, Wrap)
where
S: From<(ChannelId, ChannelMsg)> + std::marker::Send + 'static + Sync,
{
let (txo, rxo) = mpsc::channel::<Vec<u8>>(1000);
let (txe, rxe) = mpsc::channel::<Vec<u8>>(1000);
tokio::spawn(async move {
loop {
match chan.wait().await {
Some(ChannelMsg::Data { data }) => {
txo.send(data[..].into())
.await
.map_err(|_| SshError::SendError)?;
}
Some(ChannelMsg::ExtendedData { data, ext }) if ext == 1 => {
txe.send(data[..].into())
.await
.map_err(|_| SshError::SendError)?;
}
Some(ChannelMsg::ExtendedData { data: _, ext }) => {
log::debug!("Received surprise data on stream {ext}");
}
Some(ChannelMsg::Eof) => {
// Send a 0-length chunk to indicate EOF.
txo.send(vec![]).await.map_err(|_| SshError::SendError)?;
break;
}
None => break,
_ => (),
}
}
chan.close().await?;
Ok::<_, SshError>(())
});
(Wrap::new(rxo), Wrap::new(rxe))
}
impl AsyncRead for Wrap {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let cache_size = self.1.len();
buf.put_slice(&self.1.split_to(usize::min(buf.remaining(), cache_size)));
if buf.remaining() > 0 {
match self.0.poll_recv(cx) {
Poll::Ready(Some(msg)) => {
self.1 = BytesMut::from(&msg[..]);
let len = self.1.len();
buf.put_slice(&self.1.split_to(usize::min(buf.remaining(), len)));
Poll::Ready(Ok(()))
}
Poll::Ready(None) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(Ok(()))
}
}
}Please advise if you think there is a better |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
|
You're correct, right now there's no built-in way to obtain two |
Beta Was this translation helpful? Give feedback.
You're correct, right now there's no built-in way to obtain two
AsyncReadfor stdout and stderr/extended data as they would have to simultaneously read from the same internal mpscReceiver. Your solution is conceptually perfectly fine (without reviewing the AsyncRead impl in detail) and is how I'd implement it myself