Skip to content

Commit 0d2f489

Browse files
vmagrometa-codesync[bot]
authored and committed
[antlir2][sendstream_parser] convert to more idiomatic stream interface
Summary: The old callback style was pretty much required prior to this diff stack, but it's not very idiomatic in Rust. This diff converts it to a `futures::Stream`, which is more standard and less confusing. Test Plan: ``` ❯ buck2 test fbcode//antlir/antlir2/sendstream_parser/... Buck UI: https://www.internalfb.com/buck2/31c8c888-6847-4828-9261-32f3b78b7845 Tests finished: Pass 4. Fail 0. Fatal 0. Skip 0. Build failure 0 ``` https://www.internalfb.com/intern/testinfra/testrun/4785074928638786 Differential Revision: D86802448 fbshipit-source-id: 6d4240286d3b85ef473bcd8a5ef4d592b820cc2a
1 parent 5b9930c commit 0d2f489

File tree

2 files changed

+86
-67
lines changed

2 files changed

+86
-67
lines changed

antlir/antlir2/sendstream_parser/src/lib.rs

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,8 @@ mod tests {
719719
use std::fmt::Write;
720720
use std::io::Cursor;
721721

722+
use futures::StreamExt;
723+
use futures::future;
722724
use similar_asserts::SimpleDiff;
723725

724726
use super::*;
@@ -746,12 +748,13 @@ mod tests {
746748
let cursor = Cursor::new(data);
747749
let mut parsed_txt = String::new();
748750
let mut sendstream_index = 0;
749-
let num_cmds_parsed = wire::parse(cursor, |cmd| {
750-
serialize_cmd(&mut sendstream_index, &mut parsed_txt, cmd);
751-
wire::ParserControl::KeepGoing
752-
})
753-
.await
754-
.expect("while parsing");
751+
let mut num_cmds_parsed = 0;
752+
let mut parser = wire::parse(cursor);
753+
while let Some(cmd_res) = parser.next().await {
754+
let cmd = cmd_res.expect("failed to parse");
755+
serialize_cmd(&mut sendstream_index, &mut parsed_txt, &cmd);
756+
num_cmds_parsed += 1;
757+
}
755758
if let Some(dst) = std::env::var_os("UPDATE_DEMO_TXT") {
756759
std::fs::write(dst, parsed_txt).expect("while writing to {dst}");
757760
} else {
@@ -766,32 +769,6 @@ mod tests {
766769
assert_eq!(num_cmds_parsed, 106);
767770
}
768771

769-
/// Demonstrate how we might eagerly abort parsing after collecting information embedded in an
770-
/// early command.
771-
#[tokio::test]
772-
async fn partial_parse() {
773-
let data = include_bytes!("../testdata/demo.sendstream");
774-
let cursor = Cursor::new(data);
775-
let mut uuid: Option<Uuid> = None;
776-
let num_cmds_parsed = wire::parse(cursor, |cmd| {
777-
if let Command::Subvol(sv) = cmd {
778-
uuid = Some(sv.uuid());
779-
return wire::ParserControl::Enough;
780-
}
781-
wire::ParserControl::KeepGoing
782-
})
783-
.await
784-
.expect("while parsing");
785-
assert_eq!(
786-
uuid,
787-
Some(
788-
Uuid::parse_str("0fbf2b5f-ff82-a748-8b41-e35aec190b49")
789-
.expect("while parsing uuid")
790-
)
791-
);
792-
assert_eq!(num_cmds_parsed, 1);
793-
}
794-
795772
#[tokio::test]
796773
async fn sendstream_covers_all_commands() {
797774
let all_cmds: BTreeSet<_> = wire::cmd::PARSED_SUBTYPES
@@ -803,13 +780,15 @@ mod tests {
803780
.collect();
804781
let data = include_bytes!("../testdata/demo.sendstream");
805782
let cursor = Cursor::new(data);
806-
let mut seen_cmds: BTreeSet<wire::cmd::CommandType> = BTreeSet::new();
807-
wire::parse(cursor, |cmd| {
808-
seen_cmds.insert(cmd.command_type());
809-
wire::ParserControl::KeepGoing
810-
})
811-
.await
812-
.expect("while parsing");
783+
let seen_cmds: BTreeSet<_> = wire::parse(cursor)
784+
.filter_map(|i| {
785+
future::ready(match i {
786+
Ok(cmd) => Some(cmd.command_type()),
787+
Err(e) => panic!("failed to parse: {e}"),
788+
})
789+
})
790+
.collect()
791+
.await;
813792
assert_eq!(seen_cmds, all_cmds);
814793
}
815794
}

antlir/antlir2/sendstream_parser/src/wire/mod.rs

Lines changed: 68 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
* LICENSE file in the root directory of this source tree.
66
*/
77

8+
use futures::Stream;
89
use futures::StreamExt;
10+
use futures::future;
911
use tokio::io::AsyncRead;
1012
use tokio_util::codec::FramedRead;
1113

@@ -15,45 +17,83 @@ mod nombytes;
1517
mod tlv;
1618
pub use nombytes::NomBytes;
1719

18-
#[derive(Debug)]
19-
pub enum ParserControl {
20-
KeepGoing,
21-
Enough,
22-
}
23-
2420
/// Parse an async source of bytes, expecting to find it to contain one or more sendstreams.
2521
/// Because the parsed commands reference data owned by the source, we do not collect the commands.
2622
/// Instead, we yield each command through the returned stream as it is parsed; the caller can
2723
/// stop parsing early simply by no longer polling (or by dropping) the stream.
2824
///
2925
/// Each sendstream is expected to (1) start with a header, followed by (2) either a Subvol or
3026
/// Snapshot command, followed by (3) 0 or more additional commands, terminated by (4) an End
31-
/// command. Note that we don't validate #2 here, but we do expect #1 and #4.
32-
///
33-
/// Returns number of commands parsed.
27+
/// command. Note that only (1) is actually enforced; afterward, every command
28+
/// will be emitted into the stream as long as it could be read and parsed.
3429
///
3530
/// See https://btrfs.readthedocs.io/en/latest/dev/dev-send-stream.html for reference.
36-
pub async fn parse<R, F>(reader: R, mut f: F) -> crate::Result<u128>
31+
pub fn parse<R>(reader: R) -> impl Stream<Item = crate::Result<crate::Command>>
3732
where
38-
R: AsyncRead + Unpin + Send,
39-
F: FnMut(&crate::Command) -> ParserControl + Send,
33+
R: AsyncRead,
4034
{
41-
let mut reader = FramedRead::new(reader, framed::SendstreamDecoder::new());
42-
let mut command_count = 0;
43-
while let Some(item_res) = reader.next().await {
44-
match item_res {
45-
Ok(framed::Item::Command(command)) => {
46-
command_count += 1;
47-
if let ParserControl::Enough = f(&command) {
48-
// caller got what they needed, no need to continue parsing
49-
break;
35+
let reader = FramedRead::new(reader, framed::SendstreamDecoder::new());
36+
reader.filter_map(|item_res| {
37+
future::ready(match item_res {
38+
Ok(framed::Item::Command(command)) => Some(Ok(command)),
39+
Ok(framed::Item::SendstreamStart(_)) => None,
40+
Err(e) => Some(Err(e)),
41+
})
42+
})
43+
}
44+
45+
#[cfg(test)]
46+
#[allow(clippy::expect_used)]
47+
mod tests {
48+
use std::io::Cursor;
49+
use std::time::Duration;
50+
51+
use futures::StreamExt;
52+
use tokio::io::AsyncWriteExt;
53+
use tokio::time::sleep;
54+
55+
use super::*;
56+
57+
/// Historically, we couldn't stream commands as they were parsed very well,
58+
/// so the early exit was implemented with a not-very-Rusty callback
59+
/// interface. This test proves that the parser stops operating as soon as
60+
/// the caller stops asking for commands (via a reader that will only
61+
/// provide enough bytes for a few commands before then stalling forever)
62+
#[tokio::test]
63+
async fn early_exit() {
64+
let make_parser = |truncate: Option<usize>| async move {
65+
let src = include_bytes!("../../testdata/demo.sendstream");
66+
// only use simplex stream if we're going to truncate it
67+
// prematurely, otherwise use a Cursor on top of the static byte
68+
// slice so that the framed codec actually sees the EOF
69+
let reader: Box<dyn AsyncRead + Unpin> = match truncate {
70+
Some(size) => {
71+
let (receiver, mut sender) = tokio::io::simplex(size);
72+
sender
73+
.write_all(&src[..size])
74+
.await
75+
.expect("failed to write input data");
76+
Box::new(receiver)
5077
}
51-
}
52-
Ok(framed::Item::SendstreamStart(_)) => {}
53-
Err(e) => {
54-
return Err(e);
55-
}
56-
}
78+
None => Box::new(Cursor::new(src)),
79+
};
80+
parse(reader)
81+
};
82+
83+
// empirically determined that 46 commands fit in the first 4k
84+
// first, prove that we can read the expected number of commands, before
85+
// dropping the stream which while then stop consuming its upstream input
86+
let parser = make_parser(Some(4 * 1024)).await;
87+
let count = parser
88+
.take_until(sleep(Duration::from_millis(100)))
89+
.count()
90+
.await;
91+
assert_eq!(count, 46);
92+
93+
// the entire stream should be consumable (but obviously only when given
94+
// the entire input)
95+
let parser = make_parser(None).await;
96+
let count = parser.count().await;
97+
assert_eq!(count, 106);
5798
}
58-
Ok(command_count)
5999
}

0 commit comments

Comments
 (0)